dev@grizzly.java.net

AsyncWriter proposal

From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Date: Wed, 31 Oct 2007 20:39:27 +0100

Hello,

here I would like to describe the initial propose I have for AsyncWriter
implementation.
Think it makes sense to associate AsyncWriter with SelectorHandler, so
we could have good control on events happening with Channel.

There are 3 classes attached.
AsyncWriter: This is the main class user will use to write data in
async. mode. Class has several overloaded write methods (see javadoc for
more details).
AsyncWriteCallbackHandler: interface, which could be implemented by
custom code to be notified, when Buffer will be async. written. Instance
of AsyncWriteCallbackHandler could be passed as one of AsyncWriter.write
parameters.
AsyncWriteQueue: represents Chanel <-> WriteQueue correspondence collection.

Classes are described more accurately in javadocs.

As I mentioned, think AsyncWriter should be integrated to the
SelectorHandler. So the way developer will be able to use its
functionality is:
selectorHandler.getAsyncWriter().write(byteBuffer...).

Will appreciate any feedback! :)

Thanks.

WBR,
Alexey.


/*
 * The contents of this file are subject to the terms
 * of the Common Development and Distribution License
 * (the License). You may not use this file except in
 * compliance with the License.
 *
 * You can obtain a copy of the license at
 * https://glassfish.dev.java.net/public/CDDLv1.0.html or
 * glassfish/bootstrap/legal/CDDLv1.0.txt.
 * See the License for the specific language governing
 * permissions and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL
 * Header Notice in each file and include the License file
 * at glassfish/bootstrap/legal/CDDLv1.0.txt.
 * If applicable, add the following below the CDDL Header,
 * with the fields enclosed by brackets [] replaced by
 * you own identifying information:
 * "Portions Copyrighted [year] [name of copyright owner]"
 *
 * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
 */

package com.sun.grizzly;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;

/**
 *
 * @author Alexey Stashok
 */
public class AsyncWriter {
    private SelectorHandler selectorHandler;
    private AsyncWriteQueue writeQueue;
    
    public AsyncWriter(SelectorHandler selectorHandler) {
        this.selectorHandler = selectorHandler;
        writeQueue = new AsyncWriteQueue();
    }
    
    /**
     * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>
     * First it tries to write <code>ByteBuffer</code> to the given <code>SelectableChannel</code>.
     * If, after write is called, <code>ByteBuffer</code> still has ready data to be written -
     * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>
     * and <code>SelectableChannel<code> will be registered on
     * <code>SelectorHandler</code>, waiting for OP_WRITE event.
     *
     * @param channel <code>SelectableChannel</code> <code>ByteBuffer</code> should be written to
     * @param buffer <code>ByteBuffer</code>
     * @return true, if <code>ByteBuffer</code> was written completely, false if write operation was put to queue
     * @throws java.io.IOException
     */
    public boolean write(SelectableChannel channel, ByteBuffer buffer) throws IOException {
        return write(channel, buffer, null);
    }
    
    /**
     * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>
     * First it tries to write <code>ByteBuffer</code> to the given
     * <code>SelectableChannel</code>.
     * If, after write is called, <code>ByteBuffer</code> still has ready
     * data to be written - <code>ByteBuffer</code> will be added to
     * <code>AsyncWriteQueue</code> and <code>SelectableChannel<code> will
     * be registered on <code>SelectorHandler</code>, waiting for OP_WRITE event.
     *
     * @param channel <code>SelectableChannel</code> <code>ByteBuffer</code>
     * should be written to
     * @param buffer <code>ByteBuffer</code>
     * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
     * which will get notified, when
     * <code>ByteBuffer</code> will be completely written
     * @return true, if <code>ByteBuffer</code> was written completely,
     * false if write operation was put to queue
     * @throws java.io.IOException
     */
    public boolean write(SelectableChannel channel, ByteBuffer buffer,
            AsyncWriteCallbackHandler callbackHandler) throws IOException {
        return write(channel, buffer, callbackHandler, false);
    }

