dev@grizzly.java.net

Re: AsyncWriter proposal

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Wed, 31 Oct 2007 15:59:54 -0400

Salut,

first, great work!

Oleksiy Stashok wrote:
> Hello,
>
> here I would like to describe the initial propose I have for AsyncWriter
> implementation.
> Think it makes sense to associate AsyncWriter with SelectorHandler, so
> we could have good control on events happening with Channel.

+1

>
> There are 3 classes attached.
> AsyncWriter: This is the main class user will use to write data in
> async. mode. Class has several overloaded write methods (see javadoc for
> more details).
> AsyncWriteCallbackHandler: interface, which could be implemented by
> custom code to be notified, when Buffer will be async. written. Instance
> of AsyncWriteCallbackHandler could be passed as one of AsyncWriter.write
> parameters.
> AsyncWriteQueue: represents Chanel <-> WriteQueue correspondence
> collection.
>
> Classes are described more accurately in javadocs.
>
> As I mentioned, think AsyncWriter should be integrated to the
> SelectorHandler. So the way developer will be able to use its
> functionality is:
> selectorHandler.getAsyncWriter().write(byteBuffer...).

Or since SelectorHandler aren't "easily" available in a ProtocolFilter,
would it make sense to reach them from the Context instead. Since the
Context is near a 'Session' for us, I think it may make sense to have it
available on it.

Something like:

Context.getAsyncWriter() and
Context.getAsyncReader()

As part of your proposal I would like to to think about the AsyncReader
concept (at least have the API, and let the discussion open).

Or we can always add the AsyncWriter to the 'Session' (Context) attribute:

Context.getAttribute(ASYNC_WRITER);

I prefer the former API, but just in case we don't want to add more to
the Context API.

Now I think the SelectorHandler will need a little re-work to make sure
the AsyncWrite.onWrite is called, right?

Also to unify our CallbackHandler in grizzly, we might want to:

+ Make CallbackHandler extends AsyncWriter
+ Add a new marker interface that can be extended by both type of
CallbackHandler
+ Have two separates Handlers like you are proposing.

