Hello,
>>
>> There are 3 classes attached.
>> AsyncWriter: the main class users will use to write data in
>> asynchronous mode. The class has several overloaded write methods
>> (see the javadoc for more details).
>> AsyncWriteCallbackHandler: an interface that custom code can
>> implement to be notified when a ByteBuffer has been written
>> asynchronously. An instance of AsyncWriteCallbackHandler can be
>> passed as one of the AsyncWriter.write parameters.
>> AsyncWriteQueue: represents the Channel <-> WriteQueue correspondence
>> collection.
>>
>> The classes are described in more detail in the javadocs.
>>
>> As I mentioned, I think AsyncWriter should be integrated into the
>> SelectorHandler, so that a developer would use its functionality
>> like this:
>> selectorHandler.getAsyncWriter().write(byteBuffer...).
>
> Or, since SelectorHandlers aren't "easily" available in a
> ProtocolFilter, would it make sense to reach them from the Context
> instead? Since the Context is close to a 'Session' for us, I think it
> may make sense to have them available on it.
>
> Something like:
>
> Context.getAsyncWriter() and
> Context.getAsyncReader()
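To make the two access paths concrete, here is a minimal sketch. The Sketch* classes are hypothetical stand-ins, not the real Grizzly Context or SelectorHandler; the only point is that a getAsyncWriter() accessor on the Context could simply forward to the SelectorHandler the context is bound to, so both call chains would reach the same writer.

```java
/** Hypothetical stand-in for the proposed AsyncWriter. */
final class SketchAsyncWriter {
}

/** Hypothetical stand-in for SelectorHandler; it owns one writer. */
final class SketchSelectorHandler {
    private final SketchAsyncWriter asyncWriter = new SketchAsyncWriter();

    SketchAsyncWriter getAsyncWriter() {
        return asyncWriter;
    }
}

/** Hypothetical stand-in for Context. */
final class SketchContext {
    private final SketchSelectorHandler selectorHandler;

    SketchContext(SketchSelectorHandler selectorHandler) {
        this.selectorHandler = selectorHandler;
    }

    SketchSelectorHandler getSelectorHandler() {
        return selectorHandler;
    }

    // Convenience accessor: forwards to the handler this context is bound to.
    SketchAsyncWriter getAsyncWriter() {
        return selectorHandler.getAsyncWriter();
    }
}
```

With this shape, a ProtocolFilter that only holds a context could call getAsyncWriter() on it directly, without caring which SelectorHandler is behind it.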
Currently Context has context.getSelectorHandler(), so we can use it.
"context.getSelectorHandler().getAsyncWriter()"
> As part of your proposal I would like us to think about the
> AsyncReader concept (at least have the API, and leave the discussion open).
Hmm... Do you have any specific use case for it?
> Now I think the SelectorHandler will need a little rework to make
> sure AsyncWriter.onWrite is called, right?
Sure. onWrite, onClose should be called from SelectorHandler.
> Also, to unify our CallbackHandlers in Grizzly, we might want to:
>
> + Make CallbackHandler extend AsyncWriter
Agree, this could be useful.
> + Add a new marker interface that can be extended by both types of
> CallbackHandler
> + Have two separate Handlers like you are proposing.
We can leave them as two separate handlers for now, and change this
later depending on community feedback.
Thanks.
WBR,
Alexey.
>
>>
>> Will appreciate any feedback! :)
>>
>> Thanks.
>>
>> WBR,
>> Alexey.
>>
>> /*
>> * The contents of this file are subject to the terms
>> * of the Common Development and Distribution License
>> * (the License). You may not use this file except in
>> * compliance with the License.
>> *
>> * You can obtain a copy of the license at
>> * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> * See the License for the specific language governing
>> * permissions and limitations under the License.
>> *
>> * When distributing Covered Code, include this CDDL
>> * Header Notice in each file and include the License file
>> * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> * If applicable, add the following below the CDDL Header,
>> * with the fields enclosed by brackets [] replaced by
>> * you own identifying information:
>> * "Portions Copyrighted [year] [name of copyright owner]"
>> *
>> * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> */
>>
>> package com.sun.grizzly;
>>
>> import java.io.IOException;
>> import java.nio.ByteBuffer;
>> import java.nio.channels.SelectableChannel;
>> import java.nio.channels.SelectionKey;
>> import java.nio.channels.WritableByteChannel;
>>
>> /**
>> *
>> * @author Alexey Stashok
>> */
>> public class AsyncWriter {
>>     private SelectorHandler selectorHandler;
>>     private AsyncWriteQueue writeQueue;
>>
>>     public AsyncWriter(SelectorHandler selectorHandler) {
>>         this.selectorHandler = selectorHandler;
>>         writeQueue = new AsyncWriteQueue();
>>     }
>>
>>     /**
>>      * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>.
>>      * First it tries to write <code>ByteBuffer</code> to the given
>>      * <code>SelectableChannel</code>.
>>      * If, after write is called, <code>ByteBuffer</code> still has data
>>      * ready to be written, <code>ByteBuffer</code> will be added to
>>      * <code>AsyncWriteQueue</code> and <code>SelectableChannel</code>
>>      * will be registered on <code>SelectorHandler</code>, waiting for
>>      * the OP_WRITE event.
>
> Here I would add:
>
> * if an unexpected exception occurs, the exception will not be
> propagated to the caller of this class, but logged by the Grizzly
> framework
>
> We should probably make it clear that without a CallbackHandler, any
> IOException will be swallowed by the framework.
>
>
>>      *
>>      * @param channel <code>SelectableChannel</code> the
>>      *        <code>ByteBuffer</code> should be written to
>>      * @param buffer <code>ByteBuffer</code>
>>      * @return true if <code>ByteBuffer</code> was written completely,
>>      *         false if the write operation was put on the queue
>>      * @throws java.io.IOException
>>      */
>>     public boolean write(SelectableChannel channel, ByteBuffer buffer)
>>             throws IOException {
>>         return write(channel, buffer, null);
>>     }
>>
>>     /**
>>      * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>.
>>      * First it tries to write <code>ByteBuffer</code> to the given
>>      * <code>SelectableChannel</code>.
>>      * If, after write is called, <code>ByteBuffer</code> still has data
>>      * ready to be written, <code>ByteBuffer</code> will be added to
>>      * <code>AsyncWriteQueue</code> and <code>SelectableChannel</code>
>>      * will be registered on <code>SelectorHandler</code>, waiting for
>>      * the OP_WRITE event.
>>      *
>>      * @param channel <code>SelectableChannel</code> the
>>      *        <code>ByteBuffer</code> should be written to
>>      * @param buffer <code>ByteBuffer</code>
>>      * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>>      *        which will be notified when <code>ByteBuffer</code>
>>      *        has been completely written
>>      * @return true if <code>ByteBuffer</code> was written completely,
>>      *         false if the write operation was put on the queue
>>      * @throws java.io.IOException
>>      */
>>     public boolean write(SelectableChannel channel, ByteBuffer buffer,
>>             AsyncWriteCallbackHandler callbackHandler) throws IOException {
>>         return write(channel, buffer, callbackHandler, false);
>>     }
>>
>>     /**
>>      * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>.
>>      * First it tries to write <code>ByteBuffer</code> to the given
>>      * <code>SelectableChannel</code>.
>>      * If, after write is called, <code>ByteBuffer</code> still has data
>>      * ready to be written, <code>ByteBuffer</code> will be added to
>>      * <code>AsyncWriteQueue</code> and <code>SelectableChannel</code>
>>      * will be registered on <code>SelectorHandler</code>, waiting for
>>      * the OP_WRITE event.
>>      *
>>      * @param channel <code>SelectableChannel</code> the
>>      *        <code>ByteBuffer</code> should be written to
>>      * @param buffer <code>ByteBuffer</code>
>>      * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
>>      *        which will be notified when <code>ByteBuffer</code>
>>      *        has been completely written
>>      * @param isCloneByteBuffer if true, AsyncWriter will clone the given
>>      *        <code>ByteBuffer</code> before putting it on the
>>      *        <code>AsyncWriteQueue</code>
>>      * @return true if <code>ByteBuffer</code> was written completely,
>>      *         false if the write operation was put on the queue
>>      * @throws java.io.IOException
>>      */
>>     public boolean write(SelectableChannel channel, ByteBuffer buffer,
>>             AsyncWriteCallbackHandler callbackHandler,
>>             boolean isCloneByteBuffer) throws IOException {
>>         // If AsyncWriteQueue is empty, try to write ByteBuffer here
>>         if (!hasReadyAsyncWriteData(channel)) {
>>             ((WritableByteChannel) channel).write(buffer);
>>         }
>>
>>         if (buffer.hasRemaining()) {
>>             // clone ByteBuffer if required
>>             if (isCloneByteBuffer) {
>>                 int size = buffer.remaining();
>>                 ByteBuffer newBuffer = buffer.isDirect() ?
>>                         ByteBuffer.allocateDirect(size) :
>>                         ByteBuffer.allocate(size);
>>                 newBuffer.put(buffer);
>>                 // make the copied bytes readable from the start
>>                 newBuffer.flip();
>>                 buffer = newBuffer;
>>             }
>>
>>             // add new element to the queue
>>             writeQueue.offer(channel,
>>                     new AsyncWriteQueue.AsyncQueueRecord(buffer, callbackHandler));
>>             selectorHandler.register(channel, SelectionKey.OP_WRITE);
>>             return false;
>>         }
>>
>>         return true;
>>     }
>>
>>     /**
>>      * Checks whether there is any data in <code>AsyncWriteQueue</code>
>>      * ready for the given <code>SelectableChannel</code>.
>>      *
>>      * @param channel <code>SelectableChannel</code>
>>      * @return true if there is ready data, false otherwise.
>>      */
>>     public boolean hasReadyAsyncWriteData(SelectableChannel channel) {
>>         return writeQueue.size(channel) > 0;
>>     }
>>
>>     /**
>>      * Callback method, which should be called by
>>      * <code>SelectorHandler</code> to notify that the given
>>      * <code>SelectableChannel</code> is ready to transmit data.
>>      *
>>      * @param channel <code>SelectableChannel</code>
>>      * @throws java.io.IOException
>>      */
>>     public void onWrite(SelectableChannel channel) throws IOException {
>>         AsyncWriteQueue.AsyncQueueRecord queueRecord =
>>                 writeQueue.peek(channel);
>>         if (queueRecord == null) {
>>             // nothing is queued for this channel, so there is nothing to write
>>             return;
>>         }
>>
>>         ByteBuffer byteBuffer = queueRecord.byteBuffer;
>>         ((WritableByteChannel) channel).write(byteBuffer);
>>         if (byteBuffer.hasRemaining()) {
>>             selectorHandler.register(channel, SelectionKey.OP_WRITE);
>
>
>
>>         } else {
>>             AsyncWriteQueue.AsyncQueueRecord removedQueueRecord =
>>                     writeQueue.poll(channel);
>>             assert removedQueueRecord == queueRecord;
>>             if (removedQueueRecord.callbackHandler != null) {
>>                 removedQueueRecord.callbackHandler.notifyWriteCompleted(
>>                         channel, byteBuffer);
>>             }
>>         }
>>     }
>>
>>     /**
>>      * Callback method, which should be called by
>>      * <code>SelectorHandler</code> to notify that the given
>>      * <code>SelectableChannel</code> is going to be closed, so the
>>      * related <code>SelectableChannel</code> data can be released
>>      * from <code>AsyncWriteQueue</code>.
>>      *
>>      * @param channel <code>SelectableChannel</code>
>>      */
>>     public void onClose(SelectableChannel channel) {
>>         writeQueue.removeEntry(channel);
>>     }
>> }
>>
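The write-now-or-queue flow of AsyncWriter can be tried out in isolation with a small sketch. This is not the attached class: it handles a single channel, leaves out the SelectorHandler registration and callbacks, and uses a hypothetical ThrottledChannel that accepts only a few bytes per call so the partial-write path is easy to trigger.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical channel that accepts at most 'limit' bytes per write call. */
final class ThrottledChannel implements WritableByteChannel {
    private final int limit;
    final StringBuilder received = new StringBuilder();

    ThrottledChannel(int limit) {
        this.limit = limit;
    }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(limit, src.remaining());
        for (int i = 0; i < n; i++) {
            received.append((char) src.get()); // ASCII payloads only
        }
        return n;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}

/** Single-channel sketch of the write-now-or-queue idea. */
final class WriteOrQueueSketch {
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();

    /** Returns true if the buffer was fully written, false if it was queued. */
    boolean write(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
        // Write directly only when nothing is queued, so ordering is preserved.
        if (pending.isEmpty()) {
            channel.write(buffer);
        }
        if (buffer.hasRemaining()) {
            pending.add(buffer);
            return false; // the real class would register OP_WRITE here
        }
        return true;
    }

    /** Stand-in for the OP_WRITE callback; returns true once the queue is empty. */
    boolean onWrite(WritableByteChannel channel) throws IOException {
        ByteBuffer head = pending.peek();
        if (head == null) {
            return true; // nothing queued
        }
        channel.write(head);
        if (!head.hasRemaining()) {
            pending.poll();
        }
        return pending.isEmpty();
    }
}
```

For example, writing a 10-byte buffer to a ThrottledChannel with a limit of 4 returns false from write(), and two onWrite() calls are then needed before the queue is drained.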
>>
>>
>>
>> package com.sun.grizzly;
>>
>> import java.nio.ByteBuffer;
>> import java.nio.channels.SelectableChannel;
>>
>> /**
>>  * Callback handler interface, used by AsyncWriter to notify custom code
>>  * about completion of a specific <code>ByteBuffer</code> write.
>>  *
>>  * @author Alexey Stashok
>>  */
>> public interface AsyncWriteCallbackHandler {
>>     public void notifyWriteCompleted(SelectableChannel channel,
>>             ByteBuffer buffer);
>> }
>>
>
> I would recommend we add something like:
>
> public void notifyIOException(SelectableChannel channel,
> ByteBuffer byteBuffer, Exception exception);
>
> to the interface. Should we instead use (to follow the same pattern
> as the current CallbackHandler):
>
> onWriteCompleted
> onIOException
>
> ?
>
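A rough sketch of how such an interface could be used, under some assumptions: WriteCallbackSketch, RecordingCallback and CallbackDispatchSketch are hypothetical names, the channel type is plain WritableByteChannel to keep the example self-contained, and java.util.logging stands in for whatever logger the framework uses. It shows one write attempt whose outcome is reported through the callback instead of being thrown, and only logged when no callback was given.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical callback shape using the onXxx naming suggested above. */
interface WriteCallbackSketch {
    void onWriteCompleted(WritableByteChannel channel, ByteBuffer buffer);

    void onIOException(WritableByteChannel channel, ByteBuffer buffer,
            Exception exception);
}

/** Records which callbacks fired, in order. */
final class RecordingCallback implements WriteCallbackSketch {
    final List<String> events = new ArrayList<>();

    @Override
    public void onWriteCompleted(WritableByteChannel channel, ByteBuffer buffer) {
        events.add("completed");
    }

    @Override
    public void onIOException(WritableByteChannel channel, ByteBuffer buffer,
            Exception exception) {
        events.add("failed: " + exception.getMessage());
    }
}

final class CallbackDispatchSketch {
    private static final Logger LOGGER =
            Logger.getLogger(CallbackDispatchSketch.class.getName());

    /** One write attempt; the outcome is reported instead of thrown. */
    static void writeOnce(WritableByteChannel channel, ByteBuffer buffer,
            WriteCallbackSketch callback) {
        try {
            channel.write(buffer);
            if (!buffer.hasRemaining() && callback != null) {
                callback.onWriteCompleted(channel, buffer);
            }
        } catch (IOException e) {
            if (callback != null) {
                callback.onIOException(channel, buffer, e);
            } else {
                // No handler registered: the error is only logged.
                LOGGER.log(Level.WARNING, "Async write failed", e);
            }
        }
    }
}
```

The null-callback branch is the case discussed earlier in this thread: without a handler the caller never sees the IOException, so the javadoc would need to say so.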
>>
>>
>>
>> package com.sun.grizzly;
>>
>> import java.nio.ByteBuffer;
>> import java.nio.channels.SelectableChannel;
>> import java.util.Map;
>> import java.util.concurrent.ConcurrentHashMap;
>> import java.util.concurrent.ConcurrentLinkedQueue;
>>
>> /**
>>  * Class represents a collection of <code>SelectableChannel</code>s and
>>  * the corresponding queues of <code>ByteBuffer</code>s, which should be
>>  * written asynchronously.
>>  *
>>  * @author Alexey Stashok
>>  */
>> public class AsyncWriteQueue {
>>     private Map<SelectableChannel, ConcurrentLinkedQueue<AsyncQueueRecord>> queueMap =
>>             new ConcurrentHashMap<SelectableChannel, ConcurrentLinkedQueue<AsyncQueueRecord>>();
>>
>>     /**
>>      * Add data to the <code>AsyncWriteQueue</code>, corresponding to the
>>      * given <code>SelectableChannel</code>.
>>      *
>>      * @param channel <code>SelectableChannel</code>
>>      * @param queueRecord write data unit
>>      */
>>     public void offer(SelectableChannel channel, AsyncQueueRecord queueRecord) {
>>         ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
>>         if (queue == null) {
>>             synchronized(channel) {
>
> For later: Read/WriteLock might be more efficient here.
>
>>                 queue = queueMap.get(channel);
>>                 if (queue == null) {
>>                     queue = new ConcurrentLinkedQueue<AsyncQueueRecord>();
>>                     queueMap.put(channel, queue);
>>                 }
>>             }
>>         }
>>         queue.offer(queueRecord);
>>     }
>>
>>     /**
>>      * Get head element of <code>SelectableChannel</code> write queue.
>>      * Element will not be removed from queue.
>>      *
>>      * @param channel <code>SelectableChannel</code>
>>      * @return <code>AsyncQueueRecord</code> write data unit
>>      */
>>     public AsyncQueueRecord peek(SelectableChannel channel) {
>>         ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
>>         if (queue != null) {
>>             return queue.peek();
>>         }
>>         return null;
>>     }
>>
>>     /**
>>      * Get head element of <code>SelectableChannel</code> write queue.
>>      * Element will be removed from queue.
>>      *
>>      * @param channel <code>SelectableChannel</code>
>>      * @return <code>AsyncQueueRecord</code> write data unit
>>      */
>>     public AsyncQueueRecord poll(SelectableChannel channel) {
>>         ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
>>         if (queue != null) {
>>             return queue.poll();
>>         }
>>         return null;
>>     }
>>
>>     /**
>>      * Remove the whole write queue associated with the given
>>      * <code>SelectableChannel</code>.
>>      *
>>      * @param channel <code>SelectableChannel</code>
>>      */
>>     public void removeEntry(SelectableChannel channel) {
>>         queueMap.remove(channel);
>>     }
>>
>>     /**
>>      * Get the size of <code>SelectableChannel</code> write queue.
>>      *
>>      * @param channel <code>SelectableChannel</code>
>>      * @return size of <code>SelectableChannel</code> write queue.
>>      */
>>     public int size(SelectableChannel channel) {
>>         ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
>>         return queue == null ? 0 : queue.size();
>>     }
>>
>>     /**
>>      * <code>AsyncWriteQueue</code> data unit
>>      */
>>     public static class AsyncQueueRecord {
>
> Should be at least protected?
>
>>         public ByteBuffer byteBuffer;
>>         public AsyncWriteCallbackHandler callbackHandler;
>>
>>         public AsyncQueueRecord(ByteBuffer byteBuffer,
>>                 AsyncWriteCallbackHandler callbackHandler) {
>>             this.byteBuffer = byteBuffer;
>>             this.callbackHandler = callbackHandler;
>>         }
>>     }
>> }
>>
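As an alternative to both the synchronized block in offer() and the Read/WriteLock idea, the per-channel queue could be created atomically by the map itself. The sketch below is an assumption-laden illustration, not a change to the attached class: PerKeyQueueSketch is a hypothetical generic type where K stands in for SelectableChannel and V for AsyncQueueRecord, and it relies on ConcurrentHashMap.computeIfAbsent, which only exists in Java 8 and later (on older JDKs putIfAbsent gives a similar result with a little more code).

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

/** Generic per-key queue map; K stands in for the channel, V for the record. */
final class PerKeyQueueSketch<K, V> {
    private final ConcurrentMap<K, Queue<V>> queues = new ConcurrentHashMap<>();

    void offer(K key, V value) {
        // computeIfAbsent creates the queue atomically, at most once per key.
        queues.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(value);
    }

    /** Head of the key's queue without removing it, or null if none. */
    V peek(K key) {
        Queue<V> queue = queues.get(key);
        return queue == null ? null : queue.peek();
    }

    /** Removes and returns the head of the key's queue, or null if none. */
    V poll(K key) {
        Queue<V> queue = queues.get(key);
        return queue == null ? null : queue.poll();
    }

    int size(K key) {
        Queue<V> queue = queues.get(key);
        return queue == null ? 0 : queue.size();
    }

    /** Drops the whole queue for the key, e.g. when a channel is closed. */
    void removeEntry(K key) {
        queues.remove(key);
    }
}
```

This keeps offer() lock-free from the caller's point of view and avoids synchronizing on the channel object, which other code might also use as a monitor.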
>
> Thanks!
>
> -- Jeanfrancois
>
>>
>> ------------------------------------------------------------------------
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net