    /**
     * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>
     * First it tries to write <code>ByteBuffer</code> to the given
     * <code>SelectableChannel</code>.
     * If, after write is called, <code>ByteBuffer</code> still has ready
     * data to be written - <code>ByteBuffer</code> will be added to
     * <code>AsyncWriteQueue</code> and <code>SelectableChannel<code> will
     * be registered on <code>SelectorHandler</code>, waiting for OP_WRITE event.
     *
     * @param channel <code>SelectableChannel</code> <code>ByteBuffer</code>
     * should be written to
     * @param buffer <code>ByteBuffer</code>
     * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
     * which will get notified, when
     * <code>ByteBuffer</code> will be completely written
     * @param isCloneByteBuffer if true - AsyncWriter will clone given
     * <code>ByteBuffer</code> before puting it to the
     * <code>AsyncWriteQueue</code>
     * @return true, if <code>ByteBuffer</code> was written completely,
     * false if write operation was put to queue
     * @throws java.io.IOException
     */
    public boolean write(SelectableChannel channel, ByteBuffer buffer,
            AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) throws IOException {
        
        // If AsyncWriteQueue is empty - try to write ByteBuffer here
        if (!hasReadyAsyncWriteData(channel)) {
            ((WritableByteChannel) channel).write(buffer);
        }
        
        if (buffer.hasRemaining()) {
            
            // clone ByteBuffer if required
            if (isCloneByteBuffer) {
                int size = buffer.remaining();
                ByteBuffer newBuffer = buffer.isDirect() ?
                    ByteBuffer.allocateDirect(size) :
                    ByteBuffer.allocate(size);
                
                newBuffer.put(buffer);
                newBuffer.position(0);
                buffer = newBuffer;
            }
            
            // add new element to the queue
            writeQueue.offer(channel, new AsyncWriteQueue.AsyncQueueRecord(buffer, callbackHandler));
            selectorHandler.register(channel, SelectionKey.OP_WRITE);
            
            return false;
        }
        
        return true;
    }

    /**
     * Checks whether there is any data in <code>AsyncWriteQueue</code> ready
     * for the given <code>SelectableChannel</code>
     *
     * @param channel <code>SelectableChannel</code>
     * @return true, if there is ready data. False otherwise.
     */
    public boolean hasReadyAsyncWriteData(SelectableChannel channel) {
        return writeQueue.size(channel) > 0;
    }
    
    /**
     * Callback method, which should be called by <code>SelectorHandler</code> to
     * notify, that given <code>SelectableChannel</code> is ready to transmit data.
     *
     * @param channel <code>SelectableChannel</code>
     * @throws java.io.IOException
     */
    public void onWrite(SelectableChannel channel) throws IOException {
        AsyncWriteQueue.AsyncQueueRecord queueRecord = writeQueue.peek(channel);
        ByteBuffer byteBuffer = queueRecord.byteBuffer;
        ((WritableByteChannel) channel).write(byteBuffer);
        if (byteBuffer.hasRemaining()) {
            selectorHandler.register(channel, SelectionKey.OP_WRITE);
        } else {
            AsyncWriteQueue.AsyncQueueRecord removedQueueRecord = writeQueue.poll(channel);
            assert removedQueueRecord == queueRecord;
            if (removedQueueRecord.callbackHandler != null) {
                removedQueueRecord.callbackHandler.notifyWriteCompleted(channel, byteBuffer);
            }
        }
    }
    
    /**
     * Callback method, which should be called by <code>SelectorHandler</code> to
     * notify, that given <code>SelectableChannel</code> is going to be closed, so
     * related <code>SelectableChannel</code> data could be released from
     * <code>AsyncWriteQueue</code>
     *
     * @param channel <code>SelectableChannel</code>
     * @throws java.io.IOException
     */
    public void onClose(SelectableChannel channel) {
        writeQueue.removeEntry(channel);
    }
}


/*
 * The contents of this file are subject to the terms
 * of the Common Development and Distribution License
 * (the License). You may not use this file except in
 * compliance with the License.
 *
 * You can obtain a copy of the license at
 * https://glassfish.dev.java.net/public/CDDLv1.0.html or
 * glassfish/bootstrap/legal/CDDLv1.0.txt.
 * See the License for the specific language governing
 * permissions and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL
 * Header Notice in each file and include the License file
 * at glassfish/bootstrap/legal/CDDLv1.0.txt.
 * If applicable, add the following below the CDDL Header,
 * with the fields enclosed by brackets [] replaced by
 * you own identifying information:
 * "Portions Copyrighted [year] [name of copyright owner]"
 *
 * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
 */