>
> Will appreciate any feedback! :)
>
> Thanks.
>
> WBR,
> Alexey.
>
> /*
> * The contents of this file are subject to the terms
> * of the Common Development and Distribution License
> * (the License). You may not use this file except in
> * compliance with the License.
> *
> * You can obtain a copy of the license at
> * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> * glassfish/bootstrap/legal/CDDLv1.0.txt.
> * See the License for the specific language governing
> * permissions and limitations under the License.
> *
> * When distributing Covered Code, include this CDDL
> * Header Notice in each file and include the License file
> * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> * If applicable, add the following below the CDDL Header,
> * with the fields enclosed by brackets [] replaced by
> * you own identifying information:
> * "Portions Copyrighted [year] [name of copyright owner]"
> *
> * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> */
>
> package com.sun.grizzly;
>
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.nio.channels.SelectableChannel;
> import java.nio.channels.SelectionKey;
> import java.nio.channels.WritableByteChannel;
>
> /**
> *
> * @author Alexey Stashok
> */
> public class AsyncWriter {
> private SelectorHandler selectorHandler;
> private AsyncWriteQueue writeQueue;
>
> public AsyncWriter(SelectorHandler selectorHandler) {
> this.selectorHandler = selectorHandler;
> writeQueue = new AsyncWriteQueue();
> }
>
> /**
> * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>
> * First it tries to write <code>ByteBuffer</code> to the given <code>SelectableChannel</code>.
> * If, after write is called, <code>ByteBuffer</code> still has ready data to be written -
> * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>
> * and <code>SelectableChannel<code> will be registered on
> * <code>SelectorHandler</code>, waiting for OP_WRITE event.

Here I would add:

* if an unexpected exception occurs, the exception will not be
propagated to the caller of this class, but logged by the Grizzly framework

We probably make it clear that without a CallBackHandler, any
IOException will be swallowed by the framework.


> *
> * @param channel <code>SelectableChannel</code> <code>ByteBuffer</code> should be written to
> * @param buffer <code>ByteBuffer</code>
> * @return true, if <code>ByteBuffer</code> was written completely, false if write operation was put to queue
> * @throws java.io.IOException
> */
> public boolean write(SelectableChannel channel, ByteBuffer buffer) throws IOException {
> return write(channel, buffer, null);
> }
>
> /**
> * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>
> * First it tries to write <code>ByteBuffer</code> to the given
> * <code>SelectableChannel</code>.
> * If, after write is called, <code>ByteBuffer</code> still has ready
> * data to be written - <code>ByteBuffer</code> will be added to
> * <code>AsyncWriteQueue</code> and <code>SelectableChannel<code> will
> * be registered on <code>SelectorHandler</code>, waiting for OP_WRITE event.
> *
> * @param channel <code>SelectableChannel</code> <code>ByteBuffer</code>
> * should be written to
> * @param buffer <code>ByteBuffer</code>
> * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
> * which will get notified, when
> * <code>ByteBuffer</code> will be completely written
> * @return true, if <code>ByteBuffer</code> was written completely,
> * false if write operation was put to queue
> * @throws java.io.IOException
> */
> public boolean write(SelectableChannel channel, ByteBuffer buffer,
> AsyncWriteCallbackHandler callbackHandler) throws IOException {
> return write(channel, buffer, callbackHandler, false);
> }
>
> /**
> * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>
> * First it tries to write <code>ByteBuffer</code> to the given
> * <code>SelectableChannel</code>.
> * If, after write is called, <code>ByteBuffer</code> still has ready
> * data to be written - <code>ByteBuffer</code> will be added to
> * <code>AsyncWriteQueue</code> and <code>SelectableChannel<code> will
> * be registered on <code>SelectorHandler</code>, waiting for OP_WRITE event.
> *
> * @param channel <code>SelectableChannel</code> <code>ByteBuffer</code>
> * should be written to
> * @param buffer <code>ByteBuffer</code>
> * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
> * which will get notified, when
> * <code>ByteBuffer</code> will be completely written
> * @param isCloneByteBuffer if true - AsyncWriter will clone given
> * <code>ByteBuffer</code> before puting it to the
> * <code>AsyncWriteQueue</code>
> * @return true, if <code>ByteBuffer</code> was written completely,
> * false if write operation was put to queue
> * @throws java.io.IOException
> */
> public boolean write(SelectableChannel channel, ByteBuffer buffer,
> AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) throws IOException {
>
> // If AsyncWriteQueue is empty - try to write ByteBuffer here
> if (!hasReadyAsyncWriteData(channel)) {
> ((WritableByteChannel) channel).write(buffer);
> }
>
> if (buffer.hasRemaining()) {
>
> // clone ByteBuffer if required
> if (isCloneByteBuffer) {
> int size = buffer.remaining();
> ByteBuffer newBuffer = buffer.isDirect() ?
> ByteBuffer.allocateDirect(size) :
> ByteBuffer.allocate(size);
>
> newBuffer.put(buffer);
> newBuffer.position(0);
> buffer = newBuffer;
> }
>
> // add new element to the queue
> writeQueue.offer(channel, new AsyncWriteQueue.AsyncQueueRecord(buffer, callbackHandler));
> selectorHandler.register(channel, SelectionKey.OP_WRITE);
>
> return false;
> }
>
> return true;
> }
>
> /**
> * Checks whether there is any data in <code>AsyncWriteQueue</code> ready
> * for the given <code>SelectableChannel</code>
> *
> * @param channel <code>SelectableChannel</code>
> * @return true, if there is ready data. False otherwise.
> */
> public boolean hasReadyAsyncWriteData(SelectableChannel channel) {
> return writeQueue.size(channel) > 0;
> }
>
> /**
> * Callback method, which should be called by <code>SelectorHandler</code> to
> * notify, that given <code>SelectableChannel</code> is ready to transmit data.
> *
> * @param channel <code>SelectableChannel</code>
> * @throws java.io.IOException
> */
> public void onWrite(SelectableChannel channel) throws IOException {
> AsyncWriteQueue.AsyncQueueRecord queueRecord = writeQueue.peek(channel);
> ByteBuffer byteBuffer = queueRecord.byteBuffer;
> ((WritableByteChannel) channel).write(byteBuffer);
> if (byteBuffer.hasRemaining()) {
> selectorHandler.register(channel, SelectionKey.OP_WRITE);



> } else {
> AsyncWriteQueue.AsyncQueueRecord removedQueueRecord = writeQueue.poll(channel);
> assert removedQueueRecord == queueRecord;
> if (removedQueueRecord.callbackHandler != null) {
> removedQueueRecord.callbackHandler.notifyWriteCompleted(channel, byteBuffer);
> }
> }
> }
>
> /**
> * Callback method, which should be called by <code>SelectorHandler</code> to
> * notify, that given <code>SelectableChannel</code> is going to be closed, so
> * related <code>SelectableChannel</code> data could be released from
> * <code>AsyncWriteQueue</code>
> *
> * @param channel <code>SelectableChannel</code>
> * @throws java.io.IOException
> */
> public void onClose(SelectableChannel channel) {
> writeQueue.removeEntry(channel);
> }
> }
>
>
>
> /*
> * The contents of this file are subject to the terms
> * of the Common Development and Distribution License
> * (the License). You may not use this file except in
> * compliance with the License.
> *
> * You can obtain a copy of the license at
> * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> * glassfish/bootstrap/legal/CDDLv1.0.txt.
> * See the License for the specific language governing
> * permissions and limitations under the License.
> *
> * When distributing Covered Code, include this CDDL
> * Header Notice in each file and include the License file
> * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> * If applicable, add the following below the CDDL Header,
> * with the fields enclosed by brackets [] replaced by
> * you own identifying information:
> * "Portions Copyrighted [year] [name of copyright owner]"
> *
> * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> */
>
> package com.sun.grizzly;
>
> import java.nio.ByteBuffer;
> import java.nio.channels.SelectableChannel;
>
> /**
> * Callback handler interface, used by AsyncWriter to notify custom code about
> * completion of specific <code>ByteBuffer</code> writing.
> *
> * @author Alexey Stashok
> */
> public interface AsyncWriteCallbackHandler {
> public void notifyWriteCompleted(SelectableChannel channel, ByteBuffer buffer);
> }
>

