dev@grizzly.java.net

Re: AsyncWriter proposal

From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Date: Tue, 06 Nov 2007 14:35:25 +0100

Hello,


>> In diff file you can see refactoring I'm proposing for Context class.
>> Currently context class has some logic, which is executed by
>> Pipeline(ThreadPool): callback handler, protocol chain execution. I
>> propose to move this logic to ContextTask interface implementations,
>> this will simplify Context class. Because currently we have 2
>> possibilities for Context: execute callback handler, protocol chain.
>> After adding AsyncWriteQueue - 3 possibilities... Think it's too much
>> to hold in Context?
>
> If the functionality is the same but better designed, I'm +1 :-)
I just think that Context should represent state and not contain
execution logic. I did keep support for the old Context.execute() method,
though. What do you think about deprecating it?
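
To make the idea concrete, here is a minimal sketch (not the code from the
diff). All Sketch* names are hypothetical; the only point is that the context
holds state while each task carries one kind of execution logic, and the old
no-argument execute() survives as a deprecated shortcut.

```java
// Hypothetical sketch: names and fields are illustrative, not Grizzly's API.

/** Holds per-request state only; knows nothing about what gets executed. */
class SketchContext {
    private String currentOp; // stands in for key, ioEvent, protocol chain, ...

    String getCurrentOp() { return currentOp; }
    void setCurrentOp(String currentOp) { this.currentOp = currentOp; }

    /** New style: the caller chooses the logic by passing a task. */
    Object execute(SketchContextTask task) {
        task.setContext(this);
        return task.call();
    }

    /** Old style, kept for existing callers; defaults to the protocol chain. */
    @Deprecated
    Object execute() {
        return execute(new SketchProtocolChainTask());
    }
}

/** One unit of execution logic that runs against a context. */
abstract class SketchContextTask {
    protected SketchContext context;

    void setContext(SketchContext context) { this.context = context; }

    abstract Object call();
}

class SketchProtocolChainTask extends SketchContextTask {
    @Override
    Object call() { return "protocol chain for " + context.getCurrentOp(); }
}

class SketchCallbackHandlerTask extends SketchContextTask {
    @Override
    Object call() { return "callback handler for " + context.getCurrentOp(); }
}
```

Adding a third kind of work (the async write queue) then means adding a third
task class rather than another branch inside the context.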

>> I was thinking about async reading, looks like we can apply it for
>> UDP protocol. (Discussion with Radim on users_at_grizzly mailing list).
>> This way we can try to simulate some features, which TCP protocol
>> has, for the server side.
>
> Not only TCP/UDP: last year Charlie also presented (not sure it was
> implemented) an API where you can register a
> key/byteBuffer/size and get notified only when all the bytes requested
> are available (similar to what AIO/NIO.2 will do). I think we should
> keep that idea in mind when designing that interface.
Good idea. One more reason for AsyncRead.
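
The "notify only when all requested bytes are available" idea could look
roughly like the sketch below. SketchReadCallback and SketchAsyncReadRequest
are made-up names, and selector registration is left out entirely; this only
shows the accumulate-then-notify part.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; not an existing Grizzly or NIO.2 API.

interface SketchReadCallback {
    void onReadCompleted(ByteBuffer buffer);
}

class SketchAsyncReadRequest {
    private final ByteBuffer target;
    private final SketchReadCallback callback;
    private boolean completed;

    SketchAsyncReadRequest(int size, SketchReadCallback callback) {
        this.target = ByteBuffer.allocate(size);
        this.callback = callback;
    }

    /**
     * Feed bytes that just became available. Consumes at most as many bytes
     * as are still missing and returns true once the request is complete.
     */
    boolean onBytesAvailable(ByteBuffer chunk) {
        if (completed) {
            return true;
        }
        int n = Math.min(chunk.remaining(), target.remaining());
        ByteBuffer slice = chunk.duplicate();
        slice.limit(slice.position() + n);
        target.put(slice);
        chunk.position(chunk.position() + n);

        if (!target.hasRemaining()) {
            completed = true;
            target.flip(); // make the collected bytes readable
            callback.onReadCompleted(target);
        }
        return completed;
    }
}
```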

>
> + /**
> + * Method writes <code>ByteBuffer</code> to the
> <code>SelectableChannel</code>
> + * First, if <code>SelectableChannel</code> associated write
> queue is empty -
> + * it tries to write <code>ByteBuffer</code> to the given
> + * <code>SelectableChannel</code> directly (without putting to
> the queue).
> + * If associated write queue is not empty or after direct writing
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>
> + * and <code>SelectableChannel</code> will be registered on
> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
> + * If an exception occurs, during direct writing - it will be
> propogated
>
> Typo propagated
fixed.

>
> + * to the caller directly, otherwise, if the
> <code>ByteBuffer</code> is
> + * added to a writing queue - exception notification will come via
> + * <code>AsyncWriteCallbackHandler</code>
>
> add the AsyncWriteCallbackHandler.onIOException()
fixed.


> Index: async/AsyncQueueWriterContextTask.java
> --- async/AsyncQueueWriterContextTask.java Locally New
> +++ async/AsyncQueueWriterContextTask.java Locally New
> @@ -0,0 +1,78 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly.async;
> +
> +import com.sun.grizzly.*;
>
> Fix the '*' by listing the classes instead (Netbeans Netbeans Netbeans
> ;-))
Fixed :)

> +
> +/**
> + * <code>AsyncQueueWriter</code> task, which will be executed by
> + * <code>Context</code>, when Context.execute(<code>ContextTask</code>)
> + * is called.
> + *
> + * @author Alexey Stashok
> + */
> +public class AsyncQueueWriterContextTask extends ContextTask {
> + private static final TaskPool<AsyncQueueWriterContextTask>
> taskPool =
> + new TaskPool<AsyncQueueWriterContextTask>() {
> + @Override
> + public AsyncQueueWriterContextTask newTask() {
> + return new AsyncQueueWriterContextTask();
> + }
> + };
> +
> + private AsyncQueueWriter asyncQueueWriter;
> +
> + public static AsyncQueueWriterContextTask poll() {
> + return taskPool.poll();
> + }
> +
> + public static void offer(AsyncQueueWriterContextTask contextTask) {
> + contextTask.recycle();
> + taskPool.offer(contextTask);
> + }
> +
> + public Object call() throws Exception {
> + try {
> + asyncQueueWriter.onWrite(context.getSelectionKey());
>
> Can you make sure the getSelectionKey() is not null, and throw an
> IllegalStateException in case the key is null? Just to make the code
> robust, even if the key cannot be null.
Fixed.
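
For reference, the kind of guard being asked for, as a small standalone
sketch (SketchKeyCheck is a hypothetical helper, not part of the patch):

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper; the actual fix may simply inline this check in call().
class SketchKeyCheck {
    /** Returns the key unchanged, or fails fast if it is missing. */
    static SelectionKey requireKey(SelectionKey key) {
        if (key == null) {
            throw new IllegalStateException("SelectionKey is null");
        }
        return key;
    }
}
```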

>
> Index: async/AsyncWriteCallbackHandler.java
> --- async/AsyncWriteCallbackHandler.java Locally New
> +++ async/AsyncWriteCallbackHandler.java Locally New
> @@ -0,0 +1,38 @@
> +
> +package com.sun.grizzly.async;
> +
> +import java.nio.ByteBuffer;
> +import java.nio.channels.SelectionKey;
> +
> +/**
> + * Callback handler interface, used by <code>AsyncQueueWriter</code>
> to notify
> + * custom code about completion of specific <code>ByteBuffer</code>
> writing.
> + *
> + * @author Alexey Stashok
> + */
> +public interface AsyncWriteCallbackHandler {
> + public void onWriteCompleted(SelectionKey key, ByteBuffer buffer);
> + public void onIOException(SelectionKey key, ByteBuffer buffer);
> +}
>
> The API looks good, but I would add more docs here as this interface
> will be implemented by the user of Grizzly, and since we are lacking
> in terms of tutorials, better to have good docs :-)
Added a description for each method. Also changed onIOException to
public void onIOException(IOException ioException, SelectionKey key,
ByteBuffer buffer);
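
Roughly, the documented interface with the new signature would read as
below. The comment wording is only illustrative, and SketchRecordingHandler
is a hypothetical user-side implementation added to show how it would be
used.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/**
 * Callback used by the async queue writer to report the outcome of a
 * queued write. (Comment wording here is illustrative.)
 */
interface AsyncWriteCallbackHandler {
    /** Called once the whole buffer has been written to the channel. */
    void onWriteCompleted(SelectionKey key, ByteBuffer buffer);

    /** Called if writing the queued buffer failed with the given exception. */
    void onIOException(IOException ioException, SelectionKey key, ByteBuffer buffer);
}

/** Hypothetical implementation that just records what happened. */
class SketchRecordingHandler implements AsyncWriteCallbackHandler {
    final List<String> events = new ArrayList<>();

    @Override
    public void onWriteCompleted(SelectionKey key, ByteBuffer buffer) {
        events.add("completed");
    }

    @Override
    public void onIOException(IOException ioException, SelectionKey key, ByteBuffer buffer) {
        events.add("failed: " + ioException.getMessage());
    }
}
```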

> Index: async/AsyncWriteQueue.java
> --- async/AsyncWriteQueue.java Locally New
> +++ async/AsyncWriteQueue.java Locally New
> @@ -0,0 +1,144 @@
> +
> +package com.sun.grizzly.async;
> +
> +import java.nio.channels.SelectableChannel;
> +import java.util.Map;
> +import java.util.concurrent.ConcurrentHashMap;
> +import java.util.concurrent.ConcurrentLinkedQueue;
> +import java.util.concurrent.locks.ReentrantLock;
> +
> +/**
> + * Class represents collection of
> + * <code>SelectableChannel</code>s and correspondent queue of
> + * <code>ByteBuffer</code>, which should be written asynchronously.
> + * This implementation is TCP protocol specific.
>
> To follow our convention, should this class be called TCPAsyncWriteQueue?
Sorry, that's a Javadoc mistake. This class is common to all protocols.


> + *
> + * @author Alexey Stashok
> + */
> +public class AsyncWriteQueue<E>{
> + private Map<SelectableChannel, ChannelAsyncWriteEntry> queueMap =
> + new ConcurrentHashMap<SelectableChannel,
> ChannelAsyncWriteEntry>();
> +
> + /**
> + * Add data to the <code>AsyncWriteQueue</code>, corresponding to
> the given
> + * <code>SelectableChannel</code>
> + *
> + * @param channel <code>SelectableChannel</code>
> + * @param queueRecord write data unit
> + */
> + public void offer(SelectableChannel channel, E queueRecord) {
> + ChannelAsyncWriteEntry entry =
> obtainChannelAsyncWriteEntry(channel);
> + entry.queue.offer(queueRecord);
> + }
> +
> + /**
> + * Get head element of <code>SelectableChannel</code> write queue.
> + * Element will not be removed from queue.
> + *
> + * @param channel <code>SelectableChannel</code>
> + *
> + * @return <code>AsyncQueueRecord</code> write data unit
> + */
> + public E peek(SelectableChannel channel) {
> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
> + if (entry != null) {
> + return entry.queue.peek();
> + }
> +
> + return null;
> + }
> +
> + /**
> + * Get head element of <code>SelectableChannel</code> write queue.
> + * Element will be removed from queue.
> + *
> + * @param channel <code>SelectableChannel</code>
> + *
> + * @return <code>AsyncQueueRecord</code> write data unit
> + */
> + public E poll(SelectableChannel channel) {
> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
> + if (entry != null) {
> + return entry.queue.poll();
> + }
> +
> + return null;
> + }
> +
> + /**
> + * Remove the write queue entry associated with the
> <code>SelectableChannel</code>.
> + *
> + * @param channel <code>SelectableChannel</code>
> + */
> + public void removeEntry(SelectableChannel channel) {
> + queueMap.remove(channel);
> + }
> +
> + /**
> + * Get the size of <code>SelectableChannel</code> write queue.
> + *
> + * @param channel <code>SelectableChannel</code>
> + * @return size of <code>SelectableChannel</code> write queue.
> + */
> + public int size(SelectableChannel channel) {
> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
> + return entry == null ? 0 : entry.queue.size();
> + }
> +
> + public void clear() {
> + queueMap.clear();
> + }
> +
> + protected ChannelAsyncWriteEntry
> obtainChannelAsyncWriteEntry(SelectableChannel channel) {
> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
> + if (entry == null) {
> + synchronized(channel) {
>
> Minor: A ReentrantReadWriteLock might perform better than a Sync here
Hmm, I'm not sure it will perform better. Double-checked locking is an
antipattern, but I'm not sure there is anything faster?
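
One lock-free alternative worth comparing is ConcurrentMap.putIfAbsent().
The sketch below uses a hypothetical SketchEntryMap with a plain queue as
the entry; I have not measured whether it is faster than the synchronized
block, and under a race it can allocate an entry that is thrown away.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch; K stands in for SelectableChannel, the queue for the entry.
class SketchEntryMap<K, E> {
    private final ConcurrentMap<K, ConcurrentLinkedQueue<E>> map =
            new ConcurrentHashMap<>();

    ConcurrentLinkedQueue<E> obtain(K channel) {
        ConcurrentLinkedQueue<E> entry = map.get(channel);
        if (entry == null) {
            ConcurrentLinkedQueue<E> fresh = new ConcurrentLinkedQueue<>();
            // putIfAbsent returns the existing value, or null if ours was stored
            entry = map.putIfAbsent(channel, fresh);
            if (entry == null) {
                entry = fresh;
            }
        }
        return entry;
    }
}
```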


> Index: async/TCPAsyncQueueWriter.java
> --- async/TCPAsyncQueueWriter.java Locally New
> +++ async/TCPAsyncQueueWriter.java Locally New
> @@ -0,0 +1,255 @@
> +
> +package com.sun.grizzly.async;
> +
> +import com.sun.grizzly.*;
> +import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
> +import java.io.IOException;
> +import java.net.SocketAddress;
> +import java.nio.ByteBuffer;
> +import java.nio.channels.SelectableChannel;
> +import java.nio.channels.SelectionKey;
> +import java.nio.channels.WritableByteChannel;
> +import java.util.concurrent.ConcurrentLinkedQueue;
> +
> +/**
> + * TCP implementation of <code>AsyncQueueWriter</code>
> + *
> + * @author Alexey Stashok
> + */
> +public class TCPAsyncQueueWriter implements AsyncQueueWriter {
> + private SelectorHandler selectorHandler;
> + private AsyncWriteQueue<TCPAsyncQueueRecord> writeQueue;
> +
> + public TCPAsyncQueueWriter(SelectorHandler selectorHandler) {
> + this.selectorHandler = selectorHandler;
> + writeQueue = new AsyncWriteQueue<TCPAsyncQueueRecord>();
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer) throws
> IOException {
> + return write(key, buffer, null);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws
> IOException {
> + return write(key, buffer, callbackHandler, false);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler, boolean
> isCloneByteBuffer) throws IOException {
> +
> + SelectableChannel channel = key.channel();
> + // If AsyncWriteQueue is empty - try to write ByteBuffer here
> + ChannelAsyncWriteEntry channelEntry =
> + writeQueue.obtainChannelAsyncWriteEntry(channel);
> +
> + ConcurrentLinkedQueue<TCPAsyncQueueRecord> queue =
> channelEntry.queue;
> +
> + boolean hasWriteLock = false;
> +
> + try {
> + if (queue.isEmpty() &&
> + (hasWriteLock = channelEntry.writeLock.tryLock())) {
> + ((WritableByteChannel) channel).write(buffer);
> + }
> +
> + if (buffer.hasRemaining()) {
> + // clone ByteBuffer if required
> + if (isCloneByteBuffer) {
> + int size = buffer.remaining();
> + ByteBuffer newBuffer = buffer.isDirect() ?
> + ByteBuffer.allocateDirect(size) :
> + ByteBuffer.allocate(size);
>
> Here we need to support VIEW and HEAP_ARRAY like the rest of the
> framework allows. We don't need it for the first release, but it might be
> good to file an RFE. I would think an application that uses
> ByteBuffer.wrap() would like to use the same type of bb when writing.
Fixed, using ByteBufferFactory.allocateView(...).
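
Independent of which factory allocates the copy, the cloning step itself can
be sketched with plain JDK calls. SketchBufferClone is a hypothetical helper;
unlike the code in the diff it copies from a duplicate, so the caller's
position is left untouched, and it uses flip() so position and limit are
both set explicitly.

```java
import java.nio.ByteBuffer;

// Hypothetical helper; only heap and direct buffers are distinguished here.
class SketchBufferClone {
    /** Copies the remaining bytes of src into a new buffer of the same kind. */
    static ByteBuffer cloneRemaining(ByteBuffer src) {
        int size = src.remaining();
        ByteBuffer copy = src.isDirect()
                ? ByteBuffer.allocateDirect(size)
                : ByteBuffer.allocate(size);
        copy.put(src.duplicate()); // duplicate shares content, not position
        copy.flip();               // position = 0, limit = size
        return copy;
    }
}
```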

> +
> + newBuffer.put(buffer);
> + newBuffer.position(0);
> + buffer = newBuffer;
> + }
> +
> + // add new element to the queue
> + channelEntry.updateLock.lock();
> + queue.offer(new TCPAsyncQueueRecord(buffer,
> + callbackHandler));
>
> Here should we try to cache TCPAsyncQueueRecord instead of calling
> 'new' for every write? I would think yes :-)
Fixed :)
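
As an illustration of the caching idea (not the actual change), a record
pool could look like the sketch below. SketchRecord and SketchRecordPool are
hypothetical names; records are cleared before going back into the pool so
that buffers and handlers are not kept alive by accident.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a reusable queue record and its pool.
class SketchRecord {
    Object byteBuffer;
    Object callbackHandler;

    void set(Object byteBuffer, Object callbackHandler) {
        this.byteBuffer = byteBuffer;
        this.callbackHandler = callbackHandler;
    }

    void recycle() {
        byteBuffer = null;
        callbackHandler = null;
    }
}

class SketchRecordPool {
    private final ConcurrentLinkedQueue<SketchRecord> pool =
            new ConcurrentLinkedQueue<>();

    /** Reuses a pooled record if one is available, otherwise creates one. */
    SketchRecord poll() {
        SketchRecord record = pool.poll();
        return record != null ? record : new SketchRecord();
    }

    /** Clears the record and returns it to the pool. */
    void offer(SketchRecord record) {
        record.recycle();
        pool.offer(record);
    }
}
```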

>
> +
> + /*
> + * This check helps to avoid not required key
> registering
> + */
> + if (!channelEntry.onWriteLock.isLocked()) {
> + registerForWriting(key);
> + }
> +
> + channelEntry.updateLock.unlock();
> +
> + return false;
> + }
> + } finally {
> + if (hasWriteLock) {
> + channelEntry.writeLock.unlock();
> + }
> + }
> +
> + return true;
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean hasReadyAsyncWriteData(SelectionKey key) {
> + return writeQueue.size(key.channel()) > 0;
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public void onWrite(SelectionKey key) throws IOException {
> + SelectableChannel channel = key.channel();
> +
> + ChannelAsyncWriteEntry channelEntry =
> + writeQueue.obtainChannelAsyncWriteEntry(channel);
> +
> + boolean hasUpdateLock = false;
> + boolean hasOnWriteLock = false;
> +
> + if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
> + try {
> + ConcurrentLinkedQueue<TCPAsyncQueueRecord> queue =
> channelEntry.queue;
> +
> + while (queue.size() > 0) {
> + if (hasUpdateLock) {
> + channelEntry.updateLock.unlock();
> + hasUpdateLock = false;
> + }
> +
> + TCPAsyncQueueRecord queueRecord = queue.peek();
> +
> + ByteBuffer byteBuffer = queueRecord.byteBuffer;
> + try {
> + ((WritableByteChannel)
> channel).write(byteBuffer);
> + } catch (IOException e) {
> + if (queueRecord.callbackHandler != null) {
> +
> queueRecord.callbackHandler.onIOException(key,
> + byteBuffer);
> + }
>
> else log the exception.
Fixed.


> + }
> +
> + if (!byteBuffer.hasRemaining()) {
> + TCPAsyncQueueRecord removedQueueRecord =
> + queue.poll();
> + assert removedQueueRecord == queueRecord;
> +
> + if (removedQueueRecord.callbackHandler !=
> null) {
> + removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer);
> + }
> + } else {
> + hasOnWriteLock = false;
> + channelEntry.onWriteLock.unlock();
> + registerForWriting(key);
> + break;
> + }
> +
> + if (queue.size() == 0) {
> + /*
> + * If queue is empty - there is possibility
> that write method
> + * will add entry, but it will not be
> processed, because
> + * onWriteLock is currently locked
> + */
> + channelEntry.updateLock.lock();
> + hasUpdateLock = true;
> + }
> + }
> + } finally {
> + if (hasOnWriteLock) {
> + channelEntry.onWriteLock.unlock();
> + }
> +
> + if (hasUpdateLock) {
> + channelEntry.updateLock.unlock();
> + }
> + }
> + }
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> ByteBuffer buffer) throws IOException {
> + throw new UnsupportedOperationException("Not supported for
> TCP transport.");
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws
> IOException {
> + throw new UnsupportedOperationException("Not supported for
> TCP transport.");
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, boolean
> isCloneByteBuffer) throws IOException {
> + throw new UnsupportedOperationException("Not supported for
> TCP transport.");
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public void onClose(SelectableChannel channel) {
> + writeQueue.removeEntry(channel);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public void close() {
> + writeQueue.clear();
> + writeQueue = null;
> + }
> +
> + private void registerForWriting(SelectionKey key) {
> + selectorHandler.register(key, SelectionKey.OP_WRITE);
> + }
> +
> + /**
> + * <code>AsyncWriteQueue</code> data unit specific for TCP protocol
> + */
> + protected static class TCPAsyncQueueRecord {
> + public ByteBuffer byteBuffer;
> + public AsyncWriteCallbackHandler callbackHandler;
> +
> + public TCPAsyncQueueRecord(ByteBuffer byteBuffer,
> + AsyncWriteCallbackHandler callbackHandler) {
> + this.byteBuffer = byteBuffer;
> + this.callbackHandler = callbackHandler;
> + }
> + }
> +}
> Index: async/UDPAsyncQueueWriter.java
> --- async/UDPAsyncQueueWriter.java Locally New
> +++ async/UDPAsyncQueueWriter.java Locally New
> @@ -0,0 +1,271 @@
> +
> +package com.sun.grizzly.async;
> +
> +import com.sun.grizzly.*;
> +import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
> +import java.io.IOException;
> +import java.nio.ByteBuffer;
> +import java.nio.channels.SelectableChannel;
> +import java.nio.channels.SelectionKey;
> +import java.nio.channels.WritableByteChannel;
> +import java.util.concurrent.ConcurrentLinkedQueue;
> +import java.net.SocketAddress;
> +import java.nio.channels.DatagramChannel;
> +
> +/**
> + * UDP implementation of <code>AsyncQueueWriter</code>
> + *
> + * @author Alexey Stashok
> + */
> +public class UDPAsyncQueueWriter implements AsyncQueueWriter {
> + private SelectorHandler selectorHandler;
> + private AsyncWriteQueue<UDPAsyncQueueRecord> writeQueue;
> +
> + public UDPAsyncQueueWriter(SelectorHandler selectorHandler) {
> + this.selectorHandler = selectorHandler;
> + writeQueue = new AsyncWriteQueue<UDPAsyncQueueRecord>();
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer) throws
> IOException {
> + return write(key, null, buffer, null);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws
> IOException {
> + return write(key, null, buffer, callbackHandler, false);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler, boolean
> isCloneByteBuffer) throws IOException {
> + return write(key, null, buffer, callbackHandler,
> isCloneByteBuffer);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> + ByteBuffer buffer) throws IOException {
> + return write(key, dstAddress, buffer, null);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> + ByteBuffer buffer, AsyncWriteCallbackHandler
> callbackHandler)
> + throws IOException {
> + return write(key, dstAddress, buffer, callbackHandler, false);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> + ByteBuffer buffer, AsyncWriteCallbackHandler
> callbackHandler,
> + boolean isCloneByteBuffer) throws IOException {
> +
> + SelectableChannel channel = key.channel();
> + // If AsyncWriteQueue is empty - try to write ByteBuffer here
> + ChannelAsyncWriteEntry channelEntry =
> + writeQueue.obtainChannelAsyncWriteEntry(channel);
> +
> + ConcurrentLinkedQueue<UDPAsyncQueueRecord> queue =
> channelEntry.queue;
> +
> + boolean hasWriteLock = false;
> +
> + try {
> + if (queue.isEmpty() &&
> + (hasWriteLock = channelEntry.writeLock.tryLock())) {
> + doWrite((WritableByteChannel) channel, dstAddress,
> buffer);
> + }
> +
> + if (buffer.hasRemaining()) {
> + // clone ByteBuffer if required
> + if (isCloneByteBuffer) {
> + int size = buffer.remaining();
> + ByteBuffer newBuffer = buffer.isDirect() ?
> + ByteBuffer.allocateDirect(size) :
> + ByteBuffer.allocate(size);
> +
> + newBuffer.put(buffer);
> + newBuffer.position(0);
> + buffer = newBuffer;
> + }
>
> Same as above. Support HEAP and VIEW_HEAP/DIRECT via ByteBufferFactory
Fixed the same way as for TCP.

> +
> + // add new element to the queue
> + channelEntry.updateLock.lock();
> + queue.offer(new UDPAsyncQueueRecord(dstAddress, buffer,
> + callbackHandler));
> +
> + /*
> + * This check helps to avoid not required key
> registering
> + */
> + if (!channelEntry.onWriteLock.isLocked()) {
> + registerForWriting(key);
> + }
> +
> + channelEntry.updateLock.unlock();
> +
> + return false;
> + }
> + } finally {
> + if (hasWriteLock) {
> + channelEntry.writeLock.unlock();
> + }
> + }
> +
> + return true;
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean hasReadyAsyncWriteData(SelectionKey key) {
> + return writeQueue.size(key.channel()) > 0;
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public void onWrite(SelectionKey key) throws IOException {
> + SelectableChannel channel = key.channel();
> +
> + ChannelAsyncWriteEntry channelEntry =
> + writeQueue.obtainChannelAsyncWriteEntry(channel);
> +
> + boolean hasUpdateLock = false;
> + boolean hasOnWriteLock = false;
> +
> + if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
> + try {
> + ConcurrentLinkedQueue<UDPAsyncQueueRecord> queue =
> + channelEntry.queue;
> +
> + while (queue.size() > 0) {
> + if (hasUpdateLock) {
> + channelEntry.updateLock.unlock();
> + hasUpdateLock = false;
> + }
> +
> + UDPAsyncQueueRecord queueRecord = queue.peek();
> +
> + ByteBuffer byteBuffer = queueRecord.byteBuffer;
> + try {
> + doWrite((WritableByteChannel) channel,
> + queueRecord.dstAddress, byteBuffer);
> + } catch (IOException e) {
> + if (queueRecord.callbackHandler != null) {
> +
> queueRecord.callbackHandler.onIOException(key,
> + byteBuffer);
> + }
> + }
>
> else log the exception.
Fixed.

> Index: Context.java
> --- Context.java Base (BASE)
> +++ Context.java Locally Modified (Based On LOCAL)
> @@ -23,9 +23,14 @@
>
> package com.sun.grizzly;
>
> +import com.sun.grizzly.async.AsyncQueueWritable;
> +import com.sun.grizzly.async.AsyncQueueWriter;
> +import com.sun.grizzly.async.AsyncWriteCallbackHandler;
> +import java.io.IOException;
> +import java.net.SocketAddress;
> +import java.nio.ByteBuffer;
> import java.nio.channels.SelectionKey;
> import java.util.HashMap;
> -import java.util.concurrent.Callable;
>
> /**
> * This Object is used to share information between the Grizzly
> Framework
> @@ -33,7 +38,7 @@
> *
> * @author Jeanfrancois Arcand
> */
> -public class Context implements Callable {
> +public class Context {
>
> /**
> * A SelectionKey's registration state.
> @@ -119,6 +124,16 @@
>
>
> /**
> + * <code>AsyncQueueWriter</code>
> + */
> + private AsyncQueueWriter asyncQueueWriter;
> +
> + /**
> + * <code>AsyncQueueWritable</code>
> + */
> + private AsyncQueueWritable asyncQueueWritable;
> +
> + /**
> * Constructor
> */
> public Context() {
> @@ -210,6 +225,7 @@
> currentOpType = null;
> protocolChain = null;
> ioEvent = null;
> + asyncQueueWriter = null;
> }
>
>
> @@ -233,47 +249,6 @@
>
>
> /**
> - * Execute the <code>ProtocolChain</code>.
> - * @throws java.lang.Exception Exception thrown by protocol chain
> - */
> - public Object call() throws Exception {
> - // If a IOEvent has been defined, invoke it first and
> - // let its associated CallbackHandler decide if the
> ProtocolChain
> - // be invoked or not.
> - Object attachment = key.attachment();
> - if (ioEvent != null && (attachment instanceof CallbackHandler)){
> - try{
> - CallbackHandler callBackHandler =
> ((CallbackHandler)attachment);
> - if (currentOpType == OpType.OP_READ){
> - callBackHandler.onRead(ioEvent);
> - } else if (currentOpType == OpType.OP_WRITE){
> - callBackHandler.onWrite(ioEvent);
> - } else if (currentOpType == OpType.OP_CONNECT){
> - callBackHandler.onConnect(ioEvent);
> - }
> - } finally {
> - if (ioEvent != null){
> - // Prevent the CallbackHandler to re-use the
> context.
> - // TODO: This is still dangerous as the Context
> might have been
> - // cached by the CallbackHandler.
> - ioEvent.attach(null);
> - ioEvent = null;
> - }
> - }
> - } else {
> - SelectionKey currentKey = key;
> -
> selectorHandler.getSelectionKeyHandler().process(currentKey);
> - try {
> - protocolChain.execute(this);
> - } finally {
> - selectorHandler.getSelectionKeyHandler().postProcess(currentKey);
> - }
> - }
> - return null;
> - }
> -
> -
> - /**
> * Return <code>ProtocolChain</code> executed by this instance.
> * @return <code>ProtocolChain</code> instance
> */
> @@ -313,9 +288,12 @@
> * Execute this Context using the Controller's Pipeline
> * @throws com.sun.grizzly.PipelineFullException
> */
> - public void execute() throws PipelineFullException{
>
>
> Here we must keep the execute() method available, as removing it will
> break existing applications (like Sailfin).
Fixed.

> Index: Controller.java
> --- Controller.java Base (BASE)
> +++ Controller.java Locally Modified (Based On LOCAL)
> @@ -298,6 +298,7 @@
> key = iterator.next();
> iterator.remove();
> boolean skipOpWrite = false;
> + delegateToWorkerThread = false;
> if (key.isValid()) {
> if ((key.readyOps() & SelectionKey.OP_ACCEPT)
> == SelectionKey.OP_ACCEPT){
> @@ -356,7 +357,7 @@
> if (logger.isLoggable(Level.FINE)) {
> logger.log(Level.FINE, "OP_WRITE on "
> + key);
> }
> - delegateToWorkerThread = selectorHandler.
> + delegateToWorkerThread |= selectorHandler.
> onWriteInterest(key,serverCtx);
> }
>
> @@ -368,7 +369,9 @@
> Context context =
> pollContext(key,protocolChain);
> context.setSelectorHandler(selectorHandler);
>
> context.setPipeline(selectorHandler.pipeline());
> - context.execute();
> + context.setAsyncQueueWriter(
> +
> selectorHandler.getAsyncQueueWriter());
> + context.execute(ProtocolChainContextTask.poll());
>
> This brings up an interesting point. Instead of calling
> setAsyncQueueWriter on every request, should the Context instance be
> pooled per Selector instead of per Controller? That way, once
> configured, a Context can be pooled and never has to be re-configured
> again.
I thought about this. It would require adding more methods to the
SelectorHandler interface, such as pollContext and release/offerContext. I
would prefer not to add many methods to the interface, but it seems this
could be the right way.
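
A rough sketch of what per-selector pooling might look like, assuming the
pollContext/offerContext methods mentioned above. The Sketch* classes are
hypothetical stand-ins; the point is that the writer is set once when the
context is created, and only per-request state is reset on release.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch; class and method names are illustrative only.

class SketchPooledContext {
    final Object asyncQueueWriter; // configured once, when the context is created
    Object key;                    // per-request state, reset on release

    SketchPooledContext(Object asyncQueueWriter) {
        this.asyncQueueWriter = asyncQueueWriter;
    }
}

class SketchSelectorHandler {
    private final Object asyncQueueWriter = new Object();
    private final ConcurrentLinkedQueue<SketchPooledContext> contexts =
            new ConcurrentLinkedQueue<>();

    /** Hands out a context that is already bound to this handler's writer. */
    SketchPooledContext pollContext(Object key) {
        SketchPooledContext context = contexts.poll();
        if (context == null) {
            context = new SketchPooledContext(asyncQueueWriter);
        }
        context.key = key;
        return context;
    }

    /** Clears per-request state and returns the context to the pool. */
    void offerContext(SketchPooledContext context) {
        context.key = null;
        contexts.offer(context);
    }
}
```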


Thank you for the feedback!

WBR,
Alexey.