dev@grizzly.java.net

Re: AsyncWriter proposal

From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Date: Mon, 05 Nov 2007 16:01:14 +0100

Hello,

Here is the second try at AsyncQueueWriter. This time I have it integrated
and tested.
The attached AsyncWriteQueue.zip contains the implementation of
AsyncWriteQueue; AsyncWriteQueue-complete.diff contains the complete svn
diff for the implementation and the integration code.

In the diff file you can see the refactoring I'm proposing for the Context
class. Currently the Context class holds some logic that is executed by the
Pipeline (ThreadPool): callback handler invocation and protocol chain
execution. I propose moving this logic into ContextTask interface
implementations, which will simplify the Context class. Right now Context
has 2 possible jobs: execute the callback handler or execute the protocol
chain. After adding AsyncWriteQueue there would be 3. Isn't that too much
to hold in Context?
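
To illustrate the idea, here is a minimal sketch of how the three jobs
could be split into ContextTask implementations. Only the ContextTask name
comes from the proposal; the method shape, the three task class names, the
stand-in Context and the single-thread executor playing the Pipeline are
all assumptions made for this example, not the code in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ContextTaskSketch {

    /** Hypothetical shape: one unit of work executed by a Pipeline thread. */
    public interface ContextTask extends Runnable {
    }

    /** Stand-in for Context: it only carries state, no execution logic. */
    public static class Context {
        final List<String> log =
                Collections.synchronizedList(new ArrayList<String>());
    }

    /** Job 1: invoke the callback handler. */
    public static class CallbackHandlerTask implements ContextTask {
        private final Context context;
        public CallbackHandlerTask(Context context) { this.context = context; }
        public void run() { context.log.add("callback handler"); }
    }

    /** Job 2: execute the protocol chain. */
    public static class ProtocolChainTask implements ContextTask {
        private final Context context;
        public ProtocolChainTask(Context context) { this.context = context; }
        public void run() { context.log.add("protocol chain"); }
    }

    /** Job 3: flush the async write queue. */
    public static class AsyncWriteTask implements ContextTask {
        private final Context context;
        public AsyncWriteTask(Context context) { this.context = context; }
        public void run() { context.log.add("async write"); }
    }

    /** Submits one task of each kind and returns what was executed, in order. */
    public static List<String> runAll() {
        Context context = new Context();
        // A single-thread executor stands in for the Pipeline (ThreadPool).
        ExecutorService pipeline = Executors.newSingleThreadExecutor();
        pipeline.execute(new CallbackHandlerTask(context));
        pipeline.execute(new ProtocolChainTask(context));
        pipeline.execute(new AsyncWriteTask(context));
        pipeline.shutdown();
        try {
            pipeline.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(context.log);
    }

    public static void main(String[] args) {
        System.out.println(runAll());
    }
}
```

With this split, adding a new kind of work means adding a new task class
rather than another branch inside Context.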

I was also thinking about async reading; it looks like we can apply it to
the UDP protocol (see the discussion with Radim on the users_at_grizzly
mailing list). This way we can try to simulate, on the server side, some
of the features that the TCP protocol has.

Thanks.

WBR,
Alexey.

Oleksiy Stashok wrote:
> Hello,
>>>
>>> There are 3 classes attached.
>>> AsyncWriter: the main class a user will use to write data in
>>> async mode. The class has several overloaded write methods (see the
>>> javadoc for more details).
>>> AsyncWriteCallbackHandler: an interface that custom code can implement
>>> to be notified when a ByteBuffer has been written asynchronously.
>>> An AsyncWriteCallbackHandler instance can be passed as one of the
>>> AsyncWriter.write parameters.
>>> AsyncWriteQueue: a collection that maps each Channel to its
>>> write queue.
>>>
>>> The classes are described in more detail in the javadocs.
>>>
>>> As I mentioned, I think AsyncWriter should be integrated into the
>>> SelectorHandler, so a developer would use its functionality like
>>> this:
>>> selectorHandler.getAsyncWriter().write(byteBuffer...).
>>
>> Or, since SelectorHandlers aren't "easily" available in a
>> ProtocolFilter, would it make sense to reach it from the Context
>> instead? Since the Context is close to a 'Session' for us, I think it
>> makes sense to have it available there.
>>
>> Something like:
>>
>> Context.getAsyncWriter() and
>> Context.getAsyncReader()
> Currently Context has context.getSelectorHandler(), so we can use it.
> "context.getSelectorHandler().getAsyncWriter()"
>
>> As part of your proposal I would like us to think about the
>> AsyncReader concept (at least have the API, and leave the discussion
>> open).
> Hmm... Do you have any specific usecase for it?
>
>> Now I think the SelectorHandler will need a little rework to make
>> sure AsyncWriter.onWrite is called, right?
> Sure. onWrite, onClose should be called from SelectorHandler.
>
>> Also, to unify our CallbackHandlers in grizzly, we might want to:
>>
>> + Make CallbackHandler extend AsyncWriter
> Agree, this could be useful.
>
>> + Add a new marker interface that can be extended by both types of
>> CallbackHandler
>> + Have two separate Handlers like you are proposing.
> We can keep them as 2 separate handlers for now, and change this
> depending on community feedback.
>
> Thanks.
>
> WBR,
> Alexey.
>>
>>>
>>> I would appreciate any feedback! :)
>>>
>>> Thanks.
>>>
>>> WBR,
>>> Alexey.
>>>
>>> /*
>>> * The contents of this file are subject to the terms
>>> * of the Common Development and Distribution License
>>> * (the License). You may not use this file except in
>>> * compliance with the License.
>>> *
>>> * You can obtain a copy of the license at
>>> * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>>> * glassfish/bootstrap/legal/CDDLv1.0.txt.
>>> * See the License for the specific language governing
>>> * permissions and limitations under the License.
>>> *
>>> * When distributing Covered Code, include this CDDL
>>> * Header Notice in each file and include the License file
>>> * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>>> * If applicable, add the following below the CDDL Header,
>>> * with the fields enclosed by brackets [] replaced by
>>> * you own identifying information:
>>> * "Portions Copyrighted [year] [name of copyright owner]"
>>> *
>>> * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>>> */
>>>
>>> package com.sun.grizzly;
>>>
>>> import java.io.IOException;
>>> import java.nio.ByteBuffer;
>>> import java.nio.channels.SelectableChannel;
>>> import java.nio.channels.SelectionKey;
>>> import java.nio.channels.WritableByteChannel;
>>>
>>> /**
>>> *
>>> * @author Alexey Stashok
>>> */
>>> public class AsyncWriter {
>>>     private SelectorHandler selectorHandler;
>>>     private AsyncWriteQueue writeQueue;
>>>
>>>     public AsyncWriter(SelectorHandler selectorHandler) {
>>>         this.selectorHandler = selectorHandler;
>>>         writeQueue = new AsyncWriteQueue();
>>>     }
>>>
>>>     /**
>>>      * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>.
>>>      * First it tries to write <code>ByteBuffer</code> to the given
>>>      * <code>SelectableChannel</code>.
>>>      * If, after write is called, <code>ByteBuffer</code> still has
>>>      * data remaining to be written, <code>ByteBuffer</code> will be
>>>      * added to <code>AsyncWriteQueue</code> and
>>>      * <code>SelectableChannel</code> will be registered on
>>>      * <code>SelectorHandler</code>, waiting for the OP_WRITE event.
>>
>> Here I would add:
>>
>> * if an unexpected exception occurs, the exception will not be
>> propagated to the caller of this class, but logged by the Grizzly
>> framework
>>
>> We probably make it clear that without a CallBackHandler, any
>> IOException will be swallowed by the framework.
>>
>>
>>>      *
>>>      * @param channel <code>SelectableChannel</code> the
>>>      *        <code>ByteBuffer</code> should be written to
>>>      * @param buffer <code>ByteBuffer</code>
>>>      * @return true, if <code>ByteBuffer</code> was written completely,
>>>      *         false if the write operation was put into the queue
>>>      * @throws java.io.IOException
>>>      */
>>>     public boolean write(SelectableChannel channel, ByteBuffer buffer)
>>>             throws IOException {
>>>         return write(channel, buffer, null);
>>>     }
>>>     /**
>>>      * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>.
>>>      * First it tries to write <code>ByteBuffer</code> to the given
>>>      * <code>SelectableChannel</code>.
>>>      * If, after write is called, <code>ByteBuffer</code> still has
>>>      * data remaining to be written, <code>ByteBuffer</code> will be
>>>      * added to <code>AsyncWriteQueue</code> and
>>>      * <code>SelectableChannel</code> will be registered on
>>>      * <code>SelectorHandler</code>, waiting for the OP_WRITE event.
>>>      *
>>>      * @param channel <code>SelectableChannel</code> the
>>>      *        <code>ByteBuffer</code> should be written to
>>>      * @param buffer <code>ByteBuffer</code>
>>>      * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>>>      *        which will be notified when the <code>ByteBuffer</code>
>>>      *        has been completely written
>>>      * @return true, if <code>ByteBuffer</code> was written completely,
>>>      *         false if the write operation was put into the queue
>>>      * @throws java.io.IOException
>>>      */
>>>     public boolean write(SelectableChannel channel, ByteBuffer buffer,
>>>             AsyncWriteCallbackHandler callbackHandler)
>>>             throws IOException {
>>>         return write(channel, buffer, callbackHandler, false);
>>>     }
>>>
>>>     /**
>>>      * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>.
>>>      * First it tries to write <code>ByteBuffer</code> to the given
>>>      * <code>SelectableChannel</code>.
>>>      * If, after write is called, <code>ByteBuffer</code> still has
>>>      * data remaining to be written, <code>ByteBuffer</code> will be
>>>      * added to <code>AsyncWriteQueue</code> and
>>>      * <code>SelectableChannel</code> will be registered on
>>>      * <code>SelectorHandler</code>, waiting for the OP_WRITE event.
>>>      *
>>>      * @param channel <code>SelectableChannel</code> the
>>>      *        <code>ByteBuffer</code> should be written to
>>>      * @param buffer <code>ByteBuffer</code>
>>>      * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>>>      *        which will be notified when the <code>ByteBuffer</code>
>>>      *        has been completely written
>>>      * @param isCloneByteBuffer if true, AsyncWriter will clone the given
>>>      *        <code>ByteBuffer</code> before putting it into the
>>>      *        <code>AsyncWriteQueue</code>
>>>      * @return true, if <code>ByteBuffer</code> was written completely,
>>>      *         false if the write operation was put into the queue
>>>      * @throws java.io.IOException
>>>      */
>>>     public boolean write(SelectableChannel channel, ByteBuffer buffer,
>>>             AsyncWriteCallbackHandler callbackHandler,
>>>             boolean isCloneByteBuffer) throws IOException {
>>>         // If AsyncWriteQueue is empty, try to write the ByteBuffer here
>>>         if (!hasReadyAsyncWriteData(channel)) {
>>>             ((WritableByteChannel) channel).write(buffer);
>>>         }
>>>
>>>         if (buffer.hasRemaining()) {
>>>             // Clone the remaining bytes of the ByteBuffer if required
>>>             if (isCloneByteBuffer) {
>>>                 int size = buffer.remaining();
>>>                 ByteBuffer newBuffer = buffer.isDirect() ?
>>>                         ByteBuffer.allocateDirect(size) :
>>>                         ByteBuffer.allocate(size);
>>>                 newBuffer.put(buffer);
>>>                 // Make the copied bytes readable from the start
>>>                 newBuffer.flip();
>>>                 buffer = newBuffer;
>>>             }
>>>
>>>             // Add the new element to the queue
>>>             writeQueue.offer(channel,
>>>                     new AsyncWriteQueue.AsyncQueueRecord(buffer, callbackHandler));
>>>             selectorHandler.register(channel, SelectionKey.OP_WRITE);
>>>             return false;
>>>         }
>>>
>>>         return true;
>>>     }
>>>
>>>     /**
>>>      * Checks whether there is any data in <code>AsyncWriteQueue</code>
>>>      * ready for the given <code>SelectableChannel</code>.
>>>      *
>>>      * @param channel <code>SelectableChannel</code>
>>>      * @return true, if there is ready data. False otherwise.
>>>      */
>>>     public boolean hasReadyAsyncWriteData(SelectableChannel channel) {
>>>         return writeQueue.size(channel) > 0;
>>>     }
>>>     /**
>>>      * Callback method, which should be called by
>>>      * <code>SelectorHandler</code> to notify that the given
>>>      * <code>SelectableChannel</code> is ready to transmit data.
>>>      *
>>>      * @param channel <code>SelectableChannel</code>
>>>      * @throws java.io.IOException
>>>      */
>>>     public void onWrite(SelectableChannel channel) throws IOException {
>>>         AsyncWriteQueue.AsyncQueueRecord queueRecord =
>>>                 writeQueue.peek(channel);
>>>         // peek() returns null if nothing is queued for this channel
>>>         if (queueRecord == null) {
>>>             return;
>>>         }
>>>
>>>         ByteBuffer byteBuffer = queueRecord.byteBuffer;
>>>         ((WritableByteChannel) channel).write(byteBuffer);
>>>         if (byteBuffer.hasRemaining()) {
>>>             // Partial write: wait for the next OP_WRITE event
>>>             selectorHandler.register(channel, SelectionKey.OP_WRITE);
>>>         } else {
>>>             // Head record fully written: remove it and notify
>>>             AsyncWriteQueue.AsyncQueueRecord removedQueueRecord =
>>>                     writeQueue.poll(channel);
>>>             assert removedQueueRecord == queueRecord;
>>>             if (removedQueueRecord.callbackHandler != null) {
>>>                 removedQueueRecord.callbackHandler.notifyWriteCompleted(
>>>                         channel, byteBuffer);
>>>             }
>>>         }
>>>     }
>>>     /**
>>>      * Callback method, which should be called by
>>>      * <code>SelectorHandler</code> to notify that the given
>>>      * <code>SelectableChannel</code> is going to be closed, so the
>>>      * data related to that <code>SelectableChannel</code> can be
>>>      * released from <code>AsyncWriteQueue</code>.
>>>      *
>>>      * @param channel <code>SelectableChannel</code>
>>>      */
>>>     public void onClose(SelectableChannel channel) {
>>>         writeQueue.removeEntry(channel);
>>>     }
>>> }
>>>
>>>
>>>
>>>
>>> package com.sun.grizzly;
>>>
>>> import java.nio.ByteBuffer;
>>> import java.nio.channels.SelectableChannel;
>>>
>>> /**
>>>  * Callback handler interface, used by AsyncWriter to notify custom
>>>  * code about completion of writing a specific <code>ByteBuffer</code>.
>>>  *
>>>  * @author Alexey Stashok
>>>  */
>>> public interface AsyncWriteCallbackHandler {
>>>     public void notifyWriteCompleted(SelectableChannel channel,
>>>             ByteBuffer buffer);
>>> }
>>>
>>
>> I would recommend we add something like:
>>
>> public void notifyIOException(SelectableChannel channel,
>> ByteBuffer byteBuffer, Exception exception);
>>
>> to the interface. Should we use instead (to follow the same pattern
>> than the current CallBackhandler):
>>
>> onWriteCompleted
>> onIOException
>>
>> ?
>>
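
For illustration, the renamed callbacks suggested above could look like the
sketch below. The two method names and the parameter list come from the
suggestion; the writeAndNotify helper, the logging fallback for a missing
handler, and the Pipe used in demo() are assumptions added only to make the
example self-contained. This is not the committed Grizzly API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class WriteCallbackSketch {
    private static final Logger LOGGER =
            Logger.getLogger(WriteCallbackSketch.class.getName());

    /** Callback names as suggested in the review. */
    public interface AsyncWriteCallbackHandler {
        void onWriteCompleted(SelectableChannel channel, ByteBuffer buffer);

        void onIOException(SelectableChannel channel, ByteBuffer buffer,
                Exception exception);
    }

    /** Hypothetical helper: one write attempt, outcome routed to the handler. */
    public static void writeAndNotify(SelectableChannel channel,
            ByteBuffer buffer, AsyncWriteCallbackHandler handler) {
        try {
            ((WritableByteChannel) channel).write(buffer);
            if (!buffer.hasRemaining() && handler != null) {
                handler.onWriteCompleted(channel, buffer);
            }
        } catch (IOException e) {
            if (handler != null) {
                handler.onIOException(channel, buffer, e);
            } else {
                // Without a handler the exception is only logged.
                LOGGER.log(Level.WARNING, "Async write failed", e);
            }
        }
    }

    /** Writes once to an open pipe and once to a closed one. */
    public static List<String> demo() {
        final List<String> events = new ArrayList<String>();
        AsyncWriteCallbackHandler handler = new AsyncWriteCallbackHandler() {
            public void onWriteCompleted(SelectableChannel channel,
                    ByteBuffer buffer) {
                events.add("completed");
            }

            public void onIOException(SelectableChannel channel,
                    ByteBuffer buffer, Exception exception) {
                events.add("failed");
            }
        };
        try {
            Pipe pipe = Pipe.open();
            writeAndNotify(pipe.sink(), ByteBuffer.wrap(new byte[] {1, 2, 3}), handler);
            pipe.sink().close();
            // Writing to a closed channel raises ClosedChannelException.
            writeAndNotify(pipe.sink(), ByteBuffer.wrap(new byte[] {4}), handler);
            pipe.source().close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```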
>>>
>>>
>>>
>>> package com.sun.grizzly;
>>>
>>> import java.nio.ByteBuffer;
>>> import java.nio.channels.SelectableChannel;
>>> import java.util.Map;
>>> import java.util.concurrent.ConcurrentHashMap;
>>> import java.util.concurrent.ConcurrentLinkedQueue;
>>>
>>> /**
>>>  * Class represents a collection of <code>SelectableChannel</code>s
>>>  * and the corresponding queues of <code>ByteBuffer</code>s, which
>>>  * should be written asynchronously.
>>>  *
>>>  * @author Alexey Stashok
>>>  */
>>> public class AsyncWriteQueue {
>>>     private Map<SelectableChannel,
>>>             ConcurrentLinkedQueue<AsyncQueueRecord>> queueMap =
>>>             new ConcurrentHashMap<SelectableChannel,
>>>                     ConcurrentLinkedQueue<AsyncQueueRecord>>();
>>>
>>>     /**
>>>      * Add data to the <code>AsyncWriteQueue</code>, corresponding to
>>>      * the given <code>SelectableChannel</code>.
>>>      *
>>>      * @param channel <code>SelectableChannel</code>
>>>      * @param queueRecord write data unit
>>>      */
>>>     public void offer(SelectableChannel channel,
>>>             AsyncQueueRecord queueRecord) {
>>>         ConcurrentLinkedQueue<AsyncQueueRecord> queue =
>>>                 queueMap.get(channel);
>>>         if (queue == null) {
>>>             synchronized(channel) {
>>
>> For later: Read/WriteLock might be more efficient here.
>>
>>>                 queue = queueMap.get(channel);
>>>                 if (queue == null) {
>>>                     queue = new ConcurrentLinkedQueue<AsyncQueueRecord>();
>>>                     queueMap.put(channel, queue);
>>>                 }
>>>             }
>>>         }
>>>
>>>         queue.offer(queueRecord);
>>>     }
>>>     /**
>>>      * Get head element of <code>SelectableChannel</code> write queue.
>>>      * Element will not be removed from queue.
>>>      *
>>>      * @param channel <code>SelectableChannel</code>
>>>      * @return <code>AsyncQueueRecord</code> write data unit, or null
>>>      *         if nothing is queued for the channel
>>>      */
>>>     public AsyncQueueRecord peek(SelectableChannel channel) {
>>>         ConcurrentLinkedQueue<AsyncQueueRecord> queue =
>>>                 queueMap.get(channel);
>>>         if (queue != null) {
>>>             return queue.peek();
>>>         }
>>>         return null;
>>>     }
>>>
>>>     /**
>>>      * Get head element of <code>SelectableChannel</code> write queue.
>>>      * Element will be removed from queue.
>>>      *
>>>      * @param channel <code>SelectableChannel</code>
>>>      * @return <code>AsyncQueueRecord</code> write data unit, or null
>>>      *         if nothing is queued for the channel
>>>      */
>>>     public AsyncQueueRecord poll(SelectableChannel channel) {
>>>         ConcurrentLinkedQueue<AsyncQueueRecord> queue =
>>>                 queueMap.get(channel);
>>>         if (queue != null) {
>>>             return queue.poll();
>>>         }
>>>         return null;
>>>     }
>>>
>>>     /**
>>>      * Remove the whole write queue associated with the given
>>>      * <code>SelectableChannel</code>.
>>>      *
>>>      * @param channel <code>SelectableChannel</code>
>>>      */
>>>     public void removeEntry(SelectableChannel channel) {
>>>         queueMap.remove(channel);
>>>     }
>>>
>>>     /**
>>>      * Get the size of <code>SelectableChannel</code> write queue.
>>>      *
>>>      * @param channel <code>SelectableChannel</code>
>>>      * @return size of <code>SelectableChannel</code> write queue.
>>>      */
>>>     public int size(SelectableChannel channel) {
>>>         ConcurrentLinkedQueue<AsyncQueueRecord> queue =
>>>                 queueMap.get(channel);
>>>         return queue == null ? 0 : queue.size();
>>>     }
>>>
>>>     /**
>>>      * <code>AsyncWriteQueue</code> data unit
>>>      */
>>>     public static class AsyncQueueRecord {
>>
>> Should be at least protected?
>>
>>>         public ByteBuffer byteBuffer;
>>>         public AsyncWriteCallbackHandler callbackHandler;
>>>
>>>         public AsyncQueueRecord(ByteBuffer byteBuffer,
>>>                 AsyncWriteCallbackHandler callbackHandler) {
>>>             this.byteBuffer = byteBuffer;
>>>             this.callbackHandler = callbackHandler;
>>>         }
>>>     }
>>> }
>>>
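
On the queue creation in offer(): as one possible alternative to the
synchronized block, the get-or-create step can be done with
ConcurrentMap.putIfAbsent. The sketch below is generic and standalone; the
class name QueueMapSketch and its type parameters are made up for the
example, and it does not address a concurrent removeEntry.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

public class QueueMapSketch<K, E> {
    private final ConcurrentMap<K, ConcurrentLinkedQueue<E>> queueMap =
            new ConcurrentHashMap<K, ConcurrentLinkedQueue<E>>();

    /** Adds an element to the queue for the key, creating the queue if needed. */
    public void offer(K key, E element) {
        ConcurrentLinkedQueue<E> queue = queueMap.get(key);
        if (queue == null) {
            ConcurrentLinkedQueue<E> created = new ConcurrentLinkedQueue<E>();
            // putIfAbsent returns the queue another thread installed first,
            // or null if ours was stored.
            ConcurrentLinkedQueue<E> existing = queueMap.putIfAbsent(key, created);
            queue = (existing != null) ? existing : created;
        }
        queue.offer(element);
    }

    /** Removes and returns the head element for the key, or null if none. */
    public E poll(K key) {
        ConcurrentLinkedQueue<E> queue = queueMap.get(key);
        return queue == null ? null : queue.poll();
    }

    /** Returns the number of queued elements for the key. */
    public int size(K key) {
        ConcurrentLinkedQueue<E> queue = queueMap.get(key);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        QueueMapSketch<String, Integer> queues =
                new QueueMapSketch<String, Integer>();
        queues.offer("channel-1", 1);
        queues.offer("channel-1", 2);
        System.out.println(queues.size("channel-1"));
        System.out.println(queues.poll("channel-1"));
    }
}
```

The trade-off is that a losing thread allocates a queue it then throws
away, in exchange for not holding a lock on the channel object.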
>>
>> Thanks!
>>
>> -- Jeanfrancois
>>
>>>
>>> ------------------------------------------------------------------------
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net