dev@grizzly.java.net

Re: AsyncWriter proposal

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Tue, 06 Nov 2007 11:43:40 -0500

Salut,

Oleksiy Stashok wrote:
> Hello,
>
>
>>> In diff file you can see refactoring I'm proposing for Context class.
>>> Currently context class has some logic, which is executed by
>>> Pipeline(ThreadPool): callback handler, protocol chain execution. I
>>> propose to move this logic to ContextTask interface implementations,
>>> this will simplify Context class. Because currently we have 2
>>> possibilities for Context: execute callback handler, protocol chain.
>>> After adding AsyncWriteQueue - 3 possibilities... Think it's too much
>>> to hold in Context?
>>
>> If the functionality is the same but better designed, I'm +1 :-)
> I just think that Context should represent some state and not contain
> execution logic. But I added support for the old Context.execute() method.
> What do you think about making it deprecated?

+1
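
A minimal sketch of the refactoring discussed above, assuming simplified stand-in classes (SketchContext, SketchContextTask and the two task types are hypothetical names, not the actual Grizzly classes). The context keeps only state, each kind of work lives in its own task, and the old no-argument execute() stays as a deprecated delegate:

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified stand-ins; not the actual Grizzly classes.
abstract class SketchContextTask implements Callable<Object> {
    protected SketchContext context;

    void setContext(SketchContext context) {
        this.context = context;
    }

    // Narrowed so callers need not handle checked exceptions in this sketch.
    @Override
    public abstract Object call();
}

// One task type per kind of work a worker thread can run.
class SketchProtocolChainTask extends SketchContextTask {
    @Override
    public Object call() {
        context.log.append("protocol-chain;");
        return null;
    }
}

class SketchCallbackTask extends SketchContextTask {
    @Override
    public Object call() {
        context.log.append("callback;");
        return null;
    }
}

class SketchContext {
    // State only; no execution logic lives here.
    final StringBuilder log = new StringBuilder();

    // New entry point: the caller chooses which task to run.
    Object execute(SketchContextTask task) {
        task.setContext(this);
        return task.call(); // a real framework would hand this to a thread pool
    }

    /** @deprecated kept for existing callers; delegates to the default task. */
    @Deprecated
    Object execute() {
        return execute(new SketchProtocolChainTask());
    }
}
```

Adding a third kind of work (such as an async write task) then means adding one more task class rather than another branch inside the context.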

>
>>> I was thinking about async reading, looks like we can apply it for
>>> UDP protocol. (Discussion with Radim on users@grizzly mailing list).
>>> This way we can try to simulate some features, which TCP protocol
>>> has, for the server side.
>>
>> Not only TCP/UDP: last year Charlie also presented an API (not sure
>> whether it was implemented) where you can register a
>> key/byteBuffer/size and get notified only when all the requested bytes
>> are available (similar to what AIO/NIO.2 will do). I think we should
>> keep that idea in mind when designing that interface.
> Good idea. One more reason for AsyncRead.
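
The "notify only when all requested bytes are available" idea can be sketched with plain JDK channels. This is an illustration under assumptions, not the API Charlie presented: SketchFullReadRequest and SketchReadCompletedHandler are hypothetical names, and the caller is assumed to invoke onReadable() each time the selector reports the channel as readable.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical callback; not part of Grizzly or the JDK.
interface SketchReadCompletedHandler {
    void onReadCompleted(ByteBuffer filledBuffer);
}

class SketchFullReadRequest {
    private final ByteBuffer target;
    private final SketchReadCompletedHandler handler;
    private boolean completed;

    SketchFullReadRequest(int size, SketchReadCompletedHandler handler) {
        this.target = ByteBuffer.allocate(size);
        this.handler = handler;
    }

    // Call whenever the channel is readable. Returns true once the
    // requested number of bytes has been accumulated.
    boolean onReadable(ReadableByteChannel channel) throws IOException {
        if (completed) {
            return true;
        }
        int n = channel.read(target);
        if (n < 0 && target.hasRemaining()) {
            throw new EOFException("channel closed before request completed");
        }
        if (!target.hasRemaining()) {
            completed = true;
            target.flip(); // make the accumulated bytes readable
            handler.onReadCompleted(target);
        }
        return completed;
    }
}
```

The handler fires exactly once, after the last missing byte arrives; partial reads only advance the internal buffer.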
>
>>
>> + /**
>> + * Method writes <code>ByteBuffer</code> to the
>> <code>SelectableChannel</code>
>> + * First, if <code>SelectableChannel</code> associated write
>> queue is empty -
>> + * it tries to write <code>ByteBuffer</code> to the given
>> + * <code>SelectableChannel</code> directly (without putting to
>> the queue).
>> + * If associated write queue is not empty or after direct writing
>> + * <code>ByteBuffer</code> still has ready data to be written -
>> + * <code>ByteBuffer</code> will be added to
>> <code>AsyncWriteQueue</code>
>> + * and <code>SelectableChannel</code> will be registered on
>> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
>> + * If an exception occurs, during direct writing - it will be
>> propogated
>>
>> Typo propagated
> fixed.
>
>>
>> + * to the caller directly, otherwise, if the
>> <code>ByteBuffer</code> is
>> + * added to a writing queue - exception notification will come via
>> + * <code>AsyncWriteCallbackHandler</code>
>>
>> add the AsyncWriteCallbackHandler.onIOException()
> fixed.
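
The write path described in that Javadoc (try a direct write first, queue whatever is left) can be sketched as follows. This is a single-threaded illustration with hypothetical names (SketchQueueWriter, SketchThrottledChannel); it leaves out the locking, selector registration and callback handling that the real patch has.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;

class SketchQueueWriter {
    private final Queue<ByteBuffer> pending = new ArrayDeque<ByteBuffer>();

    // Returns true if the buffer was fully written now,
    // false if the remainder was queued for later.
    boolean write(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
        if (pending.isEmpty()) {
            channel.write(buffer); // an IOException here goes straight to the caller
        }
        if (buffer.hasRemaining()) {
            pending.add(buffer);
            // real code would register OP_WRITE interest here
            return false;
        }
        return true;
    }

    // Call when the channel is writable again. Returns true once the queue is drained.
    boolean onWritable(WritableByteChannel channel) throws IOException {
        ByteBuffer head;
        while ((head = pending.peek()) != null) {
            channel.write(head);
            if (head.hasRemaining()) {
                return false; // channel is full again; wait for the next event
            }
            pending.remove();
        }
        return true;
    }

    int pendingCount() {
        return pending.size();
    }
}

// Demo helper: accepts at most maxPerWrite bytes per call, like a full socket buffer.
class SketchThrottledChannel implements WritableByteChannel {
    private final int maxPerWrite;
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();

    SketchThrottledChannel(int maxPerWrite) {
        this.maxPerWrite = maxPerWrite;
    }

    public int write(ByteBuffer src) {
        int n = Math.min(maxPerWrite, src.remaining());
        for (int i = 0; i < n; i++) {
            sink.write(src.get());
        }
        return n;
    }

    public boolean isOpen() {
        return true;
    }

    public void close() {
    }
}
```

Because a new buffer is queued whenever the queue is non-empty, bytes reach the channel in the order the write calls were made.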
>
>
>> Index: async/AsyncQueueWriterContextTask.java
>> --- async/AsyncQueueWriterContextTask.java Locally New
>> +++ async/AsyncQueueWriterContextTask.java Locally New
>> @@ -0,0 +1,78 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import com.sun.grizzly.*;
>>
>> Fix the '*' by listing the classes instead (Netbeans Netbeans Netbeans
>> ;-))
> Fixed :)
>
>> +
>> +/**
>> + * <code>AsyncQueueWriter</code> task, which will be executed by
>> + * <code>Context</code>, when Context.execute(<code>ContextTask</code>)
>> + * is called.
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public class AsyncQueueWriterContextTask extends ContextTask {
>> + private static final TaskPool<AsyncQueueWriterContextTask>
>> taskPool =
>> + new TaskPool<AsyncQueueWriterContextTask>() {
>> + @Override
>> + public AsyncQueueWriterContextTask newTask() {
>> + return new AsyncQueueWriterContextTask();
>> + }
>> + };
>> +
>> + private AsyncQueueWriter asyncQueueWriter;
>> +
>> + public static AsyncQueueWriterContextTask poll() {
>> + return taskPool.poll();
>> + }
>> +
>> + public static void offer(AsyncQueueWriterContextTask contextTask) {
>> + contextTask.recycle();
>> + taskPool.offer(contextTask);
>> + }
>> +
>> + public Object call() throws Exception {
>> + try {
>> + asyncQueueWriter.onWrite(context.getSelectionKey());
>>
>> Can you make sure the getSelectionKey() is not null, and throw an
>> IllegalStateException in case the key is null? Just to make the code
>> robust, even if the key cannot be null.
> Fixed.
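
A minimal sketch of such a guard, assuming a small helper (SketchKeyGuard is a hypothetical name; the actual fix in the patch is not shown in this thread):

```java
import java.nio.channels.SelectionKey;

class SketchKeyGuard {
    // Fails fast with a descriptive message instead of a later NullPointerException.
    static SelectionKey requireKey(SelectionKey key) {
        if (key == null) {
            throw new IllegalStateException("SelectionKey is null");
        }
        return key;
    }
}
```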
>
>>
>> Index: async/AsyncWriteCallbackHandler.java
>> --- async/AsyncWriteCallbackHandler.java Locally New
>> +++ async/AsyncWriteCallbackHandler.java Locally New
>> @@ -0,0 +1,38 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import java.nio.ByteBuffer;
>> +import java.nio.channels.SelectionKey;
>> +
>> +/**
>> + * Callback handler interface, used by <code>AsyncQueueWriter</code>
>> to notify
>> + * custom code about completion of specific <code>ByteBuffer</code>
>> writing.
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public interface AsyncWriteCallbackHandler {
>> + public void onWriteCompleted(SelectionKey key, ByteBuffer buffer);
>> + public void onIOException(SelectionKey key, ByteBuffer buffer);
>> +}
>>
>> The API looks good, but I would add more docs here, as this interface
>> will be implemented by users of Grizzly, and since we are lacking
>> in terms of tutorials, it is better to have good docs :-)
> Added a description for each method. Also changed onIOException to
> public void onIOException(IOException ioException, SelectionKey key,
> ByteBuffer buffer);
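
For illustration, a documented version of the callback interface with the revised onIOException signature might look like the sketch below. The Javadoc wording is an assumption, and SketchWriteCallbackHandler and SketchCountingHandler are hypothetical stand-ins for the real interface and a user implementation.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

// Hypothetical stand-in mirroring the method signatures quoted in this thread.
interface SketchWriteCallbackHandler {
    /**
     * Called once the whole buffer has been written to the channel.
     *
     * @param key    key of the channel the data was written to
     * @param buffer the buffer that was written; it has no remaining bytes
     */
    void onWriteCompleted(SelectionKey key, ByteBuffer buffer);

    /**
     * Called when a queued write fails. Direct writes report failures
     * to the caller instead, so this is only used for queued buffers.
     *
     * @param ioException the failure raised by the channel
     * @param key         key of the channel the write was attempted on
     * @param buffer      the buffer that could not be (fully) written
     */
    void onIOException(IOException ioException, SelectionKey key, ByteBuffer buffer);
}

// Example user implementation that only counts outcomes.
class SketchCountingHandler implements SketchWriteCallbackHandler {
    int completed;
    int failed;

    public void onWriteCompleted(SelectionKey key, ByteBuffer buffer) {
        completed++;
    }

    public void onIOException(IOException ioException, SelectionKey key, ByteBuffer buffer) {
        failed++;
    }
}
```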
>
>> Index: async/AsyncWriteQueue.java
>> --- async/AsyncWriteQueue.java Locally New
>> +++ async/AsyncWriteQueue.java Locally New
>> @@ -0,0 +1,144 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import java.nio.channels.SelectableChannel;
>> +import java.util.Map;
>> +import java.util.concurrent.ConcurrentHashMap;
>> +import java.util.concurrent.ConcurrentLinkedQueue;
>> +import java.util.concurrent.locks.ReentrantLock;
>> +
>> +/**
>> + * Class represents collection of
>> + * <code>SelectableChannel</code>s and correspondent queue of
>> + * <code>ByteBuffer</code>, which should be written asynchronously.
>> + * This implementation is TCP protocol specific.
>>
>> To follow our convention, should this class be called TCPAsyncWriteQueue?
> Sorry it's javadoc problem. This class is common for all protocols.
>
>
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public class AsyncWriteQueue<E>{
>> + private Map<SelectableChannel, ChannelAsyncWriteEntry> queueMap =
>> + new ConcurrentHashMap<SelectableChannel,
>> ChannelAsyncWriteEntry>();
>> +
>> + /**
>> + * Add data to the <code>AsyncWriteQueue</code>, corresponding to
>> the given
>> + * <code>SelectableChannel</code>
>> + *
>> + * @param channel <code>SelectableChannel</code>
>> + * @param queueRecord write data unit
>> + */
>> + public void offer(SelectableChannel channel, E queueRecord) {
>> + ChannelAsyncWriteEntry entry =
>> obtainChannelAsyncWriteEntry(channel);
>> + entry.queue.offer(queueRecord);
>> + }
>> +
>> + /**
>> + * Get head element of <code>SelectableChannel</code> write queue.
>> + * Element will not be removed from queue.
>> + *
>> + * @param channel <code>SelectableChannel</code>
>> + *
>> + * @return <code>AsyncQueueRecord</code> write data unit
>> + */
>> + public E peek(SelectableChannel channel) {
>> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
>> + if (entry != null) {
>> + return entry.queue.peek();
>> + }
>> +
>> + return null;
>> + }
>> +
>> + /**
>> + * Get head element of <code>SelectableChannel</code> write queue.
>> + * Element will be removed from queue.
>> + *
>> + * @param channel <code>SelectableChannel</code>
>> + *
>> + * @return <code>AsyncQueueRecord</code> write data unit
>> + */
>> + public E poll(SelectableChannel channel) {
>> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
>> + if (entry != null) {
>> + return entry.queue.poll();
>> + }
>> +
>> + return null;
>> + }
>> +
>> + /**
>> + * Remove the write queue entry associated with the
>> + * <code>SelectableChannel</code>.
>> + *
>> + * @param channel <code>SelectableChannel</code>
>> + */
>> + public void removeEntry(SelectableChannel channel) {
>> + queueMap.remove(channel);
>> + }
>> +
>> + /**
>> + * Get the size of <code>SelectableChannel</code> write queue.
>> + *
>> + * @param channel <code>SelectableChannel</code>
>> + * @return size of <code>SelectableChannel</code> write queue.
>> + */
>> + public int size(SelectableChannel channel) {
>> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
>> + return entry == null ? 0 : entry.queue.size();
>> + }
>> +
>> + public void clear() {
>> + queueMap.clear();
>> + }
>> +
>> + protected ChannelAsyncWriteEntry
>> obtainChannelAsyncWriteEntry(SelectableChannel channel) {
>> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
>> + if (entry == null) {
>> + synchronized(channel) {
>>
>> Minor: A ReentrantReadWriteLock might perform better than a Sync here
> Hmm, not sure it will perform better. Double-checked locking is a bad
> pattern (an antipattern), but I'm not sure there is anything faster.


Not sure as well :-) Keep the sync then!
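
For reference, one lock-free alternative to the synchronized get-or-create is ConcurrentMap.putIfAbsent. The sketch below is not what the patch does and makes no claim about being faster; SketchEntryMap is a hypothetical name and the per-channel entry is reduced to a bare queue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

class SketchEntryMap<K, E> {
    private final ConcurrentMap<K, ConcurrentLinkedQueue<E>> map =
            new ConcurrentHashMap<K, ConcurrentLinkedQueue<E>>();

    // Get the queue for a channel, creating it on first use without locking.
    ConcurrentLinkedQueue<E> obtain(K channel) {
        ConcurrentLinkedQueue<E> queue = map.get(channel);
        if (queue == null) {
            ConcurrentLinkedQueue<E> fresh = new ConcurrentLinkedQueue<E>();
            // putIfAbsent returns the existing value if another thread won the race
            queue = map.putIfAbsent(channel, fresh);
            if (queue == null) {
                queue = fresh;
            }
        }
        return queue;
    }
}
```

The trade-off is that a losing thread allocates an entry it then throws away, which only matters if entries are expensive to create.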

>
>
>> Index: async/TCPAsyncQueueWriter.java
>> --- async/TCPAsyncQueueWriter.java Locally New
>> +++ async/TCPAsyncQueueWriter.java Locally New
>> @@ -0,0 +1,255 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import com.sun.grizzly.*;
>> +import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
>> +import java.io.IOException;
>> +import java.net.SocketAddress;
>> +import java.nio.ByteBuffer;
>> +import java.nio.channels.SelectableChannel;
>> +import java.nio.channels.SelectionKey;
>> +import java.nio.channels.WritableByteChannel;
>> +import java.util.concurrent.ConcurrentLinkedQueue;
>> +
>> +/**
>> + * TCP implementation of <code>AsyncQueueWriter</code>
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public class TCPAsyncQueueWriter implements AsyncQueueWriter {
>> + private SelectorHandler selectorHandler;
>> + private AsyncWriteQueue<TCPAsyncQueueRecord> writeQueue;
>> +
>> + public TCPAsyncQueueWriter(SelectorHandler selectorHandler) {
>> + this.selectorHandler = selectorHandler;
>> + writeQueue = new AsyncWriteQueue<TCPAsyncQueueRecord>();
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer) throws
>> IOException {
>> + return write(key, buffer, null);
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + return write(key, buffer, callbackHandler, false);
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler, boolean
>> isCloneByteBuffer) throws IOException {
>> +
>> + SelectableChannel channel = key.channel();
>> + // If AsyncWriteQueue is empty - try to write ByteBuffer here
>> + ChannelAsyncWriteEntry channelEntry =
>> + writeQueue.obtainChannelAsyncWriteEntry(channel);
>> +
>> + ConcurrentLinkedQueue<TCPAsyncQueueRecord> queue =
>> channelEntry.queue;
>> +
>> + boolean hasWriteLock = false;
>> +
>> + try {
>> + if (queue.isEmpty() &&
>> + (hasWriteLock = channelEntry.writeLock.tryLock())) {
>> + ((WritableByteChannel) channel).write(buffer);
>> + }
>> +
>> + if (buffer.hasRemaining()) {
>> + // clone ByteBuffer if required
>> + if (isCloneByteBuffer) {
>> + int size = buffer.remaining();
>> + ByteBuffer newBuffer = buffer.isDirect() ?
>> + ByteBuffer.allocateDirect(size) :
>> + ByteBuffer.allocate(size);
>>
>> Here we need to support VIEW and HEAP_ARRAY, as the rest of the
>> framework allows. We don't need it for the first release, but it might be
>> good to file an RFE. I would think an application that uses
>> ByteBuffer.wrap() would like to use the same type of bb when writing.
> Fixed for ByteBufferFactory.allocateView(...)
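
The cloning step under review can be sketched with plain JDK calls. This covers only the direct/heap distinction from the quoted patch; the view-buffer support via ByteBufferFactory mentioned in the reply is Grizzly-specific and not reproduced here. SketchBufferCloner is a hypothetical name.

```java
import java.nio.ByteBuffer;

class SketchBufferCloner {
    // Copies the unwritten remainder into a new buffer of the same kind
    // (direct or heap), so the caller can reuse the original buffer.
    static ByteBuffer cloneRemaining(ByteBuffer source) {
        int size = source.remaining();
        ByteBuffer copy = source.isDirect()
                ? ByteBuffer.allocateDirect(size)
                : ByteBuffer.allocate(size);
        copy.put(source); // note: this advances source's position to its limit
        copy.flip();      // make the copied bytes readable from position 0
        return copy;
    }
}
```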
>
>> +
>> + newBuffer.put(buffer);
>> + newBuffer.position(0);
>> + buffer = newBuffer;
>> + }
>> +
>> + // add new element to the queue
>> + channelEntry.updateLock.lock();
>> + queue.offer(new TCPAsyncQueueRecord(buffer,
>> + callbackHandler));
>>
>> Here should we try to cache TCPAsyncQueueRecord instead of calling
>> 'new' for every write? I would think yes :-)
> Fixed :)
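
A minimal sketch of caching queue records instead of allocating one per write, assuming a simple unbounded pool (SketchRecord and SketchRecordPool are hypothetical names; the actual fix is not shown in this thread):

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified stand-in for a write queue record.
class SketchRecord {
    ByteBuffer byteBuffer;
    Object callbackHandler;

    void set(ByteBuffer byteBuffer, Object callbackHandler) {
        this.byteBuffer = byteBuffer;
        this.callbackHandler = callbackHandler;
    }

    // Drop references so a pooled record does not keep buffers alive.
    void recycle() {
        byteBuffer = null;
        callbackHandler = null;
    }
}

class SketchRecordPool {
    private final ConcurrentLinkedQueue<SketchRecord> cache =
            new ConcurrentLinkedQueue<SketchRecord>();

    // Reuse a cached record if one is available, otherwise allocate.
    SketchRecord poll() {
        SketchRecord record = cache.poll();
        return record != null ? record : new SketchRecord();
    }

    // Return a record to the pool once its write has completed.
    void offer(SketchRecord record) {
        record.recycle();
        cache.offer(record);
    }
}
```

A record must only be offered back after the callback for it has run, otherwise another write could overwrite its fields while they are still in use.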
>
>>
>> +
>> + /*
>> + * This check helps to avoid not required key
>> registering
>> + */
>> + if (!channelEntry.onWriteLock.isLocked()) {
>> + registerForWriting(key);
>> + }
>> +
>> + channelEntry.updateLock.unlock();
>> +
>> + return false;
>> + }
>> + } finally {
>> + if (hasWriteLock) {
>> + channelEntry.writeLock.unlock();
>> + }
>> + }
>> +
>> + return true;
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean hasReadyAsyncWriteData(SelectionKey key) {
>> + return writeQueue.size(key.channel()) > 0;
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public void onWrite(SelectionKey key) throws IOException {
>> + SelectableChannel channel = key.channel();
>> +
>> + ChannelAsyncWriteEntry channelEntry =
>> + writeQueue.obtainChannelAsyncWriteEntry(channel);
>> +
>> + boolean hasUpdateLock = false;
>> + boolean hasOnWriteLock = false;
>> +
>> + if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
>> + try {
>> + ConcurrentLinkedQueue<TCPAsyncQueueRecord> queue =
>> channelEntry.queue;
>> +
>> + while (queue.size() > 0) {
>> + if (hasUpdateLock) {
>> + channelEntry.updateLock.unlock();
>> + hasUpdateLock = false;
>> + }
>> +
>> + TCPAsyncQueueRecord queueRecord = queue.peek();
>> +
>> + ByteBuffer byteBuffer = queueRecord.byteBuffer;
>> + try {
>> + ((WritableByteChannel)
>> channel).write(byteBuffer);
>> + } catch (IOException e) {
>> + if (queueRecord.callbackHandler != null) {
>> +
>> queueRecord.callbackHandler.onIOException(key,
>> + byteBuffer);
>> + }
>>
>> else log the exception.
> Fixed.
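
The "notify the handler, else log" rule can be sketched as below, using java.util.logging. SketchIOErrorHandler and SketchErrorReporter are hypothetical names, and the handler signature is reduced for the illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical, reduced handler interface for this sketch.
interface SketchIOErrorHandler {
    void onIOException(IOException ioException, ByteBuffer buffer);
}

class SketchErrorReporter {
    private static final Logger LOGGER =
            Logger.getLogger(SketchErrorReporter.class.getName());

    // Returns true if a handler was notified, false if the error was only logged.
    static boolean report(IOException e, ByteBuffer buffer, SketchIOErrorHandler handler) {
        if (handler != null) {
            handler.onIOException(e, buffer);
            return true;
        }
        // No handler registered: make sure the failure is not silently dropped.
        LOGGER.log(Level.WARNING, "Asynchronous write failed", e);
        return false;
    }
}
```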
>
>
>> + }
>> +
>> + if (!byteBuffer.hasRemaining()) {
>> + TCPAsyncQueueRecord removedQueueRecord =
>> + queue.poll();
>> + assert removedQueueRecord == queueRecord;
>> +
>> + if (removedQueueRecord.callbackHandler !=
>> null) {
>> + removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer);
>> + }
>> + } else {
>> + hasOnWriteLock = false;
>> + channelEntry.onWriteLock.unlock();
>> + registerForWriting(key);
>> + break;
>> + }
>> +
>> + if (queue.size() == 0) {
>> + /*
>> + * If queue is empty - there is possibility
>> that write method
>> + * will add entry, but it will not be
>> processed, because
>> + * onWriteLock is currently locked
>> + */
>> + channelEntry.updateLock.lock();
>> + hasUpdateLock = true;
>> + }
>> + }
>> + } finally {
>> + if (hasOnWriteLock) {
>> + channelEntry.onWriteLock.unlock();
>> + }
>> +
>> + if (hasUpdateLock) {
>> + channelEntry.updateLock.unlock();
>> + }
>> + }
>> + }
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> ByteBuffer buffer) throws IOException {
>> + throw new UnsupportedOperationException("Not supported for
>> TCP transport.");
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + throw new UnsupportedOperationException("Not supported for
>> TCP transport.");
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, boolean
>> isCloneByteBuffer) throws IOException {
>> + throw new UnsupportedOperationException("Not supported for
>> TCP transport.");
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public void onClose(SelectableChannel channel) {
>> + writeQueue.removeEntry(channel);
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public void close() {
>> + writeQueue.clear();
>> + writeQueue = null;
>> + }
>> +
>> + private void registerForWriting(SelectionKey key) {
>> + selectorHandler.register(key, SelectionKey.OP_WRITE);
>> + }
>> +
>> + /**
>> + * <code>AsyncWriteQueue</code> data unit specific for TCP protocol
>> + */
>> + protected static class TCPAsyncQueueRecord {
>> + public ByteBuffer byteBuffer;
>> + public AsyncWriteCallbackHandler callbackHandler;
>> +
>> + public TCPAsyncQueueRecord(ByteBuffer byteBuffer,
>> + AsyncWriteCallbackHandler callbackHandler) {
>> + this.byteBuffer = byteBuffer;
>> + this.callbackHandler = callbackHandler;
>> + }
>> + }
>> +}
>> Index: async/UDPAsyncQueueWriter.java
>> --- async/UDPAsyncQueueWriter.java Locally New
>> +++ async/UDPAsyncQueueWriter.java Locally New
>> @@ -0,0 +1,271 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.async;
>> +
>> +import com.sun.grizzly.*;
>> +import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
>> +import java.io.IOException;
>> +import java.nio.ByteBuffer;
>> +import java.nio.channels.SelectableChannel;
>> +import java.nio.channels.SelectionKey;
>> +import java.nio.channels.WritableByteChannel;
>> +import java.util.concurrent.ConcurrentLinkedQueue;
>> +import java.net.SocketAddress;
>> +import java.nio.channels.DatagramChannel;
>> +
>> +/**
>> + * UDP implementation of <code>AsyncQueueWriter</code>
>> + *
>> + * @author Alexey Stashok
>> + */
>> +public class UDPAsyncQueueWriter implements AsyncQueueWriter {
>> + private SelectorHandler selectorHandler;
>> + private AsyncWriteQueue<UDPAsyncQueueRecord> writeQueue;
>> +
>> + public UDPAsyncQueueWriter(SelectorHandler selectorHandler) {
>> + this.selectorHandler = selectorHandler;
>> + writeQueue = new AsyncWriteQueue<UDPAsyncQueueRecord>();
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer) throws
>> IOException {
>> + return write(key, null, buffer, null);
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler) throws
>> IOException {
>> + return write(key, null, buffer, callbackHandler, false);
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, ByteBuffer buffer,
>> + AsyncWriteCallbackHandler callbackHandler, boolean
>> isCloneByteBuffer) throws IOException {
>> + return write(key, null, buffer, callbackHandler,
>> isCloneByteBuffer);
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> + ByteBuffer buffer) throws IOException {
>> + return write(key, dstAddress, buffer, null);
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> + ByteBuffer buffer, AsyncWriteCallbackHandler
>> callbackHandler)
>> + throws IOException {
>> + return write(key, dstAddress, buffer, callbackHandler, false);
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean write(SelectionKey key, SocketAddress dstAddress,
>> + ByteBuffer buffer, AsyncWriteCallbackHandler
>> callbackHandler,
>> + boolean isCloneByteBuffer) throws IOException {
>> +
>> + SelectableChannel channel = key.channel();
>> + // If AsyncWriteQueue is empty - try to write ByteBuffer here
>> + ChannelAsyncWriteEntry channelEntry =
>> + writeQueue.obtainChannelAsyncWriteEntry(channel);
>> +
>> + ConcurrentLinkedQueue<UDPAsyncQueueRecord> queue =
>> channelEntry.queue;
>> +
>> + boolean hasWriteLock = false;
>> +
>> + try {
>> + if (queue.isEmpty() &&
>> + (hasWriteLock = channelEntry.writeLock.tryLock())) {
>> + doWrite((WritableByteChannel) channel, dstAddress,
>> buffer);
>> + }
>> +
>> + if (buffer.hasRemaining()) {
>> + // clone ByteBuffer if required
>> + if (isCloneByteBuffer) {
>> + int size = buffer.remaining();
>> + ByteBuffer newBuffer = buffer.isDirect() ?
>> + ByteBuffer.allocateDirect(size) :
>> + ByteBuffer.allocate(size);
>> +
>> + newBuffer.put(buffer);
>> + newBuffer.position(0);
>> + buffer = newBuffer;
>> + }
>>
>> Same as above. Support HEAP and VIEW_HEAP/DIRECT via ByteBufferFactory
> Fixed the same way as for TCP.
>
>> +
>> + // add new element to the queue
>> + channelEntry.updateLock.lock();
>> + queue.offer(new UDPAsyncQueueRecord(dstAddress, buffer,
>> + callbackHandler));
>> +
>> + /*
>> + * This check helps to avoid not required key
>> registering
>> + */
>> + if (!channelEntry.onWriteLock.isLocked()) {
>> + registerForWriting(key);
>> + }
>> +
>> + channelEntry.updateLock.unlock();
>> +
>> + return false;
>> + }
>> + } finally {
>> + if (hasWriteLock) {
>> + channelEntry.writeLock.unlock();
>> + }
>> + }
>> +
>> + return true;
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public boolean hasReadyAsyncWriteData(SelectionKey key) {
>> + return writeQueue.size(key.channel()) > 0;
>> + }
>> +
>> + /**
>> + * {@inheritDoc}
>> + */
>> + public void onWrite(SelectionKey key) throws IOException {
>> + SelectableChannel channel = key.channel();
>> +
>> + ChannelAsyncWriteEntry channelEntry =
>> + writeQueue.obtainChannelAsyncWriteEntry(channel);
>> +
>> + boolean hasUpdateLock = false;
>> + boolean hasOnWriteLock = false;
>> +
>> + if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
>> + try {
>> + ConcurrentLinkedQueue<UDPAsyncQueueRecord> queue =
>> + channelEntry.queue;
>> +
>> + while (queue.size() > 0) {
>> + if (hasUpdateLock) {
>> + channelEntry.updateLock.unlock();
>> + hasUpdateLock = false;
>> + }
>> +
>> + UDPAsyncQueueRecord queueRecord = queue.peek();
>> +
>> + ByteBuffer byteBuffer = queueRecord.byteBuffer;
>> + try {
>> + doWrite((WritableByteChannel) channel,
>> + queueRecord.dstAddress, byteBuffer);
>> + } catch (IOException e) {
>> + if (queueRecord.callbackHandler != null) {
>> +
>> queueRecord.callbackHandler.onIOException(key,
>> + byteBuffer);
>> + }
>> + }
>>
>> else log the exception.
> Fixed.
>
>> Index: Context.java
>> --- Context.java Base (BASE)
>> +++ Context.java Locally Modified (Based On LOCAL)
>> @@ -23,9 +23,14 @@
>>
>> package com.sun.grizzly;
>>
>> +import com.sun.grizzly.async.AsyncQueueWritable;
>> +import com.sun.grizzly.async.AsyncQueueWriter;
>> +import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>> +import java.io.IOException;
>> +import java.net.SocketAddress;
>> +import java.nio.ByteBuffer;
>> import java.nio.channels.SelectionKey;
>> import java.util.HashMap;
>> -import java.util.concurrent.Callable;
>>
>> /**
>> * This Object is used to share information between the Grizzly
>> Framework
>> @@ -33,7 +38,7 @@
>> *
>> * @author Jeanfrancois Arcand
>> */
>> -public class Context implements Callable {
>> +public class Context {
>>
>> /**
>> * A SelectionKey's registration state.
>> @@ -119,6 +124,16 @@
>>
>>
>> /**
>> + * <code>AsyncQueueWriter</code>
>> + */
>> + private AsyncQueueWriter asyncQueueWriter;
>> +
>> + /**
>> + * <code>AsyncQueueWritable</code>
>> + */
>> + private AsyncQueueWritable asyncQueueWritable;
>> +
>> + /**
>> * Constructor
>> */
>> public Context() {
>> @@ -210,6 +225,7 @@
>> currentOpType = null;
>> protocolChain = null;
>> ioEvent = null;
>> + asyncQueueWriter = null;
>> }
>>
>>
>> @@ -233,47 +249,6 @@
>>
>>
>> /**
>> - * Execute the <code>ProtocolChain</code>.
>> - * @throws java.lang.Exception Exception thrown by protocol chain
>> - */
>> - public Object call() throws Exception {
>> - // If a IOEvent has been defined, invoke it first and
>> - // let its associated CallbackHandler decide if the
>> ProtocolChain
>> - // be invoked or not.
>> - Object attachment = key.attachment();
>> - if (ioEvent != null && (attachment instanceof CallbackHandler)){
>> - try{
>> - CallbackHandler callBackHandler =
>> ((CallbackHandler)attachment);
>> - if (currentOpType == OpType.OP_READ){
>> - callBackHandler.onRead(ioEvent);
>> - } else if (currentOpType == OpType.OP_WRITE){
>> - callBackHandler.onWrite(ioEvent);
>> - } else if (currentOpType == OpType.OP_CONNECT){
>> - callBackHandler.onConnect(ioEvent);
>> - }
>> - } finally {
>> - if (ioEvent != null){
>> - // Prevent the CallbackHandler to re-use the
>> context.
>> - // TODO: This is still dangerous as the Context
>> might have been
>> - // cached by the CallbackHandler.
>> - ioEvent.attach(null);
>> - ioEvent = null;
>> - }
>> - }
>> - } else {
>> - SelectionKey currentKey = key;
>> -
>> selectorHandler.getSelectionKeyHandler().process(currentKey);
>> - try {
>> - protocolChain.execute(this);
>> - } finally {
>> - selectorHandler.getSelectionKeyHandler().postProcess(currentKey);
>> - }
>> - }
>> - return null;
>> - }
>> -
>> -
>> - /**
>> * Return <code>ProtocolChain</code> executed by this instance.
>> * @return <code>ProtocolChain</code> instance
>> */
>> @@ -313,9 +288,12 @@
>> * Execute this Context using the Controller's Pipeline
>> * @throws com.sun.grizzly.PipelineFullException
>> */
>> - public void execute() throws PipelineFullException{
>>
>>
>> Here we must keep the execute() method available, as removing it will
>> break existing applications (like Sailfin).
> Fixed.
>
>> Index: Controller.java
>> --- Controller.java Base (BASE)
>> +++ Controller.java Locally Modified (Based On LOCAL)
>> @@ -298,6 +298,7 @@
>> key = iterator.next();
>> iterator.remove();
>> boolean skipOpWrite = false;
>> + delegateToWorkerThread = false;
>> if (key.isValid()) {
>> if ((key.readyOps() & SelectionKey.OP_ACCEPT)
>> == SelectionKey.OP_ACCEPT){
>> @@ -356,7 +357,7 @@
>> if (logger.isLoggable(Level.FINE)) {
>> logger.log(Level.FINE, "OP_WRITE on "
>> + key);
>> }
>> - delegateToWorkerThread = selectorHandler.
>> + delegateToWorkerThread |= selectorHandler.
>> onWriteInterest(key,serverCtx);
>> }
>>
>> @@ -368,7 +369,9 @@
>> Context context =
>> pollContext(key,protocolChain);
>> context.setSelectorHandler(selectorHandler);
>>
>> context.setPipeline(selectorHandler.pipeline());
>> - context.execute();
>> + context.setAsyncQueueWriter(
>> +
>> selectorHandler.getAsyncQueueWriter());
>> + context.execute(ProtocolChainContextTask.poll());
>>
>> This brings up an interesting point. Instead of calling
>> setAsyncQueueWriter on every request, should the Context instance be
>> pooled per Selector instead of per Controller? That way, once
>> configured, a Context can be pooled and never has to be re-configured
>> again.
> I thought about this. It would require adding more methods to the
> SelectorHandler interface, like pollContext and release/offerContext. I
> prefer not to add many methods to the interface, but it seems it could be
> the right way.

Agree :-) Let's keep the current design and we can revisit it later!

Great work!

-- Jeanfrancois


>
>
> Thank you for feedback!
>
> WBR,
> Alexey.
>