package com.sun.grizzly;

import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;

/**
 * Callback handler interface, used by AsyncWriter to notify custom code about
 * completion of specific <code>ByteBuffer</code> writing.
 *
 * @author Alexey Stashok
 */
public interface AsyncWriteCallbackHandler {
    public void notifyWriteCompleted(SelectableChannel channel, ByteBuffer buffer);
}


/*
 * The contents of this file are subject to the terms
 * of the Common Development and Distribution License
 * (the License). You may not use this file except in
 * compliance with the License.
 *
 * You can obtain a copy of the license at
 * https://glassfish.dev.java.net/public/CDDLv1.0.html or
 * glassfish/bootstrap/legal/CDDLv1.0.txt.
 * See the License for the specific language governing
 * permissions and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL
 * Header Notice in each file and include the License file
 * at glassfish/bootstrap/legal/CDDLv1.0.txt.
 * If applicable, add the following below the CDDL Header,
 * with the fields enclosed by brackets [] replaced by
 * you own identifying information:
 * "Portions Copyrighted [year] [name of copyright owner]"
 *
 * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
 */

package com.sun.grizzly;

import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Class represents collection of <code>SelectableChannel</code>s and
 * correspondent queue of <code>ByteBuffer</code>, which
 * should be written asynchronously.
 *
 * @author Alexey Stashok
 */
public class AsyncWriteQueue {
    private Map<SelectableChannel, ConcurrentLinkedQueue<AsyncQueueRecord>> queueMap =
            new ConcurrentHashMap<SelectableChannel, ConcurrentLinkedQueue<AsyncQueueRecord>>();
    

    /**
     * Add data to the <code>AsyncWriteQueue</code>, corresponding to the given
     * <code>SelectableChannel</code>
     *
     * @param channel <code>SelectableChannel</code>
     * @param queueRecord write data unit
     */
    public void offer(SelectableChannel channel, AsyncQueueRecord queueRecord) {
        ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
        if (queue == null) {
            synchronized(channel) {
                queue = queueMap.get(channel);
                if (queue == null) {
                    queue = new ConcurrentLinkedQueue<AsyncQueueRecord>();
                    queueMap.put(channel, queue);
                }
            }
        }
        
        queue.offer(queueRecord);
    }
    
    /**
     * Get head element of <code>SelectableChannel</code> write queue.
     * Element will not be removed from queue.
     *
     * @param channel <code>SelectableChannel</code>
     *
     * @return <code>AsyncQueueRecord</code> write data unit
     */
    public AsyncQueueRecord peek(SelectableChannel channel) {
        ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
        if (queue != null) {
            return queue.peek();
        }
        
        return null;
    }

    /**
     * Get head element of <code>SelectableChannel</code> write queue.
     * Element will be removed from queue.
     *
     * @param channel <code>SelectableChannel</code>
     *
     * @return <code>AsyncQueueRecord</code> write data unit
     */
    public AsyncQueueRecord poll(SelectableChannel channel) {
        ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
        if (queue != null) {
            return queue.poll();
        }
        
        return null;
    }

    /**
     * Remove head element of <code>SelectableChannel</code> write queue.
     *
     * @param channel <code>SelectableChannel</code>
     */
    public void removeEntry(SelectableChannel channel) {
        queueMap.remove(channel);
    }

    /**
     * Get the size of <code>SelectableChannel</code> write queue.
     *
     * @param channel <code>SelectableChannel</code>
     * @return size of <code>SelectableChannel</code> write queue.
     */
    public int size(SelectableChannel channel) {
        ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
        return queue == null ? 0 : queue.size();
    }
    
    /**
     * <code>AsyncWriteQueue</code> data unit
     */
    public static class AsyncQueueRecord {
        public ByteBuffer byteBuffer;
        public AsyncWriteCallbackHandler callbackHandler;
        
        public AsyncQueueRecord(ByteBuffer byteBuffer,
                AsyncWriteCallbackHandler callbackHandler) {
            this.byteBuffer = byteBuffer;
            this.callbackHandler = callbackHandler;
        }
    }
}