I would recommend we add something like:

       public void notifyIOException(SelectableChannel channel,
ByteBuffer byteBuffer, Exception exception);

to the interface. Should we use instead (to follow the same pattern than
the current CallBackhandler):

    onWriteCompleted
    onIOException

?

>
>
> /*
> * The contents of this file are subject to the terms
> * of the Common Development and Distribution License
> * (the License). You may not use this file except in
> * compliance with the License.
> *
> * You can obtain a copy of the license at
> * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> * glassfish/bootstrap/legal/CDDLv1.0.txt.
> * See the License for the specific language governing
> * permissions and limitations under the License.
> *
> * When distributing Covered Code, include this CDDL
> * Header Notice in each file and include the License file
> * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> * If applicable, add the following below the CDDL Header,
> * with the fields enclosed by brackets [] replaced by
> * you own identifying information:
> * "Portions Copyrighted [year] [name of copyright owner]"
> *
> * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> */
>
> package com.sun.grizzly;
>
> import java.nio.ByteBuffer;
> import java.nio.channels.SelectableChannel;
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.ConcurrentLinkedQueue;
>
> /**
> * Class represents collection of <code>SelectableChannel</code>s and
> * correspondent queue of <code>ByteBuffer</code>, which
> * should be written asynchronously.
> *
> * @author Alexey Stashok
> */
> public class AsyncWriteQueue {
> private Map<SelectableChannel, ConcurrentLinkedQueue<AsyncQueueRecord>> queueMap =
> new ConcurrentHashMap<SelectableChannel, ConcurrentLinkedQueue<AsyncQueueRecord>>();
>
>
> /**
> * Add data to the <code>AsyncWriteQueue</code>, corresponding to the given
> * <code>SelectableChannel</code>
> *
> * @param channel <code>SelectableChannel</code>
> * @param queueRecord write data unit
> */
> public void offer(SelectableChannel channel, AsyncQueueRecord queueRecord) {
> ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
> if (queue == null) {
> synchronized(channel) {

For later: Read/WriteLock might be more efficient here.

> queue = queueMap.get(channel);
> if (queue == null) {
> queue = new ConcurrentLinkedQueue<AsyncQueueRecord>();
> queueMap.put(channel, queue);
> }
> }
> }
>
> queue.offer(queueRecord);
> }
>
> /**
> * Get head element of <code>SelectableChannel</code> write queue.
> * Element will not be removed from queue.
> *
> * @param channel <code>SelectableChannel</code>
> *
> * @return <code>AsyncQueueRecord</code> write data unit
> */
> public AsyncQueueRecord peek(SelectableChannel channel) {
> ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
> if (queue != null) {
> return queue.peek();
> }
>
> return null;
> }
>
> /**
> * Get head element of <code>SelectableChannel</code> write queue.
> * Element will be removed from queue.
> *
> * @param channel <code>SelectableChannel</code>
> *
> * @return <code>AsyncQueueRecord</code> write data unit
> */
> public AsyncQueueRecord poll(SelectableChannel channel) {
> ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
> if (queue != null) {
> return queue.poll();
> }
>
> return null;
> }
>
> /**
> * Remove head element of <code>SelectableChannel</code> write queue.
> *
> * @param channel <code>SelectableChannel</code>
> */
> public void removeEntry(SelectableChannel channel) {
> queueMap.remove(channel);
> }
>
> /**
> * Get the size of <code>SelectableChannel</code> write queue.
> *
> * @param channel <code>SelectableChannel</code>
> * @return size of <code>SelectableChannel</code> write queue.
> */
> public int size(SelectableChannel channel) {
> ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
> return queue == null ? 0 : queue.size();
> }
>
> /**
> * <code>AsyncWriteQueue</code> data unit
> */
> public static class AsyncQueueRecord {

Should be at least protected?

> public ByteBuffer byteBuffer;
> public AsyncWriteCallbackHandler callbackHandler;
>
> public AsyncQueueRecord(ByteBuffer byteBuffer,
> AsyncWriteCallbackHandler callbackHandler) {
> this.byteBuffer = byteBuffer;
> this.callbackHandler = callbackHandler;
> }
> }
> }
>

Thanks!

-- Jeanfrancois

>
> ------------------------------------------------------------------------
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net