dev@grizzly.java.net

Re: AsyncWriter proposal

From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Date: Mon, 05 Nov 2007 23:49:00 +0100

I will choose 2nd one.

Thanks.

WBR,
Alexey.

charlie hunt wrote:
> Wrt context.execute() & context.execute(ContextTask) ... would it make
> sense to:
>
> 1.) Extend Context to an AsyncContext, or something like that where
> you'd add an AsyncContext.execute(ContextTask) ?
> 2.) Or, possibly add Context.execute(ContextTask) in addition to
> Context.execute() ?
>
> I think the former might be able to preserve compatibility. But, I'm
> not sure of the impact to the AsyncWriter?
>
> charlie ....
>
> Jeanfrancois Arcand wrote:
>>
>> Salut,
>>
>> Oleksiy Stashok wrote:
>>> Hello,
>>>
>>> Here is the 2nd try for AsyncQueueWriter. This time I have it
>>> integrated and tested.
>>> Attached AsyncWriteQueue.zip has implementation of AsyncWriteQueue;
>>> AsyncWriteQueue-complete.diff has complete svn diff for
>>> implementation and integration code.
>>>
>>> In diff file you can see refactoring I'm proposing for Context class.
>>> Currently context class has some logic, which is executed by
>>> Pipeline(ThreadPool): callback handler, protocol chain execution. I
>>> propose to move this logic to ContextTask interface implementations,
>>> this will simplify Context class. Because currently we have 2
>>> possibilities for Context: execute callback handler, protocol chain.
>>> After adding AsyncWriteQueue - 3 possibilities... Think it's too
>>> much to hold in Context?
>>
>> If the functionality is the same but better designed, I'm +1 :-)
>>
>>>
>>> I was thinking about async reading, looks like we can apply it for
>>> UDP protocol. (Discussion with Radim on users_at_grizzly mailing list).
>>> This way we can try to simulate some features, which TCP protocol
>>> has, for the server side.
>>
>> Not only TCP/UDP, but also Charlie have presented (not sure
>> implemented) last year an API where you can register a
>> key/byteBuffer/size and get notified only when all the bytes
>> requested are available (similar to what AIO/NIO.2 will do). I think
>> we should keep that idea in mind when designing that interface.
>>
>> Comments inline ;-) (almost until the end of the mails :-))
>>
>> I'm +1 for almost everything (great works!). The only -1 is the
>> Context.execute() swicth to Context.execute(ContextTask). We need to
>> keep backward compatibility with the current implementation because
>> Sailfin and (I suspect) the ORB will no longer works. I'm sure we can
>> find a way to implement your idea and still support that method.
>>
>> Thanks
>>
>> --Jeanfrancois
>>
>>
>>>
>>> Thanks.
>>>
>>> WBR,
>>> Alexey.
>>>
>> # This patch file was generated by NetBeans IDE
>> # Following Index: paths are relative to:
>> C:\Projects\Grizzly\trunk\modules\grizzly\src\main\java\com\sun\grizzly
>> # This patch can be applied using context Tools: Patch action on
>> respective folder.
>> # It uses platform neutral UTF-8 encoding and \n newlines.
>> # Above lines and this line are ignored by the patching process.
>> Index: async/AsyncQueueWritable.java
>> --- async/AsyncQueueWritable.java Locally New
>> +++ async/AsyncQueueWritable.java Locally New
>> @@ -0,0 +1,183 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import java.io.IOException;
>> +import java.net.SocketAddress;
>> +import java.nio.ByteBuffer;
>> +
>> +/**
>> + * Object, which is able to send <code>ByteBuffer</code> data
>> asynchronously,
>> + * using queue.
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public interface AsyncQueueWritable {
>> + /**
>> + * Method writes <code>ByteBuffer</code> using async write queue.
>> + * First, if write queue is empty - it tries to write
>> <code>ByteBuffer</code>
>> + * directly (without putting to the queue).
>> + * If associated write queue is not empty or after direct writing
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>> + * to the caller directly, otherwise it will be just logged by
>> + * Grizzly framework.
>> + *
>> + * @param buffer <code>ByteBuffer</code>
>> + * @return true, if <code>ByteBuffer</code> was written
>> completely, false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer) throws
>> IOException;
>> +
>> + /**
>> + * Method writes <code>ByteBuffer</code> using async write queue.
>> + * First, if write queue is empty - it tries to write
>> <code>ByteBuffer</code>
>> + * directly (without putting to the queue).
>> + * If associated write queue is not empty or after direct writing
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>> + * to the caller directly, otherwise, if the
>> <code>ByteBuffer</code> is
>> + * added to a writing queue - exception notification will come via
>> + * <code>AsyncWriteCallbackHandler</code>
>> + *
>> + * @param buffer <code>ByteBuffer</code>
>> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>> + * which will get notified, when
>> + * <code>ByteBuffer</code> will be
>> completely written
>> + * @return true, if <code>ByteBuffer</code> was written completely,
>> + * false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException;
>> +
>> + /**
>> + * Method writes <code>ByteBuffer</code> using async write queue.
>> + * First, if write queue is empty - it tries to write
>> <code>ByteBuffer</code>
>> + * directly (without putting to the queue).
>> + * If associated write queue is not empty or after direct writing
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>> + * to the caller directly, otherwise, if the
>> <code>ByteBuffer</code> is
>> + * added to a writing queue - exception notification will come via
>> + * <code>AsyncWriteCallbackHandler</code>
>> + *
>> + * @param buffer <code>ByteBuffer</code>
>> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>> + * which will get notified, when
>> + * <code>ByteBuffer</code> will be
>> completely written
>> + * @param isCloneByteBuffer if true - <code>AsyncQueueWriter</code>
>> + * will clone given
>> + * <code>ByteBuffer</code> before
>> puting it to the
>> + * <code>AsyncWriteQueue</code>
>> + * @return true, if <code>ByteBuffer</code> was written completely,
>> + * false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler,
>> + boolean isCloneByteBuffer) throws IOException;
>> +
>> + /**
>> + * Method sends <code>ByteBuffer</code> using async write queue.
>> + * First, if write queue is empty - it tries to send
>> <code>ByteBuffer</code>
>> + * to the given <code>SocketAddress</code> directly
>> + * (without putting to the queue).
>> + * If associated write queue is not empty or after direct sending
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>> + * to the caller directly, otherwise it will be just logged by
>> + * Grizzly framework.
>> + *
>> + * @param dstAddress destination <code>SocketAddress</code> data
>> will
>> + * be sent to
>> + * @param buffer <code>ByteBuffer</code>
>> + * @return true, if <code>ByteBuffer</code> was written
>> completely, false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer)
>> + throws IOException;
>> +
>> + /**
>> + * Method sends <code>ByteBuffer</code> using async write queue.
>> + * First, if write queue is empty - it tries to send
>> <code>ByteBuffer</code>
>> + * to the given <code>SocketAddress</code> directly
>> + * (without putting to the queue).
>> + * If associated write queue is not empty or after direct sending
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>> + * to the caller directly, otherwise, if the
>> <code>ByteBuffer</code> is
>> + * added to a writing queue - exception notification will come via
>> + * <code>AsyncWriteCallbackHandler</code>
>> + *
>> + * @param dstAddress destination <code>SocketAddress</code> data
>> will
>> + * be sent to
>> + * @param buffer <code>ByteBuffer</code>
>> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>> + * which will get notified, when
>> + * <code>ByteBuffer</code> will be
>> completely written
>> + * @return true, if <code>ByteBuffer</code> was written completely,
>> + * false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException;
>> +
>> + /**
>> + * Method sends <code>ByteBuffer</code> using async write queue.
>> + * First, if write queue is empty - it tries to send
>> <code>ByteBuffer</code>
>> + * to the given <code>SocketAddress</code> directly
>> + * (without putting to the queue).
>> + * If associated write queue is not empty or after direct sending
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>> + * to the caller directly, otherwise, if the
>> <code>ByteBuffer</code> is
>> + * added to a writing queue - exception notification will come via
>> + * <code>AsyncWriteCallbackHandler</code>
>> + *
>> + * @param dstAddress destination <code>SocketAddress</code> data
>> will
>> + * be sent to
>> + * @param buffer <code>ByteBuffer</code>
>> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>> + * which will get notified, when
>> + * <code>ByteBuffer</code> will be
>> completely written
>> + * @param isCloneByteBuffer if true - <code>AsyncQueueWriter</code>
>> + * will clone given
>> + * <code>ByteBuffer</code> before
>> puting it to the
>> + * <code>AsyncWriteQueue</code>
>> + * @return true, if <code>ByteBuffer</code> was written completely,
>> + * false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler,
>> + boolean isCloneByteBuffer) throws IOException;
>> +}
>> Index: async/AsyncQueueWriter.java
>> --- async/AsyncQueueWriter.java Locally New
>> +++ async/AsyncQueueWriter.java Locally New
>> @@ -0,0 +1,247 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import java.io.IOException;
>> +import java.net.SocketAddress;
>> +import java.nio.ByteBuffer;
>> +import java.nio.channels.SelectableChannel;
>> +import java.nio.channels.SelectionKey;
>> +
>> +/**
>> + * Common inteface to be implemented by protocol dependant
>> asynchronous queue
>> + * writers implementations
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public interface AsyncQueueWriter {
>> + /**
>> + * Method writes <code>ByteBuffer</code> to the
>> <code>SelectableChannel</code>
>> + * First, if <code>SelectableChannel</code> associated write
>> queue is empty -
>> + * it tries to write <code>ByteBuffer</code> to the given
>> + * <code>SelectableChannel</code> directly (without putting to
>> the queue).
>> + * If associated write queue is not empty or after direct writing
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>
>> + * and <code>SelectableChannel<code> will be registered on
>> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>> + * to the caller directly, otherwise it will be just logged by
>> + * Grizzly framework.
>> + *
>> + * @param key <code>SelectionKey</code> associated with
>> + * <code>SelectableChannel</code>
>> <code>ByteBuffer</code>
>> + * should be written to
>> + * @param buffer <code>ByteBuffer</code>
>> + * @return true, if <code>ByteBuffer</code> was written
>> completely, false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer) throws
>> IOException;
>> +
>> + /**
>> + * Method writes <code>ByteBuffer</code> to the
>> <code>SelectableChannel</code>
>> + * First, if <code>SelectableChannel</code> associated write
>> queue is empty -
>> + * it tries to write <code>ByteBuffer</code> to the given
>> + * <code>SelectableChannel</code> directly (without putting to
>> the queue).
>> + * If associated write queue is not empty or after direct writing
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>
>> + * and <code>SelectableChannel<code> will be registered on
>> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>>
>> Typo propagated
>>
>> + * to the caller directly, otherwise, if the
>> <code>ByteBuffer</code> is
>> + * added to a writing queue - exception notification will come via
>> + * <code>AsyncWriteCallbackHandler</code>
>>
>> add the AsyncWriteCallbackHandler.onIOException()
>>
>> + *
>> + * @param key <code>SelectionKey</code> associated with
>> + * <code>SelectableChannel</code>
>> <code>ByteBuffer</code>
>> + * should be written to
>> + * @param buffer <code>ByteBuffer</code>
>> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>> + * which will get notified, when
>> + * <code>ByteBuffer</code> will be
>> completely written
>> + * @return true, if <code>ByteBuffer</code> was written completely,
>> + * false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException;
>> +
>> + /**
>> + * Method writes <code>ByteBuffer</code> to the
>> <code>SelectableChannel</code>
>> + * First, if <code>SelectableChannel</code> associated write
>> queue is empty -
>> + * it tries to write <code>ByteBuffer</code> to the given
>> + * <code>SelectableChannel</code> directly (without putting to
>> the queue).
>> + * If associated write queue is not empty or after direct writing
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>
>> + * and <code>SelectableChannel<code> will be registered on
>> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>> + * to the caller directly, otherwise, if the
>> <code>ByteBuffer</code> is
>> + * added to a writing queue - exception notification will come via
>> + * <code>AsyncWriteCallbackHandler</code>
>> + *
>> + * @param key <code>SelectionKey</code> associated with
>> + * <code>SelectableChannel</code>
>> <code>ByteBuffer</code>
>> + * should be written to
>> + * @param buffer <code>ByteBuffer</code>
>> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>> + * which will get notified, when
>> + * <code>ByteBuffer</code> will be
>> completely written
>> + * @param isCloneByteBuffer if true -
>> <code>AsyncQueueWriter</code> will
>> + * clone given
>> + * <code>ByteBuffer</code> before
>> puting it to the
>> + * <code>AsyncWriteQueue</code>
>> + * @return true, if <code>ByteBuffer</code> was written completely,
>> + * false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler, boolean
>> isCloneByteBuffer)
>> + throws IOException;
>> +
>> + /**
>> + * Method sends <code>ByteBuffer</code> to the
>> <code>SocketAddress</code>
>> + * First, if <code>SelectableChannel</code> associated write
>> queue is empty -
>> + * it tries to write <code>ByteBuffer</code> to the given
>> + * <code>SocketAddress</code> directly (without putting to the
>> queue).
>> + * If associated write queue is not empty or after direct writing
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>
>> + * and <code>SelectableChannel<code> will be registered on
>> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>> + * to the caller directly, otherwise it will be just logged by
>> + * Grizzly framework.
>> + *
>> + * @param key <code>SelectionKey</code> associated with
>> + * <code>SelectableChannel</code>, which will be used to
>> + * send<code>ByteBuffer</code> to
>> + * @param dstAddress destination address <code>ByteBuffer</code>
>> will be sent to
>> + * @param buffer <code>ByteBuffer</code>
>> + * @return true, if <code>ByteBuffer</code> was written
>> completely, false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> + ByteBuffer buffer) throws IOException;
>> +
>> + /**
>> + * Method sends <code>ByteBuffer</code> to the
>> <code>SocketAddress</code>
>> + * First, if <code>SelectableChannel</code> associated write
>> queue is empty -
>> + * it tries to write <code>ByteBuffer</code> to the given
>> + * <code>SocketAddress</code> directly (without putting to the
>> queue).
>> + * If associated write queue is not empty or after direct writing
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>
>> + * and <code>SelectableChannel<code> will be registered on
>> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>> + * to the caller directly, otherwise, if the
>> <code>ByteBuffer</code> is
>> + * added to a writing queue - exception notification will come via
>> + * <code>AsyncWriteCallbackHandler</code>
>> + *
>> + * @param key <code>SelectionKey</code> associated with
>> + * <code>SelectableChannel</code>
>> <code>ByteBuffer</code>
>> + * should be written to
>> + * @param dstAddress destination address <code>ByteBuffer</code>
>> will be sent to
>> + * @param buffer <code>ByteBuffer</code>
>> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>> + * which will get notified, when
>> + * <code>ByteBuffer</code> will be
>> completely written
>> + * @return true, if <code>ByteBuffer</code> was written completely,
>> + * false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> + ByteBuffer buffer, AsyncWriteCallbackHandler
>> callbackHandler)
>> + throws IOException;
>> +
>> + /**
>> + * Method sends <code>ByteBuffer</code> to the
>> <code>SocketAddress</code>
>> + * First, if <code>SelectableChannel</code> associated write
>> queue is empty -
>> + * it tries to write <code>ByteBuffer</code> to the given
>> + * <code>SocketAddress</code> directly (without putting to the
>> queue).
>> + * If associated write queue is not empty or after direct writing
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>
>> + * and <code>SelectableChannel<code> will be registered on
>> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>> + * to the caller directly, otherwise, if the
>> <code>ByteBuffer</code> is
>> + * added to a writing queue - exception notification will come via
>> + * <code>AsyncWriteCallbackHandler</code>
>> + *
>> + * @param key <code>SelectionKey</code> associated with
>> + * <code>SelectableChannel</code>
>> <code>ByteBuffer</code>
>> + * should be written to
>> + * @param dstAddress destination address <code>ByteBuffer</code>
>> will be sent to
>> + * @param buffer <code>ByteBuffer</code>
>> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>> + * which will get notified, when
>> + * <code>ByteBuffer</code> will be
>> completely written
>> + * @param isCloneByteBuffer if true -
>> <code>AsyncQueueWriter</code> will
>> + * clone given
>> + * <code>ByteBuffer</code> before
>> puting it to the
>> + * <code>AsyncWriteQueue</code>
>> + * @return true, if <code>ByteBuffer</code> was written completely,
>> + * false if write operation was put to queue
>> + * @throws java.io.IOException
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> + ByteBuffer buffer, AsyncWriteCallbackHandler
>> callbackHandler,
>> + boolean isCloneByteBuffer) throws IOException;
>> +
>> + /**
>> + * Checks whether there is any data in
>> <code>AsyncWriteQueue</code> ready
>> + * for the given <code>SelectableChannel</code>
>> + *
>> + * @param key <code>SelectionKey</code> associated with
>> <code>SelectableChannel</code>
>> + * @return true, if there is ready data. False otherwise.
>> + */
>> + public boolean hasReadyAsyncWriteData(SelectionKey key);
>> +
>> + /**
>> + * Callback method, which should be called by
>> <code>SelectorHandler</code> to
>> + * notify, that given <code>SelectableChannel</code> is ready to
>> transmit data.
>> + *
>> + * @param key <code>SelectionKey</code> associated with
>> <code>SelectableChannel</code>
>> + * @throws java.io.IOException
>> + */
>> + public void onWrite(SelectionKey key) throws IOException;
>> +
>> + /**
>> + * Callback method, which should be called by
>> <code>SelectorHandler</code> to
>> + * notify, that given <code>SelectableChannel</code> is going to
>> be closed, so
>> + * related <code>SelectableChannel</code> data could be released
>> from
>> + * <code>AsyncWriteQueue</code>
>> + *
>> + * @param <code>SelectableChannel</code>
>> + * @throws java.io.IOException
>> + */
>> + public void onClose(SelectableChannel channel);
>> +
>> + public void close();
>> +
>> +}
>> Index: async/AsyncQueueWriterContextTask.java
>> --- async/AsyncQueueWriterContextTask.java Locally New
>> +++ async/AsyncQueueWriterContextTask.java Locally New
>> @@ -0,0 +1,78 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import com.sun.grizzly.*;
>>
>> Fix the '*' by listing the classes instead (Netbeans Netbeans
>> Netbeans ;-))
>>
>> +
>> +/**
>> + * <code>AsyncQueueWriter</code> task, which will be executed by
>> + * <code>Context</code>, when Context.execute(<code>ContextTask</code>)
>> + * is called.
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public class AsyncQueueWriterContextTask extends ContextTask {
>> + private static final TaskPool<AsyncQueueWriterContextTask>
>> taskPool =
>> + new TaskPool<AsyncQueueWriterContextTask>() {
>> + @Override
>> + public AsyncQueueWriterContextTask newTask() {
>> + return new AsyncQueueWriterContextTask();
>> + }
>> + };
>> +
>> + private AsyncQueueWriter asyncQueueWriter;
>> +
>> + public static AsyncQueueWriterContextTask poll() {
>> + return taskPool.poll();
>> + }
>> +
>> + public static void offer(AsyncQueueWriterContextTask contextTask) {
>> + contextTask.recycle();
>> + taskPool.offer(contextTask);
>> + }
>> +
>> + public Object call() throws Exception {
>> + try {
>> + asyncQueueWriter.onWrite(context.getSelectionKey());
>>
>> Can you make sure the getSelectionKey() is not null, and return an
>> IllegalStateException in case the Key is null? Just to make the code
>> robust, event if the key cannot be null.
>>
>>
>> + } finally {
>> + offer(this);
>> + }
>> +
>> + return null;
>> + }
>> +
>> + public AsyncQueueWriter getAsyncQueueWriter() {
>> + return asyncQueueWriter;
>> + }
>> +
>> + public void setAsyncQueueWriter(AsyncQueueWriter asyncWriter) {
>> + this.asyncQueueWriter = asyncWriter;
>> + }
>> +
>> + @Override
>> + public void recycle() {
>> + asyncQueueWriter = null;
>> + super.recycle();
>> + }
>> +}
>> Index: async/AsyncWriteCallbackHandler.java
>> --- async/AsyncWriteCallbackHandler.java Locally New
>> +++ async/AsyncWriteCallbackHandler.java Locally New
>> @@ -0,0 +1,38 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import java.nio.ByteBuffer;
>> +import java.nio.channels.SelectionKey;
>> +
>> +/**
>> + * Callback handler interface, used by <code>AsyncQueueWriter</code>
>> to notify
>> + * custom code about completion of specific <code>ByteBuffer</code>
>> writing.
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public interface AsyncWriteCallbackHandler {
>> + public void onWriteCompleted(SelectionKey key, ByteBuffer buffer);
>> + public void onIOException(SelectionKey key, ByteBuffer buffer);
>> +}
>>
>> The API looks good, but I would add more docs here as this interface
>> will be implemented by the user of Grizzly, and since we are lacking
>> in term of tutorial, better to have good docs :-)
>>
>>
>> Index: async/AsyncWriteQueue.java
>> --- async/AsyncWriteQueue.java Locally New
>> +++ async/AsyncWriteQueue.java Locally New
>> @@ -0,0 +1,144 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import java.nio.channels.SelectableChannel;
>> +import java.util.Map;
>> +import java.util.concurrent.ConcurrentHashMap;
>> +import java.util.concurrent.ConcurrentLinkedQueue;
>> +import java.util.concurrent.locks.ReentrantLock;
>> +
>> +/**
>> + * Class represents collection of
>> + * <code>SelectableChannel</code>s and correspondent queue of
>> + * <code>ByteBuffer</code>, which should be written asynchronously.
>> + * This implementation is TCP protocol specific.
>>
>> To follow our convention, should this class be called
>> TCPAsyncWriteQueue?
>>
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public class AsyncWriteQueue<E>{
>> + private Map<SelectableChannel, ChannelAsyncWriteEntry> queueMap =
>> + new ConcurrentHashMap<SelectableChannel,
>> ChannelAsyncWriteEntry>();
>> +
>> + /**
>> + * Add data to the <code>AsyncWriteQueue</code>, corresponding
>> to the given
>> + * <code>SelectableChannel</code>
>> + *
>> + * @param channel <code>SelectableChannel</code>
>> + * @param queueRecord write data unit
>> + */
>> + public void offer(SelectableChannel channel, E queueRecord) {
>> + ChannelAsyncWriteEntry entry =
>> obtainChannelAsyncWriteEntry(channel);
>> + entry.queue.offer(queueRecord);
>> + }
>> +
>> + /**
>> + * Get head element of <code>SelectableChannel</code> write queue.
>> + * Element will not be removed from queue.
>> + *
>> + * @param channel <code>SelectableChannel</code>
>> + *
>> + * @return <code>AsyncQueueRecord</code> write data unit
>> + */
>> + public E peek(SelectableChannel channel) {
>> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
>> + if (entry != null) {
>> + return entry.queue.peek();
>> + }
>> +
>> + return null;
>> + }
>> +
>> + /**
>> + * Get head element of <code>SelectableChannel</code> write queue.
>> + * Element will be removed from queue.
>> + *
>> + * @param channel <code>SelectableChannel</code>
>> + *
>> + * @return <code>AsyncQueueRecord</code> write data unit
>> + */
>> + public E poll(SelectableChannel channel) {
>> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
>> + if (entry != null) {
>> + return entry.queue.poll();
>> + }
>> +
>> + return null;
>> + }
>> +
>> + /**
>> + * Remove head element of <code>SelectableChannel</code> write
>> queue.
>> + *
>> + * @param channel <code>SelectableChannel</code>
>> + */
>> + public void removeEntry(SelectableChannel channel) {
>> + queueMap.remove(channel);
>> + }
>> +
>> + /**
>> + * Get the size of <code>SelectableChannel</code> write queue.
>> + *
>> + * @param channel <code>SelectableChannel</code>
>> + * @return size of <code>SelectableChannel</code> write queue.
>> + */
>> + public int size(SelectableChannel channel) {
>> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
>> + return entry == null ? 0 : entry.queue.size();
>> + }
>> +
>> + public void clear() {
>> + queueMap.clear();
>> + }
>> +
>> + protected ChannelAsyncWriteEntry
>> obtainChannelAsyncWriteEntry(SelectableChannel channel) {
>> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
>> + if (entry == null) {
>> + synchronized(channel) {
>>
>> Minor: A ReentrantReadWriteLock might perform better than a Sync here
>>
>> + entry = queueMap.get(channel);
>> + if (entry == null) {
>> + entry = new ChannelAsyncWriteEntry();
>> + queueMap.put(channel, entry);
>> + }
>> + }
>> + }
>> + return entry;
>> + }
>> +
>> + /**
>> + * <code>AsyncWriteQueue</code> data unit
>> + */
>> + protected class ChannelAsyncWriteEntry {
>> + public ConcurrentLinkedQueue<E> queue;
>> + public ReentrantLock writeLock;
>> + public ReentrantLock onWriteLock;
>> + public ReentrantLock updateLock;
>> +
>> + public ChannelAsyncWriteEntry() {
>> + queue = new ConcurrentLinkedQueue<E>();
>> + writeLock = new ReentrantLock();
>> + onWriteLock = new ReentrantLock();
>> + updateLock = new ReentrantLock();
>> + }
>> + }
>> +}
>> \ No newline at end of file
>> Index: async/TCPAsyncQueueWriter.java
>> --- async/TCPAsyncQueueWriter.java Locally New
>> +++ async/TCPAsyncQueueWriter.java Locally New
>> @@ -0,0 +1,255 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import com.sun.grizzly.*;
>> +import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
>> +import java.io.IOException;
>> +import java.net.SocketAddress;
>> +import java.nio.ByteBuffer;
>> +import java.nio.channels.SelectableChannel;
>> +import java.nio.channels.SelectionKey;
>> +import java.nio.channels.WritableByteChannel;
>> +import java.util.concurrent.ConcurrentLinkedQueue;
>> +
>> +/**
>> + * TCP implementation of <code>AsyncQueueWriter</code>
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public class TCPAsyncQueueWriter implements AsyncQueueWriter {
>> + private SelectorHandler selectorHandler;
>> + private AsyncWriteQueue<TCPAsyncQueueRecord> writeQueue;
>> +
>> + public TCPAsyncQueueWriter(SelectorHandler selectorHandler) {
>> + this.selectorHandler = selectorHandler;
>> + writeQueue = new AsyncWriteQueue<TCPAsyncQueueRecord>();
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer) throws
>> IOException {
>> + return write(key, buffer, null);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + return write(key, buffer, callbackHandler, false);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler, boolean
>> isCloneByteBuffer) throws IOException {
>> +
>> + SelectableChannel channel = key.channel();
>> + // If AsyncWriteQueue is empty - try to write ByteBuffer here
>> + ChannelAsyncWriteEntry channelEntry =
>> + writeQueue.obtainChannelAsyncWriteEntry(channel);
>> +
>> + ConcurrentLinkedQueue<TCPAsyncQueueRecord> queue =
>> channelEntry.queue;
>> +
>> + boolean hasWriteLock = false;
>> +
>> + try {
>> + if (queue.isEmpty() &&
>> + (hasWriteLock =
>> channelEntry.writeLock.tryLock())) {
>> + ((WritableByteChannel) channel).write(buffer);
>> + }
>> +
>> + if (buffer.hasRemaining()) {
>> + // clone ByteBuffer if required
>> + if (isCloneByteBuffer) {
>> + int size = buffer.remaining();
>> + ByteBuffer newBuffer = buffer.isDirect() ?
>> + ByteBuffer.allocateDirect(size) :
>> + ByteBuffer.allocate(size);
>>
>> Here we need to support VIEW and HEAP_ARRAY like the remaining of the
>> framework allow. We don't need it for the first release, but might be
>> good to file an RFE. I would think an application that use
>> ByteBuffer.wrap() would like to use the same type of bb when writing.
>>
>> +
>> + newBuffer.put(buffer);
>> + newBuffer.position(0);
>> + buffer = newBuffer;
>> + }
>> +
>> + // add new element to the queue
>> + channelEntry.updateLock.lock();
>> + queue.offer(new TCPAsyncQueueRecord(buffer,
>> + callbackHandler));
>>
>> Here should we try to cache TCPAsyncQueueRecord instead of calling
>> 'new' for every write? I would think yes :-)
>>
>> +
>> + /*
>> + * This check helps to avoid not required key
>> registering
>> + */
>> + if (!channelEntry.onWriteLock.isLocked()) {
>> + registerForWriting(key);
>> + }
>> +
>> + channelEntry.updateLock.unlock();
>> +
>> + return false;
>> + }
>> + } finally {
>> + if (hasWriteLock) {
>> + channelEntry.writeLock.unlock();
>> + }
>> + }
>> +
>> + return true;
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean hasReadyAsyncWriteData(SelectionKey key) {
>> + return writeQueue.size(key.channel()) > 0;
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public void onWrite(SelectionKey key) throws IOException {
>> + SelectableChannel channel = key.channel();
>> +
>> + ChannelAsyncWriteEntry channelEntry =
>> + writeQueue.obtainChannelAsyncWriteEntry(channel);
>> +
>> + boolean hasUpdateLock = false;
>> + boolean hasOnWriteLock = false;
>> +
>> + if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
>> + try {
>> + ConcurrentLinkedQueue<TCPAsyncQueueRecord> queue =
>> channelEntry.queue;
>> +
>> + while (queue.size() > 0) {
>> + if (hasUpdateLock) {
>> + channelEntry.updateLock.unlock();
>> + hasUpdateLock = false;
>> + }
>> +
>> + TCPAsyncQueueRecord queueRecord = queue.peek();
>> +
>> + ByteBuffer byteBuffer = queueRecord.byteBuffer;
>> + try {
>> + ((WritableByteChannel)
>> channel).write(byteBuffer);
>> + } catch (IOException e) {
>> + if (queueRecord.callbackHandler != null) {
>> +
>> queueRecord.callbackHandler.onIOException(key,
>> + byteBuffer);
>> + }
>>
>> else log the exception.
>>
>> + }
>> +
>> + if (!byteBuffer.hasRemaining()) {
>> + TCPAsyncQueueRecord removedQueueRecord =
>> + queue.poll();
>> + assert removedQueueRecord == queueRecord;
>> +
>> + if (removedQueueRecord.callbackHandler !=
>> null) {
>> + removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer);
>> + }
>> + } else {
>> + hasOnWriteLock = false;
>> + channelEntry.onWriteLock.unlock();
>> + registerForWriting(key);
>> + break;
>> + }
>> +
>> + if (queue.size() == 0) {
>> + /*
>> + * If queue is empty - there is possibility
>> that write method
>> + * will add entry, but it will not be
>> processed, because
>> + * onWriteLock is currently locked
>> + */
>> + channelEntry.updateLock.lock();
>> + hasUpdateLock = true;
>> + }
>> + }
>> + } finally {
>> + if (hasOnWriteLock) {
>> + channelEntry.onWriteLock.unlock();
>> + }
>> +
>> + if (hasUpdateLock) {
>> + channelEntry.updateLock.unlock();
>> + }
>> + }
>> + }
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> ByteBuffer buffer) throws IOException {
>> + throw new UnsupportedOperationException("Not supported for
>> TCP transport.");
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + throw new UnsupportedOperationException("Not supported for
>> TCP transport.");
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, boolean
>> isCloneByteBuffer) throws IOException {
>> + throw new UnsupportedOperationException("Not supported for
>> TCP transport.");
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public void onClose(SelectableChannel channel) {
>> + writeQueue.removeEntry(channel);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public void close() {
>> + writeQueue.clear();
>> + writeQueue = null;
>> + }
>> +
>> + private void registerForWriting(SelectionKey key) {
>> + selectorHandler.register(key, SelectionKey.OP_WRITE);
>> + }
>> +
>> + /**
>> + * <code>AsyncWriteQueue</code> data unit specific for TCP protocol
>> + */
>> + protected static class TCPAsyncQueueRecord {
>> + public ByteBuffer byteBuffer;
>> + public AsyncWriteCallbackHandler callbackHandler;
>> +
>> + public TCPAsyncQueueRecord(ByteBuffer byteBuffer,
>> + AsyncWriteCallbackHandler callbackHandler) {
>> + this.byteBuffer = byteBuffer;
>> + this.callbackHandler = callbackHandler;
>> + }
>> + }
>> +}
>> Index: async/UDPAsyncQueueWriter.java
>> --- async/UDPAsyncQueueWriter.java Locally New
>> +++ async/UDPAsyncQueueWriter.java Locally New
>> @@ -0,0 +1,271 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import com.sun.grizzly.*;
>> +import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
>> +import java.io.IOException;
>> +import java.nio.ByteBuffer;
>> +import java.nio.channels.SelectableChannel;
>> +import java.nio.channels.SelectionKey;
>> +import java.nio.channels.WritableByteChannel;
>> +import java.util.concurrent.ConcurrentLinkedQueue;
>> +import java.net.SocketAddress;
>> +import java.nio.channels.DatagramChannel;
>> +
>> +/**
>> + * UDP implementation of <code>AsyncQueueWriter</code>
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public class UDPAsyncQueueWriter implements AsyncQueueWriter {
>> + private SelectorHandler selectorHandler;
>> + private AsyncWriteQueue<UDPAsyncQueueRecord> writeQueue;
>> +
>> + public UDPAsyncQueueWriter(SelectorHandler selectorHandler) {
>> + this.selectorHandler = selectorHandler;
>> + writeQueue = new AsyncWriteQueue<UDPAsyncQueueRecord>();
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer) throws
>> IOException {
>> + return write(key, null, buffer, null);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + return write(key, null, buffer, callbackHandler, false);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler, boolean
>> isCloneByteBuffer) throws IOException {
>> + return write(key, null, buffer, callbackHandler,
>> isCloneByteBuffer);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> + ByteBuffer buffer) throws IOException {
>> + return write(key, dstAddress, buffer, null);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> + ByteBuffer buffer, AsyncWriteCallbackHandler
>> callbackHandler)
>> + throws IOException {
>> + return write(key, dstAddress, buffer, callbackHandler, false);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> + ByteBuffer buffer, AsyncWriteCallbackHandler
>> callbackHandler,
>> + boolean isCloneByteBuffer) throws IOException {
>> +
>> + SelectableChannel channel = key.channel();
>> + // If AsyncWriteQueue is empty - try to write ByteBuffer here
>> + ChannelAsyncWriteEntry channelEntry =
>> + writeQueue.obtainChannelAsyncWriteEntry(channel);
>> +
>> + ConcurrentLinkedQueue<UDPAsyncQueueRecord> queue =
>> channelEntry.queue;
>> +
>> + boolean hasWriteLock = false;
>> +
>> + try {
>> + if (queue.isEmpty() &&
>> + (hasWriteLock =
>> channelEntry.writeLock.tryLock())) {
>> + doWrite((WritableByteChannel) channel, dstAddress,
>> buffer);
>> + }
>> +
>> + if (buffer.hasRemaining()) {
>> + // clone ByteBuffer if required
>> + if (isCloneByteBuffer) {
>> + int size = buffer.remaining();
>> + ByteBuffer newBuffer = buffer.isDirect() ?
>> + ByteBuffer.allocateDirect(size) :
>> + ByteBuffer.allocate(size);
>> +
>> + newBuffer.put(buffer);
>> + newBuffer.position(0);
>> + buffer = newBuffer;
>> + }
>>
>> Same as above. Support HEAP and VIEW_HEAP/DIRECT via ByteBufferFactory
>>
>> +
>> + // add new element to the queue
>> + channelEntry.updateLock.lock();
>> + queue.offer(new UDPAsyncQueueRecord(dstAddress, buffer,
>> + callbackHandler));
>> +
>> + /*
>> + * This check helps to avoid not required key
>> registering
>> + */
>> + if (!channelEntry.onWriteLock.isLocked()) {
>> + registerForWriting(key);
>> + }
>> +
>> + channelEntry.updateLock.unlock();
>> +
>> + return false;
>> + }
>> + } finally {
>> + if (hasWriteLock) {
>> + channelEntry.writeLock.unlock();
>> + }
>> + }
>> +
>> + return true;
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean hasReadyAsyncWriteData(SelectionKey key) {
>> + return writeQueue.size(key.channel()) > 0;
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public void onWrite(SelectionKey key) throws IOException {
>> + SelectableChannel channel = key.channel();
>> +
>> + ChannelAsyncWriteEntry channelEntry =
>> + writeQueue.obtainChannelAsyncWriteEntry(channel);
>> +
>> + boolean hasUpdateLock = false;
>> + boolean hasOnWriteLock = false;
>> +
>> + if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
>> + try {
>> + ConcurrentLinkedQueue<UDPAsyncQueueRecord> queue =
>> + channelEntry.queue;
>> +
>> + while (queue.size() > 0) {
>> + if (hasUpdateLock) {
>> + channelEntry.updateLock.unlock();
>> + hasUpdateLock = false;
>> + }
>> +
>> + UDPAsyncQueueRecord queueRecord = queue.peek();
>> +
>> + ByteBuffer byteBuffer = queueRecord.byteBuffer;
>> + try {
>> + doWrite((WritableByteChannel) channel,
>> + queueRecord.dstAddress, byteBuffer);
>> + } catch (IOException e) {
>> + if (queueRecord.callbackHandler != null) {
>> +
>> queueRecord.callbackHandler.onIOException(key,
>> + byteBuffer);
>> + }
>> + }
>>
>> else log the exception.
>>
>> +
>> + if (!byteBuffer.hasRemaining()) {
>> + UDPAsyncQueueRecord removedQueueRecord =
>> queue.poll();
>> + assert removedQueueRecord == queueRecord;
>> +
>> + if (removedQueueRecord.callbackHandler !=
>> null) {
>> + removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer);
>> + }
>> + } else {
>> + hasOnWriteLock = false;
>> + channelEntry.onWriteLock.unlock();
>> + registerForWriting(key);
>> + break;
>> + }
>> +
>> + if (queue.size() == 0) {
>> + /*
>> + * If queue is empty - there is possibility
>> that write method
>> + * will add entry, but it will not be
>> processed, because
>> + * onWriteLock is currently locked
>> + */
>> + channelEntry.updateLock.lock();
>> + hasUpdateLock = true;
>> + }
>> + }
>> + } finally {
>> + if (hasOnWriteLock) {
>> + channelEntry.onWriteLock.unlock();
>> + }
>> +
>> + if (hasUpdateLock) {
>> + channelEntry.updateLock.unlock();
>> + }
>> + }
>> + }
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public void onClose(SelectableChannel channel) {
>> + writeQueue.removeEntry(channel);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public void close() {
>> + writeQueue.clear();
>> + writeQueue = null;
>> + }
>> +
>> + private void doWrite(WritableByteChannel channel, SocketAddress
>> dstAddress,
>> + ByteBuffer byteBuffer) throws IOException {
>> + if (dstAddress != null) {
>> + ((DatagramChannel) channel).send(byteBuffer, dstAddress);
>> + } else {
>> + channel.write(byteBuffer);
>> + }
>> + }
>> +
>> + private void registerForWriting(SelectionKey key) {
>> + selectorHandler.register(key, SelectionKey.OP_WRITE);
>> + }
>> +
>> + /**
>> + * <code>AsyncWriteQueue</code> data unit specific for TCP protocol
>> + */
>> + protected static class UDPAsyncQueueRecord extends
>> + TCPAsyncQueueWriter.TCPAsyncQueueRecord {
>> + public SocketAddress dstAddress;
>> +
>> + public UDPAsyncQueueRecord(SocketAddress dstAddress,
>> ByteBuffer byteBuffer,
>> + AsyncWriteCallbackHandler callbackHandler) {
>> + super(byteBuffer, callbackHandler);
>> + this.dstAddress = dstAddress;
>> + }
>> + }
>> +}
>> Index: CallbackHandlerContextTask.java
>> --- CallbackHandlerContextTask.java Locally New
>> +++ CallbackHandlerContextTask.java Locally New
>> @@ -0,0 +1,98 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly;
>> +
>> +import com.sun.grizzly.Context.OpType;
>> +import com.sun.grizzly.ContextTask.TaskPool;
>> +
>> +/**
>> + * <code>CallbackHandler</code> task, which will be executed by
>> + * <code>Context</code>, when Context.execute(<code>ContextTask</code>)
>> + * is called.
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public class CallbackHandlerContextTask extends ContextTask {
>> + private static final TaskPool<CallbackHandlerContextTask>
>> taskPool =
>> + new TaskPool<CallbackHandlerContextTask>() {
>> + @Override
>> + public CallbackHandlerContextTask newTask() {
>> + return new CallbackHandlerContextTask();
>> + }
>> + };
>> +
>> + private CallbackHandler callBackHandler;
>> +
>> + public static CallbackHandlerContextTask poll() {
>> + return taskPool.poll();
>> + }
>> +
>> + public static void offer(CallbackHandlerContextTask contextTask) {
>> + contextTask.recycle();
>> + taskPool.offer(contextTask);
>> + }
>> +
>> + public Object call() throws Exception {
>> + IOEvent ioEvent = context.getIOEvent();
>> + OpType currentOpType = context.getCurrentOpType();
>> +
>> + try {
>> + if (currentOpType == OpType.OP_READ) {
>> + callBackHandler.onRead(ioEvent);
>> + } else if (currentOpType == OpType.OP_WRITE) {
>> + callBackHandler.onWrite(ioEvent);
>> + } else if (currentOpType == OpType.OP_CONNECT) {
>> + callBackHandler.onConnect(ioEvent);
>> + }
>> + } finally {
>> + if (ioEvent != null) {
>> + // Prevent the CallbackHandler to re-use the context.
>> + // TODO: This is still dangerous as the Context
>> might have been
>> + // cached by the CallbackHandler.
>> + ioEvent.attach(null);
>> + ioEvent = null;
>> + }
>> +
>> + offer(this);
>> + }
>> +
>> + return null;
>> + }
>> +
>> + public CallbackHandler getCallBackHandler() {
>> + return callBackHandler;
>> + }
>> +
>> + public void setCallBackHandler(CallbackHandler callBackHandler) {
>> + this.callBackHandler = callBackHandler;
>> + }
>> +
>> + @Override
>> + public void recycle() {
>> + callBackHandler = null;
>> + super.recycle();
>> + }
>> +
>> +
>> +}
>> Index: Context.java
>> --- Context.java Base (BASE)
>> +++ Context.java Locally Modified (Based On LOCAL)
>> @@ -23,9 +23,14 @@
>>
>> package com.sun.grizzly;
>>
>> +import com.sun.grizzly.async.AsyncQueueWritable;
>> +import com.sun.grizzly.async.AsyncQueueWriter;
>> +import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>> +import java.io.IOException;
>> +import java.net.SocketAddress;
>> +import java.nio.ByteBuffer;
>> import java.nio.channels.SelectionKey;
>> import java.util.HashMap;
>> -import java.util.concurrent.Callable;
>>
>> /**
>> * This Object is used to share information between the Grizzly
>> Framework
>> @@ -33,7 +38,7 @@
>> *
>> * @author Jeanfrancois Arcand
>> */
>> -public class Context implements Callable {
>> +public class Context {
>>
>> /**
>> * A SelectionKey's registration state.
>> @@ -119,6 +124,16 @@
>>
>>
>> /**
>> + * <code>AsyncQueueWriter</code>
>> + */
>> + private AsyncQueueWriter asyncQueueWriter;
>> +
>> + /**
>> + * <code>AsyncQueueWritable</code>
>> + */
>> + private AsyncQueueWritable asyncQueueWritable;
>> +
>> + /**
>> * Constructor
>> */
>> public Context() {
>> @@ -210,6 +225,7 @@
>> currentOpType = null;
>> protocolChain = null;
>> ioEvent = null;
>> + asyncQueueWriter = null;
>> }
>>
>>
>> @@ -233,47 +249,6 @@
>>
>>
>> /**
>> - * Execute the <code>ProtocolChain</code>.
>> - * @throws java.lang.Exception Exception thrown by protocol chain
>> - */
>> - public Object call() throws Exception {
>> - // If a IOEvent has been defined, invoke it first and
>> - // let its associated CallbackHandler decide if the
>> ProtocolChain
>> - // be invoked or not.
>> - Object attachment = key.attachment();
>> - if (ioEvent != null && (attachment instanceof
>> CallbackHandler)){
>> - try{
>> - CallbackHandler callBackHandler =
>> ((CallbackHandler)attachment);
>> - if (currentOpType == OpType.OP_READ){
>> - callBackHandler.onRead(ioEvent);
>> - } else if (currentOpType == OpType.OP_WRITE){
>> - callBackHandler.onWrite(ioEvent);
>> - } else if (currentOpType == OpType.OP_CONNECT){
>> - callBackHandler.onConnect(ioEvent);
>> - }
>> - } finally {
>> - if (ioEvent != null){
>> - // Prevent the CallbackHandler to re-use the
>> context.
>> - // TODO: This is still dangerous as the Context
>> might have been
>> - // cached by the CallbackHandler.
>> - ioEvent.attach(null);
>> - ioEvent = null;
>> - }
>> - }
>> - } else {
>> - SelectionKey currentKey = key;
>> -
>> selectorHandler.getSelectionKeyHandler().process(currentKey);
>> - try {
>> - protocolChain.execute(this);
>> - } finally {
>> - selectorHandler.getSelectionKeyHandler().postProcess(currentKey);
>> - }
>> - }
>> - return null;
>> - }
>> -
>> -
>> - /**
>> * Return <code>ProtocolChain</code> executed by this instance.
>> * @return <code>ProtocolChain</code> instance
>> */
>> @@ -313,9 +288,12 @@
>> * Execute this Context using the Controller's Pipeline
>> * @throws com.sun.grizzly.PipelineFullException
>> */
>> - public void execute() throws PipelineFullException{
>>
>>
>> Here we must keep the execute() method available as it will breaks
>> existing application (like Sailfin).
>>
>> - getPipeline().execute(this);
>> + public void execute(ContextTask contextTask) throws
>> PipelineFullException {
>> + if (contextTask != null) {
>> + contextTask.setContext(this);
>> + getPipeline().execute(contextTask);
>> }
>>
>> else throw IllegalStateException("ContextTask cannot be null");
>>
>> + }
>>
>>
>> /**
>> @@ -392,4 +370,89 @@
>> public void setSelectorHandler(SelectorHandler selectorHandler) {
>> this.selectorHandler = selectorHandler;
>> }
>> +
>> +
>> + /**
>> + * Returns <code>AsyncQueueWritable</code>, assciated with the
>> current
>> + * <code>Context</code>. This method is not threadsafe.
>> + *
>> + * @return <code>AsyncQueueWritable</code>
>> + */
>> + public AsyncQueueWritable getAsyncQueueWritable() {
>> + if (asyncQueueWritable == null) {
>> + asyncQueueWritable = new
>> AsyncQueueWritableContextWrapper();
>> }
>> +
>> + return asyncQueueWritable;
>> + }
>> +
>> + /**
>> + * Return the <code>AsyncQueueWriter</code>
>> + * @return the <code>AsyncQueueWriter</code>
>> + */
>> + protected AsyncQueueWriter getAsyncQueueWriter() {
>> + return asyncQueueWriter;
>> + }
>> +
>> + /**
>> + * Set the <code>AsyncQueueWriter</code>
>> + * @param asyncQueueWriter <code>AsyncQueueWriter</code>
>> + */
>> + protected void setAsyncQueueWriter(AsyncQueueWriter
>> asyncQueueWriter) {
>> + this.asyncQueueWriter = asyncQueueWriter;
>> + }
>> +
>> + private class AsyncQueueWritableContextWrapper implements
>> AsyncQueueWritable {
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer) throws
>> IOException {
>> + return asyncQueueWriter.write(key, buffer);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + return asyncQueueWriter.write(key, buffer,
>> callbackHandler);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler,
>> + boolean isCloneByteBuffer) throws IOException {
>> + return asyncQueueWriter.write(key, buffer, callbackHandler,
>> + isCloneByteBuffer);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer)
>> + throws IOException {
>> + return asyncQueueWriter.write(key, dstAddress, buffer);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + return asyncQueueWriter.write(key, dstAddress, buffer,
>> + callbackHandler);
>> + }
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler, boolean
>> isCloneByteBuffer)
>> + throws IOException {
>> + return asyncQueueWriter.write(key, dstAddress, buffer,
>> + callbackHandler, isCloneByteBuffer);
>> + }
>> + }
>> +}
>> Index: ContextTask.java
>> --- ContextTask.java Locally New
>> +++ ContextTask.java Locally New
>> @@ -0,0 +1,70 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly;
>> +
>> +import java.util.concurrent.Callable;
>> +import java.util.concurrent.ConcurrentLinkedQueue;
>> +
>> +/**
>> + * Task, which will be executed by <code>Context</code>, when
>> + * Context.execute(<code>ContextTask</code>) is called.
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public abstract class ContextTask implements Callable {
>> + protected Context context;
>> +
>> + public Context getContext() {
>> + return context;
>> + }
>> +
>> + public void setContext(Context context) {
>> + this.context = context;
>> + }
>> +
>> + public void recycle() {
>> + context = null;
>> + }
>> +
>> + protected abstract static class TaskPool<E extends ContextTask> {
>> + private ConcurrentLinkedQueue<E> pool =
>> + new ConcurrentLinkedQueue<E>();
>> +
>> + public abstract E newTask();
>> +
>> + public E poll() {
>> + E task = pool.poll();
>> + if (task == null) {
>> + task = newTask();
>> + }
>> +
>> + return task;
>> + }
>> +
>> + public void offer(E task) {
>> + task.recycle();
>> + pool.offer(task);
>> + }
>> + }
>> +}
>> Index: Controller.java
>> --- Controller.java Base (BASE)
>> +++ Controller.java Locally Modified (Based On LOCAL)
>> @@ -298,6 +298,7 @@
>> key = iterator.next();
>> iterator.remove();
>> boolean skipOpWrite = false;
>> + delegateToWorkerThread = false;
>> if (key.isValid()) {
>> if ((key.readyOps() & SelectionKey.OP_ACCEPT)
>> == SelectionKey.OP_ACCEPT){
>> @@ -356,7 +357,7 @@
>> if (logger.isLoggable(Level.FINE)) {
>> logger.log(Level.FINE, "OP_WRITE on
>> " + key);
>> }
>> - delegateToWorkerThread = selectorHandler.
>> + delegateToWorkerThread |= selectorHandler.
>> onWriteInterest(key,serverCtx);
>> }
>>
>> @@ -368,7 +369,9 @@
>> Context context =
>> pollContext(key,protocolChain);
>>
>> context.setSelectorHandler(selectorHandler);
>>
>> context.setPipeline(selectorHandler.pipeline());
>> - context.execute();
>> + context.setAsyncQueueWriter(
>> +
>> selectorHandler.getAsyncQueueWriter());
>> + context.execute(ProtocolChainContextTask.poll());
>>
>> This bring an interesting point. Instead of setting the
>> setAsyncQueueWriter on every request, should the Context instance be
>> pooled per Selector instead of the Controller? That way once
>> configured, a Context can be pooled and never have to be
>> re-configured again?
>>
>>
>>
>> }
>> } else {
>>
>> selectorHandler.getSelectionKeyHandler().cancel(key);
>> Index: filter/EchoAsyncWriteQueueFilter.java
>> --- filter/EchoAsyncWriteQueueFilter.java Locally New
>> +++ filter/EchoAsyncWriteQueueFilter.java Locally New
>> @@ -0,0 +1,81 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.filter;
>> +
>> +import com.sun.grizzly.Context;
>> +import com.sun.grizzly.Controller;
>> +import com.sun.grizzly.ProtocolFilter;
>> +import com.sun.grizzly.util.OutputWriter;
>> +import com.sun.grizzly.util.WorkerThread;
>> +import java.io.IOException;
>> +import java.net.SocketAddress;
>> +import java.nio.ByteBuffer;
>> +import java.nio.channels.DatagramChannel;
>> +import java.nio.channels.SelectableChannel;
>> +import java.util.concurrent.atomic.AtomicInteger;
>> +import java.util.logging.Level;
>> +
>> +/**
>> + * Simple echo filter
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public class EchoAsyncWriteQueueFilter implements ProtocolFilter {
>> + AtomicInteger counter = new AtomicInteger(1);
>> + public boolean execute(Context ctx) throws IOException {
>> + final WorkerThread workerThread =
>> ((WorkerThread)Thread.currentThread());
>> + ByteBuffer buffer = workerThread.getByteBuffer();
>> + buffer.flip();
>> + if (buffer.hasRemaining()) {
>> + // Depending on protocol perform echo
>> + try {
>> + if (ctx.getProtocol() == Controller.Protocol.TCP) {
>> // TCP case
>> + ctx.getAsyncQueueWritable().writeToAsyncQueue(buffer, null, true);
>> + } else if (ctx.getProtocol() ==
>> Controller.Protocol.UDP) { // UDP case
>> + SocketAddress address = (SocketAddress)
>> +
>> ctx.getAttribute(ReadFilter.UDP_SOCKETADDRESS);
>> +
>> ctx.getAsyncQueueWritable().writeToAsyncQueue(address,
>> + buffer, null, true);
>> + }
>> + } catch (IOException ex) {
>> + // Store incoming data in byte[]
>> + byte[] data = new byte[buffer.remaining()];
>> + int position = buffer.position();
>> + buffer.get(data);
>> + buffer.position(position);
>> +
>> + Controller.logger().log(Level.WARNING,
>> + "Exception. Echo \"" + new String(data) +
>> "\"");
>> + throw ex;
>> + }
>> + }
>> +
>> + buffer.clear();
>> + return false;
>> + }
>> +
>> + public boolean postExecute(Context ctx) throws IOException {
>> + return true;
>> + }
>> +}
>> Index: ProtocolChainContextTask.java
>> --- ProtocolChainContextTask.java Locally New
>> +++ ProtocolChainContextTask.java Locally New
>> @@ -0,0 +1,69 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +
>> +package com.sun.grizzly;
>> +
>> +import java.nio.channels.SelectionKey;
>> +
>> +/**
>> + * <code>ProtocolChain</code> task, which will be executed by
>> + * <code>Context</code>, when Context.execute(<code>ContextTask</code>)
>> + * is called.
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public class ProtocolChainContextTask extends ContextTask {
>> + private static final TaskPool<ProtocolChainContextTask> taskPool =
>> + new TaskPool<ProtocolChainContextTask>() {
>> + @Override
>> + public ProtocolChainContextTask newTask() {
>> + return new ProtocolChainContextTask();
>> + }
>> + };
>> +
>> + public static ProtocolChainContextTask poll() {
>> + return taskPool.poll();
>> + }
>> +
>> + public static void offer(ProtocolChainContextTask contextTask) {
>> + contextTask.recycle();
>> + taskPool.offer(contextTask);
>> + }
>> +
>> + public Object call() throws Exception {
>> + SelectionKey currentKey = context.getSelectionKey();
>> + SelectionKeyHandler selectionKeyHandler = context.
>> + getSelectorHandler().getSelectionKeyHandler();
>> +
>> + selectionKeyHandler.process(currentKey);
>> + try {
>> + context.getProtocolChain().execute(context);
>> + } finally {
>> + selectionKeyHandler.postProcess(currentKey);
>> + offer(this);
>> + }
>> +
>> + return null;
>> + }
>> +}
>> Index: SelectorHandler.java
>> --- SelectorHandler.java Base (BASE)
>> +++ SelectorHandler.java Locally Modified (Based On LOCAL)
>> @@ -22,6 +22,7 @@
>> */
>> package com.sun.grizzly;
>>
>> +import com.sun.grizzly.async.AsyncQueueWriter;
>> import com.sun.grizzly.util.Copyable;
>> import java.io.IOException;
>> import java.nio.channels.SelectableChannel;
>> @@ -203,6 +204,16 @@
>>
>>
>> /**
>> + * Returns <code>AsyncQueueWriter</code> associated with this
>> + * <code>SelectorHandler</code>. Method will return null, if this
>> + * <code>TCPSelectorHandler</code> is not running.
>> + *
>> + * @return <code>AsyncQuquqWriter</code>
>> + */
>> + public AsyncQueueWriter getAsyncQueueWriter();
>> +
>> +
>> + /**
>> * Return the Pipeline used to execute this SelectorHandler's
>> * SelectionKey ops
>> * @return The pipeline to use, or null if the Controller's
>> Pipeline
>> Index: TCPConnectorHandler.java
>> --- TCPConnectorHandler.java Base (BASE)
>> +++ TCPConnectorHandler.java Locally Modified (Based On LOCAL)
>> @@ -23,6 +23,8 @@
>> package com.sun.grizzly;
>>
>> import com.sun.grizzly.Controller.Protocol;
>> +import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>> +import com.sun.grizzly.async.AsyncQueueWritable;
>> import com.sun.grizzly.util.ByteBufferInputStream;
>> import com.sun.grizzly.util.OutputWriter;
>> import java.io.IOException;
>> @@ -70,7 +72,8 @@
>> *
>> * @author Jeanfrancois Arcand
>> */
>> -public class TCPConnectorHandler implements
>> ConnectorHandler<TCPSelectorHandler, CallbackHandler>{
>> +public class TCPConnectorHandler implements
>> + ConnectorHandler<TCPSelectorHandler, CallbackHandler>,
>> AsyncQueueWritable {
>>
>> /**
>> * The underlying TCPSelectorHandler used to mange SelectionKeys.
>> @@ -399,6 +402,72 @@
>>
>>
>> /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer) throws
>> IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> + socketChannel.keyFor(selectorHandler.getSelector()),
>> buffer);
>> + }
>> +
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> + socketChannel.keyFor(selectorHandler.getSelector()),
>> buffer,
>> + callbackHandler);
>> + }
>> +
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler,
>> + boolean isCloneByteBuffer) throws IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> + socketChannel.keyFor(selectorHandler.getSelector()),
>> buffer,
>> + callbackHandler, isCloneByteBuffer);
>> + }
>> +
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer)
>> + throws IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> + socketChannel.keyFor(selectorHandler.getSelector()),
>> dstAddress,
>> + buffer);
>> + }
>> +
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> + socketChannel.keyFor(selectorHandler.getSelector()),
>> dstAddress,
>> + buffer, callbackHandler);
>> + }
>> +
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler, boolean
>> isCloneByteBuffer)
>> + throws IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> + socketChannel.keyFor(selectorHandler.getSelector()),
>> dstAddress,
>> + buffer, callbackHandler, isCloneByteBuffer);
>> + }
>> +
>> +
>> + /**
>> * Close the underlying connection.
>> */
>> public void close() throws IOException{
>> Index: TCPSelectorHandler.java
>> --- TCPSelectorHandler.java Base (BASE)
>> +++ TCPSelectorHandler.java Locally Modified (Based On LOCAL)
>> @@ -23,6 +23,9 @@
>>
>> package com.sun.grizzly;
>>
>> +import com.sun.grizzly.async.AsyncQueueWriter;
>> +import com.sun.grizzly.async.TCPAsyncQueueWriter;
>> +import com.sun.grizzly.async.AsyncQueueWriterContextTask;
>> import com.sun.grizzly.util.Cloner;
>> import com.sun.grizzly.util.Copyable;
>> import com.sun.grizzly.util.SelectionKeyOP;
>> @@ -171,6 +174,7 @@
>> */
>> protected ProtocolChainInstanceHandler instanceHandler;
>>
>> + protected AsyncQueueWriter asyncQueueWriter;
>>
>> public TCPSelectorHandler(){
>> }
>> @@ -234,6 +238,10 @@
>> public void preSelect(Context ctx) throws IOException {
>> initOpRegistriesIfRequired();
>>
>> + if (asyncQueueWriter == null) {
>> + asyncQueueWriter = new TCPAsyncQueueWriter(this);
>> + }
>> +
>> if (selector == null){
>> try{
>> connectorInstanceHandler = new
>> ConnectorInstanceHandler.
>> @@ -423,7 +431,7 @@
>> /**
>> * Shuntdown this instance by closing its Selector and
>> associated channels.
>> */
>> - public void shutdown(){
>> + public void shutdown() {
>> if (selector != null){
>> for (SelectionKey selectionKey : selector.keys()) {
>> selectionKeyHandler.close(selectionKey);
>> @@ -454,7 +462,11 @@
>> "selector.close",ex);
>> }
>>
>> + if (asyncQueueWriter != null) {
>> + asyncQueueWriter.close();
>> + asyncQueueWriter = null;
>> }
>> + }
>>
>> /**
>> * {_at_inheritDoc}
>> @@ -494,10 +506,11 @@
>> throws IOException{
>> // disable OP_READ on key before doing anything else
>> key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
>> - if (key.attachment() instanceof CallbackHandler){
>> - final Context context =
>> ctx.getController().pollContext(key);
>> + Object attach = key.attachment();
>> + if (attach instanceof CallbackHandler){
>> + final Context context = pollContext(ctx, key);
>> context.setCurrentOpType(Context.OpType.OP_READ);
>> - invokeCallbackHandler(context);
>> + invokeCallbackHandler((CallbackHandler) attach, context);
>> return false;
>> } else {
>> return true;
>> @@ -515,12 +528,19 @@
>> throws IOException{
>> // disable OP_WRITE on key before doing anything else
>> key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
>> - if (key.attachment() instanceof CallbackHandler){
>> - final Context context =
>> ctx.getController().pollContext(key);
>> - context.setSelectionKey(key);
>> +
>> + Object attach = null;
>> +
>> + if (asyncQueueWriter.hasReadyAsyncWriteData(key)) {
>> + final Context context = pollContext(ctx, key);
>> context.setCurrentOpType(Context.OpType.OP_WRITE);
>> - invokeCallbackHandler(context);
>> + invokeAsyncQueueWriter(context);
>> return false;
>> + } else if ((attach = key.attachment()) instanceof
>> CallbackHandler){
>> + final Context context = pollContext(ctx, key);
>> + context.setCurrentOpType(Context.OpType.OP_WRITE);
>> + invokeCallbackHandler((CallbackHandler) attach, context);
>> + return false;
>> } else {
>> return true;
>> }
>> @@ -541,11 +561,11 @@
>> key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
>> key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
>>
>> - if (key.attachment() instanceof CallbackHandler){
>> - Context context = ctx.getController().pollContext(key);
>> - context.setSelectionKey(key);
>> + Object attach = key.attachment();
>> + if (attach instanceof CallbackHandler){
>> + final Context context = pollContext(ctx, key);
>> context.setCurrentOpType(Context.OpType.OP_CONNECT);
>> - invokeCallbackHandler(context);
>> + invokeCallbackHandler((CallbackHandler) attach, context);
>> }
>> return false;
>> }
>> @@ -556,13 +576,14 @@
>> * @param context <code>Context</code>
>> * @throws java.io.IOException
>> */
>> - protected void invokeCallbackHandler(Context context) throws
>> IOException{
>> - context.setSelectorHandler(this);
>> -
>> + protected void invokeCallbackHandler(CallbackHandler
>> callbackHandler,
>> + Context context) throws IOException{
>> IOEvent<Context>ioEvent = new
>> IOEvent.DefaultIOEvent<Context>(context);
>> context.setIOEvent(ioEvent);
>> try {
>> - context.execute();
>> + CallbackHandlerContextTask task =
>> CallbackHandlerContextTask.poll();
>> + task.setCallBackHandler(callbackHandler);
>> + context.execute(task);
>> } catch (PipelineFullException ex){
>> throw new IOException(ex.getMessage());
>> }
>> @@ -570,6 +591,22 @@
>>
>>
>> /**
>> + * Invoke a <code>AsyncQueueWriter</code>
>> + * @param context <code>Context</code>
>> + * @throws java.io.IOException
>> + */
>> + protected void invokeAsyncQueueWriter(Context context) throws
>> IOException {
>> + AsyncQueueWriterContextTask task =
>> AsyncQueueWriterContextTask.poll();
>> + task.setAsyncQueueWriter(asyncQueueWriter);
>> + try {
>> + context.execute(task);
>> + } catch (PipelineFullException ex) {
>> + throw new IOException(ex.getMessage());
>> + }
>> + }
>> +
>> +
>> + /**
>> * Return an instance of the default <code>ConnectorHandler</code>,
>> * which is the <code>TCPConnectorHandler</code>
>> * @return <code>ConnectorHandler</code>
>> @@ -654,6 +691,13 @@
>> this.selector = selector;
>> }
>>
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public AsyncQueueWriter getAsyncQueueWriter() {
>> + return asyncQueueWriter;
>> + }
>> +
>> public long getSelectTimeout() {
>> return selectTimeout;
>> }
>> @@ -855,8 +899,19 @@
>> } catch (IOException ex){
>> ; // LOG ME
>> }
>> +
>> + asyncQueueWriter.onClose(channel);
>> }
>>
>> + private Context pollContext(final Context serverContext,
>> + final SelectionKey key) {
>> + final Context context =
>> serverContext.getController().pollContext(key);
>> + context.setSelectionKey(key);
>> + context.setSelectorHandler(this);
>> + context.setAsyncQueueWriter(asyncQueueWriter);
>> + return context;
>> + }
>> +
>> //--------------- ConnectorInstanceHandler
>> -----------------------------
>> /**
>> * Return <Callable>factory<Callable> object, which knows how
>> Index: UDPConnectorHandler.java
>> --- UDPConnectorHandler.java Base (BASE)
>> +++ UDPConnectorHandler.java Locally Modified (Based On LOCAL)
>> @@ -23,6 +23,8 @@
>>
>> package com.sun.grizzly;
>>
>> +import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>> +import com.sun.grizzly.async.AsyncQueueWritable;
>> import com.sun.grizzly.util.ByteBufferInputStream;
>> import java.io.IOException;
>> import java.net.SocketAddress;
>> @@ -47,7 +49,8 @@
>> *
>> * @author Jeanfrancois Arcand
>> */
>> -public class UDPConnectorHandler implements
>> ConnectorHandler<UDPSelectorHandler, CallbackHandler>{
>> +public class UDPConnectorHandler implements
>> + ConnectorHandler<UDPSelectorHandler, CallbackHandler>,
>> AsyncQueueWritable {
>>
>> /**
>> * The underlying UDPSelectorHandler used to mange SelectionKeys.
>> @@ -339,6 +342,72 @@
>>
>>
>> /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer) throws
>> IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> +
>> datagramChannel.keyFor(selectorHandler.getSelector()), buffer);
>> + }
>> +
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> +
>> datagramChannel.keyFor(selectorHandler.getSelector()), buffer,
>> + callbackHandler);
>> + }
>> +
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler,
>> + boolean isCloneByteBuffer) throws IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> +
>> datagramChannel.keyFor(selectorHandler.getSelector()), buffer,
>> + callbackHandler, isCloneByteBuffer);
>> + }
>> +
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer)
>> + throws IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> +
>> datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
>> + buffer);
>> + }
>> +
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> +
>> datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
>> + buffer, callbackHandler);
>> + }
>> +
>> +
>> + /**
>> + * {_at_inheritDoc}
>> + */
>> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
>> ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler, boolean
>> isCloneByteBuffer)
>> + throws IOException {
>> + return selectorHandler.getAsyncQueueWriter().write(
>> +
>> datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
>> + buffer, callbackHandler, isCloneByteBuffer);
>> + }
>> +
>> +
>> + /**
>> * Receive bytes.
>> * @param byteBuffer The byteBuffer to store bytes.
>> * @param socketAddress
>> @@ -469,5 +538,4 @@
>> public UDPSelectorHandler getSelectorHandler() {
>> return selectorHandler;
>> }
>> -
>> }
>> Index: UDPSelectorHandler.java
>> --- UDPSelectorHandler.java Base (BASE)
>> +++ UDPSelectorHandler.java Locally Modified (Based On LOCAL)
>> @@ -23,6 +23,7 @@
>>
>> package com.sun.grizzly;
>>
>> +import com.sun.grizzly.async.UDPAsyncQueueWriter;
>> import com.sun.grizzly.util.Copyable;
>> import com.sun.grizzly.util.SelectionKeyOP;
>> import java.io.IOException;
>> @@ -92,6 +93,10 @@
>> public void preSelect(Context ctx) throws IOException {
>> initOpRegistriesIfRequired();
>>
>> + if (asyncQueueWriter == null) {
>> + asyncQueueWriter = new UDPAsyncQueueWriter(this);
>> + }
>> +
>> if (selector == null){
>> try{
>> connectorInstanceHandler = new
>> ConnectorInstanceHandler.
>> @@ -174,7 +179,12 @@
>> Controller.logger().log(Level.SEVERE,
>> "closeSocketException",ex);
>> }
>> +
>> + if (asyncQueueWriter != null) {
>> + asyncQueueWriter.close();
>> + asyncQueueWriter = null;
>> }
>> + }
>>
>>
>> /**
>> @@ -258,6 +268,8 @@
>> } catch (IOException ex){
>> ; // LOG ME
>> }
>> +
>> + asyncQueueWriter.onClose(channel);
>> }
>>
>> //--------------- ConnectorInstanceHandler
>> -----------------------------
>>
>>
>>>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>>>
>>>
>>> ------------------------------------------------------------------------
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>