users@grizzly.java.net

Re: [Latest code] Tunnel implementation

From: Kshitiz Saxena <Kshitiz.Saxena_at_Sun.COM>
Date: Wed, 06 Jun 2007 21:50:06 +0530

Hi,

The latest code provided by Alexey is attached.

I have one question. In an HTTP GET request, the Content-Length header
may not be present. How do we decide in this case that we have read the
complete request?
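
For reference, one common way to frame this question: under HTTP/1.1
message framing rules, a request that carries neither Content-Length nor
Transfer-Encoding has no body, so a typical GET is complete at the blank
line (CRLF CRLF) that ends the headers. The sketch below illustrates that
check on a raw byte buffer. The class and method names (RequestFraming,
endOfHeaders, isComplete) are made up for illustration and are not part of
Grizzly; chunked bodies are deliberately left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical helper, not a Grizzly class: decides whether the bytes
// read so far contain one complete HTTP/1.1 request.
final class RequestFraming {

    /** Index just past the first CRLF CRLF in buf[0..len), or -1 if absent. */
    static int endOfHeaders(byte[] buf, int len) {
        for (int i = 0; i + 3 < len; i++) {
            if (buf[i] == '\r' && buf[i + 1] == '\n'
                    && buf[i + 2] == '\r' && buf[i + 3] == '\n') {
                return i + 4;
            }
        }
        return -1;
    }

    /** True once the headers and any Content-Length body have fully arrived. */
    static boolean isComplete(byte[] buf, int len) {
        int headerEnd = endOfHeaders(buf, len);
        if (headerEnd < 0) {
            return false; // headers are still incomplete, keep reading
        }
        String head = new String(buf, 0, headerEnd, StandardCharsets.ISO_8859_1);
        long contentLength = 0; // no Content-Length means no body
        for (String line : head.split("\r\n")) {
            int colon = line.indexOf(':');
            if (colon <= 0) {
                continue;
            }
            String name = line.substring(0, colon).trim().toLowerCase(Locale.ROOT);
            String value = line.substring(colon + 1).trim();
            if (name.equals("transfer-encoding")) {
                // Assumption of this sketch: chunked bodies are out of scope.
                throw new UnsupportedOperationException(
                        "Transfer-Encoding is not handled in this sketch");
            }
            if (name.equals("content-length")) {
                contentLength = Long.parseLong(value);
            }
        }
        return len - headerEnd >= contentLength;
    }
}
```

A filter could call isComplete() after every read and only forward the
buffer once it returns true.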

Thanks,
Kshitiz

Jeanfrancois Arcand wrote:
> Hi,
>
> can someone post the latest version of the code? I would like to take
> a look to see if I can improve the performance :-)
>
> Thanks
>
> -- Jeanfrancois
>
>
> Oleksiy Stashok wrote:
>> Hi,
>>
>>
>> Kshitiz Saxena wrote:
>>> Hi Alexey,
>>>
>>> I have the latest ReadFilter.java from the Grizzly trunk. I still
>>> have this issue.
>> Should be fixed now. Please update one more time.
>>
>> Thanks!
>>
>> Alexey.
>>>
>>> Thanks,
>>> Kshitiz
>>>
>>> Oleksiy Stashok wrote:
>>>>
>>>>>>
>>>>>> Thanks a lot. I get AlreadyConnectedExceptions if I connect
>>>>>> more than once; the acquired connector seems to be already in
>>>>>> use. What was the reason I could not allocate a new
>>>>>> ConnectorHandler with new TCPConnectorHandler()? That helps
>>>>>> only to a limited extent, though: every second connection
>>>>>> does not work.
>>>>>>
>>>>> Looks like ConnectorHandler.close() wasn't called.
>>>> It seems it was a ReadFilter problem. Please retry after svn update.
>>>>
>>>> Alexey.
>>>>
>>>>
>>>>>
>>>>>> I have spent the day trying to build my own solution, but I
>>>>>> failed. The first SelectionKey that is processed is always
>>>>>> OP_READ, not OP_ACCEPT. I noticed that the ProtocolFilter chain
>>>>>> is only used if no CallbackHandler is attached
>>>>> As JF wrote, CallbackHandler is currently used for the *client
>>>>> side* connection implementation. It was added recently, so there
>>>>> could be some drawbacks, but with your feedback we can improve it.
>>>>> ProtocolFilter is part of the *server side* processing chain.
>>>>> With the default TCPSelectorHandler implementation, OP_ACCEPT is
>>>>> not passed to the server chain for processing. But for a "tunnel"
>>>>> that does not seem to be required, as the connection initiator
>>>>> should send something, right?
>>>>>
>>>>> WBR,
>>>>> Alexey.
>>>>>
>>>>>> (which scenario is a CallbackHandler good for, and when should I
>>>>>> use a ProtocolFilter, if they exclude each other?), so I attached
>>>>>> null to the key after I got a connection. The sources are
>>>>>> attached. I don't know why this program fails.
>>>>>>
>>>>>> I also attached a modified version of the ConnectorHandler
>>>>>> interface. A Channel can now be attached. For me this was useful;
>>>>>> it was the only way I saw to reuse the convenient
>>>>>> TCPConnectorHandler.
>>>>>>
>>>>>> Best Regards, Karsten
>>>>>>
>>>>>>
>>>>>>> WBR,
>>>>>>> Alexey.
>>>>>>>
>>>>>>> Karsten Ohme wrote:
>>>>>>>
>>>>>>>> On Fri, Jun 01, 2007 at 05:26:18PM +0200, Oleksiy Stashok wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>>> Hello Karsten,
>>>>>>>>>
>>>>>>>>> let me try to help you.
>>>>>>>>> Let's start from the beginning and, for simplicity, not use
>>>>>>>>> the cache; we can always add it later...
>>>>>>>>> As I understand it, you are not building an HTTP proxy but a
>>>>>>>>> tunnel for socket connections. In other words, nothing HTTP
>>>>>>>>> specific?
>>>>>>>>>
>>>>>>>>> So I would advise you to start by implementing something
>>>>>>>>> called... let's say ProxyProtocolFilter (as Jeanfrancois
>>>>>>>>> proposed) :)
>>>>>>>>> To see how you can implement the filter, please find
>>>>>>>>> EchoFilter in the Grizzly/framework test folder.
>>>>>>>>>
>>>>>>>>> As for proxy-to-Tomcat connections, please use
>>>>>>>>> ConnectorHandlers (to get one use
>>>>>>>>> controller.aquireConnectorHandler(), and don't forget to
>>>>>>>>> release it).
>>>>>>>>> We're working on documentation, but you can take a look at
>>>>>>>>> the unit tests we have to understand how you can use Grizzly
>>>>>>>>> on both the client and server sides.
>>>>>>>>>
>>>>>>>>> If you have questions, please ask.
>>>>>>>>>
>>>>>>>> OK, I have a ProxyProtocolFilter, and I also found out that the
>>>>>>>> WorkerThread contains the ByteBuffer. So I would do the following:
>>>>>>>>
>>>>>>>> Use the ByteBuffer returned from the WorkerThread and acquire a
>>>>>>>> TCPConnectorHandler from the Controller in the Context
>>>>>>>>
>>>>>>>> register a CallbackHandler with a reference to the
>>>>>>>> SocketConnection for the path from client to proxy and an
>>>>>>>> implemented onConnect() method
>>>>>>>>
>>>>>>>> write out the data with write(byteBuffer, true)
>>>>>>>>
>>>>>>>> When do I release the TCPConnectorHandler? I store the
>>>>>>>> TCPConnectorHandler together with the SocketChannel for the
>>>>>>>> client-proxy path in a Map in the Context, so the connection
>>>>>>>> can be reused. What is better?
>>>>>>>>
>>>>>>>> When the connection is established, onConnect gets executed and
>>>>>>>> I can get the SocketChannel for the proxy-Tomcat server path.
>>>>>>>> The pair of SocketConnections (client-proxy and proxy-Tomcat
>>>>>>>> server) is kept in a mapping in the context. This is important
>>>>>>>> for remembering who is talking to whom over the proxy.
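
The pairing described above could be kept in a small two-way registry,
sketched here under the assumption that each client-side channel has
exactly one server-side peer. TunnelRegistry and its method names are
hypothetical and not part of Grizzly; the type parameter would be
SocketChannel in the tunnel.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers which client-side channel is paired
// with which server-side channel, in both directions.
final class TunnelRegistry<C> {

    private final Map<C, C> peers = new ConcurrentHashMap<>();

    /** Record that the two channels belong to the same tunnel. */
    void pair(C clientSide, C serverSide) {
        peers.put(clientSide, serverSide);
        peers.put(serverSide, clientSide);
    }

    /** The other end of the tunnel, or null if the channel is unknown. */
    C peerOf(C channel) {
        return peers.get(channel);
    }

    /** Forget both directions, e.g. when either side closes. */
    C unpair(C channel) {
        C peer = peers.remove(channel);
        if (peer != null) {
            peers.remove(peer);
        }
        return peer;
    }
}
```

Looking up peerOf(channel) on a read event then tells the filter where
the bytes have to be written, whichever side they came from.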
>>>>>>>>
>>>>>>>> For the return path (server-proxy to proxy-client) I have to
>>>>>>>> write the data back. The problem: I see no way to use a
>>>>>>>> TCPConnectorHandler here. The TCPSelectorHandler offers no
>>>>>>>> possibility to use an existing SocketChannel. I would change
>>>>>>>> this to simplify my life, or how is it intended to be used?
>>>>>>>>
>>>>>>>> Why are there two methods: register(CallbackHandler) and
>>>>>>>> setCallbackHandler(...)?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Karsten
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> WBR,
>>>>>>>>> Alexey.
>>>>>>>>>
>>>>>>>>> Karsten Ohme wrote:
>>>>>>>>>
>>>>>>>>>> On Thu, May 31, 2007 at 02:43:43PM -0400, Jeanfrancois Arcand
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Hi Karsten,
>>>>>>>>>>>
>>>>>>>>>>> don't stop the feedback!
>>>>>>>>>>>
>>>>>>>>>>> Karsten Ohme wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I want to program a proxy (actually tunnel). All
>>>>>>>>>>>> connections are made to the proxy, the proxy makes some
>>>>>>>>>>>> decisions and forwards them to the server. For my situation
>>>>>>>>>>>> many clients connect to the proxy on port 80, the proxy
>>>>>>>>>>>> forwards the requests to a Tomcat server on port 8080 and
>>>>>>>>>>>> writes the response back to the client.
>>>>>>>>>>>>
>>>>>>>>>>> Stupid question: can the client connect directly to Tomcat,
>>>>>>>>>>> or must all connections go through the proxy? I suspect all
>>>>>>>>>>> connections have to pass through the proxy, but if not,
>>>>>>>>>>> the proxy can always send a redirect to the client (using
>>>>>>>>>>> the Location: header).
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> How can I achieve this with Grizzly? I thought I had to
>>>>>>>>>>>> extend the TCPSelectorHandler, override the
>>>>>>>>>>>> onAcceptInterest method, establish there a new
>>>>>>>>>>>> TCPConnectorHandler for the path client-proxy and one for
>>>>>>>>>>>> proxy-server, and register CallbackHandlers to propagate the
>>>>>>>>>>>> read and write operations.
>>>>>>>>>>>>
>>>>>>>>>>> I would not override onAcceptInterest but instead let
>>>>>>>>>>> the default workflow happen. One of the reasons is that
>>>>>>>>>>> onAcceptInterest is executed on the same thread as
>>>>>>>>>>> Selector.select(), which is a performance bottleneck. I
>>>>>>>>>>> would instead implement two ProtocolFilters that use the
>>>>>>>>>>> newly added ConnectionPool/Cache. My recommendation is to do:
>>>>>>>>>>>
>>>>>>>>>>> 1. Use the default ReadFilter to execute the first read to
>>>>>>>>>>> make sure the connection hasn't been closed between the
>>>>>>>>>>> accept() and the first read.
>>>>>>>>>>> 2. Add a new ProxyProtocolFilter:
>>>>>>>>>>>
>>>>>>>>>>> + When initializing, create a pool of connections to your
>>>>>>>>>>> remote server (Tomcat). I recommend you take a look at the new
>>>>>>>>>>> ConnectionPool/Cache Alexey demonstrated yesterday at the
>>>>>>>>>>> Grizzly meeting.
>>>>>>>>>>>
>>>>>>>>>> I don't know how to use it. What is a ConnectionPool? There
>>>>>>>>>> is only a class ConnectionCache. What is it useful for? From
>>>>>>>>>> the name I suspect that some sort of connections are cached
>>>>>>>>>> and that there is a method to get free connections I can use.
>>>>>>>>>> I would expect a constructor that takes a number and creates
>>>>>>>>>> that many connections. For me this would always be the same
>>>>>>>>>> type of connection from the proxy to the client; for the way
>>>>>>>>>> from client to proxy, do I need a single connection for each
>>>>>>>>>> client or not?
>>>>>>>>>> What do requestReceived(), requestProcessed() and
>>>>>>>>>> responseSent( C conn ) do? Which connection do I have to pass
>>>>>>>>>> in as the parameter? What does "cached" mean? In the sources
>>>>>>>>>> it looks like I have to pass in the connections on my own.
>>>>>>>>>> From which class must the methods be called? How can I reuse
>>>>>>>>>> such a cached connection when it is idle? There is no get
>>>>>>>>>> method.
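
To make the expectation above concrete: a pool with "a constructor with a
given number" and a get method could look roughly like the sketch below.
This is only an illustration of that expectation, not a description of how
Grizzly's ConnectionCache works; SimplePool and its methods are
hypothetical names, and the factory stands in for whatever opens a
connection to Tomcat.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical fixed-size pool: creates `size` connections up front and
// hands out idle ones on request.
final class SimplePool<C> {

    private final BlockingQueue<C> idle;

    SimplePool(int size, Supplier<C> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    /** Take an idle connection, or return null if all are in use. */
    C tryAcquire() {
        return idle.poll();
    }

    /** Take an idle connection, waiting until one is released. */
    C acquire() throws InterruptedException {
        return idle.take();
    }

    /** Hand a connection back so that it can be reused. */
    void release(C connection) {
        idle.offer(connection);
    }
}
```

Callers are expected to release every connection they acquire, ideally in
a finally block.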
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Karsten
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> + When execute() is invoked, use a StreamAlgorithm
>>>>>>>>>>> implementation to make sure you can read the complete HTTP
>>>>>>>>>>> request (note: you will need to buffer the body as well).
>>>>>>>>>>> The module/http sub module has StreamAlgorithm
>>>>>>>>>>> implementations called
>>>>>>>>>>> ContentLengthAlgorithm|StateMachineAlgorithm that can be
>>>>>>>>>>> reused. Mainly, those classes make sure the entire
>>>>>>>>>>> headers + body are read.
>>>>>>>>>>> + Once you have read all the bytes, get a connection from
>>>>>>>>>>> the connection pool and flush the bytes to Tomcat. Call
>>>>>>>>>>> TCPConnectorHandler.write(bb, true) so the temporary
>>>>>>>>>>> Selector trick can be reused.
>>>>>>>>>>> + Then read back the response and flush it back to the client.
>>>>>>>>>>>
>>>>>>>>>>> Would that make sense? You might also want to create two
>>>>>>>>>>> ProtocolFilters, one for reading the request, and one for
>>>>>>>>>>> writing the response to the active connection to Tomcat.
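
The "read, then flush to the other side" steps above boil down to copying
bytes from one channel to another. Below is a minimal sketch of that
copy loop using only plain java.nio channels; it does not use the Grizzly
ConnectorHandler API, and Relay.pump is a hypothetical name. In a Grizzly
filter the write side would presumably go through the ConnectorHandler's
write(byteBuffer, true) instead.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

// Hypothetical helper: forwards whatever is currently readable on one
// channel to another channel.
final class Relay {

    /**
     * Copies bytes from `from` to `to` until a read yields no data
     * (end of stream, or nothing available on a non-blocking channel).
     * Returns the number of bytes forwarded.
     */
    static long pump(ReadableByteChannel from, WritableByteChannel to,
                     ByteBuffer buffer) throws IOException {
        long total = 0;
        buffer.clear();
        while (from.read(buffer) > 0) {
            buffer.flip(); // switch the buffer from filling to draining
            while (buffer.hasRemaining()) {
                // A non-blocking target may accept only part of the buffer;
                // a real tunnel would register OP_WRITE instead of spinning.
                total += to.write(buffer);
            }
            buffer.clear();
        }
        return total;
    }
}
```

The same method serves both directions: client to Tomcat for the request,
and Tomcat to client for the response.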
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> But I don't know how the ProtocolFilter fits into the
>>>>>>>>>>>> picture. How do I use it? I also want to use TLS
>>>>>>>>>>>> connections. How can I get the data which is decoded?
>>>>>>>>>>>>
>>>>>>>>>>>> Is there any documentation or tutorial for the new Grizzly
>>>>>>>>>>>> API?
>>>>>>>>>>>>
>>>>>>>>>>> Take a look at the following talks:
>>>>>>>>>>>
>>>>>>>>>>> https://grizzly.dev.java.net/presentations/FISL-2007.pdf
>>>>>>>>>>> http://weblogs.java.net/blog/jfarcand/archive/BOF-4989-Grizzly.pdf
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> We don't have tutorials yet, but I'm working on one right now.
>>>>>>>>>>>
>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>
>>>>>>>>>>> -- Jeanfrancois
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Karsten
>>>>>>>>>>>>
>>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>>
>>>>>>>>>>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>>>>>>>>>>> For additional commands, e-mail:
>>>>>>>>>>>> users-help_at_grizzly.dev.java.net
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> ------------------------------------------------------------------------
>>>>>>>
>>>>>>>
>>>>>>> /*
>>>>>>> * The contents of this file are subject to the terms
>>>>>>> * of the Common Development and Distribution License
>>>>>>> * (the License). You may not use this file except in
>>>>>>> * compliance with the License.
>>>>>>> *
>>>>>>> * You can obtain a copy of the license at
>>>>>>> * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>>>>>>> * glassfish/bootstrap/legal/CDDLv1.0.txt.
>>>>>>> * See the License for the specific language governing
>>>>>>> * permissions and limitations under the License.
>>>>>>> *
>>>>>>> * When distributing Covered Code, include this CDDL
>>>>>>> * Header Notice in each file and include the License file
>>>>>>> * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>>>>>>> * If applicable, add the following below the CDDL Header,
>>>>>>> * with the fields enclosed by brackets [] replaced by
>>>>>>> * you own identifying information:
>>>>>>> * "Portions Copyrighted [year] [name of copyright owner]"
>>>>>>> *
>>>>>>> * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>>>>>>> */
>>>>>>> package com.sun.grizzly;
>>>>>>>
>>>>>>> import java.io.Closeable;
>>>>>>> import java.io.IOException;
>>>>>>> import java.net.SocketAddress;
>>>>>>> import java.nio.ByteBuffer;
>>>>>>> import java.nio.channels.SelectableChannel;
>>>>>>> import java.nio.channels.SelectionKey;
>>>>>>>
>>>>>>> /**
>>>>>>> * Client side interface used to implement non blocking client
>>>>>>> * operation. Implementations of this interface must make sure the
>>>>>>> * following methods are invoked in this order:
>>>>>>> *
>>>>>>> * (1) connect() (2) read() or write().
>>>>>>> *
>>>>>>> * @param <E> a <code>SelectorHandler</code>
>>>>>>> * @param <C> the type of the underlying channel
>>>>>>> * @author Jeanfrancois Arcand
>>>>>>> */
>>>>>>> public interface ConnectorHandler<E extends SelectorHandler, C
>>>>>>> extends SelectableChannel> extends Handler, Closeable {
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * A token describing the protocol supported by an
>>>>>>> * implementation of this interface.
>>>>>>> * @return <code>Controller.Protocol</code>
>>>>>>> */
>>>>>>> public Controller.Protocol protocol();
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens
>>>>>>> * (e.g. OP_READ or OP_WRITE), the <code>Controller</code> will
>>>>>>> * invoke the CallbackHandler.
>>>>>>> *
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param callbackHandler the handler invoked by the Controller
>>>>>>> * when a non blocking operation is ready to be handled.
>>>>>>> * @param e <code>SelectorHandler</code>
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> CallbackHandler callbackHandler, E e) throws IOException;
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens
>>>>>>> * (e.g. OP_READ or OP_WRITE), the <code>Controller</code> will
>>>>>>> * invoke the CallbackHandler.
>>>>>>> *
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param callbackHandler the handler invoked by the Controller
>>>>>>> * when a non blocking operation is ready to be handled.
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> CallbackHandler callbackHandler) throws IOException;
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. Internally an instance of Controller
>>>>>>> * and its default SelectorHandler will be created every time this
>>>>>>> * method is called. This method should be used if and only if no
>>>>>>> * external Controller has been initialized.
>>>>>>> *
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress) throws
>>>>>>> IOException;
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens
>>>>>>> * (e.g. OP_READ or OP_WRITE), the <code>Controller</code> will
>>>>>>> * invoke the CallbackHandler.
>>>>>>> *
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param localAddress local address to bind
>>>>>>> * @param callbackHandler the handler invoked by the Controller
>>>>>>> * when a non blocking operation is ready to be handled.
>>>>>>> * @param e <code>SelectorHandler</code>
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> SocketAddress localAddress, CallbackHandler callbackHandler, E
>>>>>>> e) throws IOException;
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens
>>>>>>> * (e.g. OP_READ or OP_WRITE), the <code>Controller</code> will
>>>>>>> * invoke the CallbackHandler.
>>>>>>> *
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param localAddress local address to bind
>>>>>>> * @param callbackHandler the handler invoked by the Controller
>>>>>>> * when a non blocking operation is ready to be handled.
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> SocketAddress localAddress,
>>>>>>> CallbackHandler callbackHandler) throws IOException;
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. Internally an instance of Controller
>>>>>>> * and its default SelectorHandler will be created every time this
>>>>>>> * method is called. This method should be used if and only if no
>>>>>>> * external Controller has been initialized.
>>>>>>> *
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param localAddress local address to bind
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> SocketAddress localAddress) throws IOException;
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Read bytes. If blocking is set to <tt>true</tt>, a pool of
>>>>>>> * temporary <code>Selector</code>s will be used to read bytes.
>>>>>>> *
>>>>>>> * @param byteBuffer The byteBuffer to store bytes.
>>>>>>> * @param blocking <tt>true</tt> if a pool of temporary Selectors
>>>>>>> * is required to handle a blocking read.
>>>>>>> * @return number of bytes read
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public long read(ByteBuffer byteBuffer, boolean blocking)
>>>>>>> throws IOException;
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Write bytes. If blocking is set to <tt>true</tt>, a pool of
>>>>>>> * temporary <code>Selector</code>s will be used to write bytes.
>>>>>>> *
>>>>>>> * @param byteBuffer The byteBuffer to write.
>>>>>>> * @param blocking <tt>true</tt> if a pool of temporary Selectors
>>>>>>> * is required to handle a blocking write.
>>>>>>> * @return number of bytes written
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public long write(ByteBuffer byteBuffer, boolean blocking)
>>>>>>> throws IOException;
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Close the underlying connection.
>>>>>>> *
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void close() throws IOException;
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Decide how the OP_CONNECT final steps are handled.
>>>>>>> * @param key <code>SelectionKey</code>
>>>>>>> */
>>>>>>> public void finishConnect(SelectionKey key);
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Set the <code>Controller</code> associated with this instance.
>>>>>>> * @param controller <code>Controller</code>
>>>>>>> */
>>>>>>> public void setController(Controller controller);
>>>>>>>
>>>>>>>
>>>>>>> /**
>>>>>>> * Return the <code>Controller</code>.
>>>>>>> *
>>>>>>> * @return the <code>Controller</code>
>>>>>>> */
>>>>>>> public Controller getController();
>>>>>>>
>>>>>>> /**
>>>>>>> * Returns the <code>SelectorHandler</code> which manages this
>>>>>>> * <code>ConnectorHandler</code>.
>>>>>>> *
>>>>>>> * @return <code>SelectorHandler</code>
>>>>>>> */
>>>>>>> public E getSelectorHandler();
>>>>>>>
>>>>>>> /**
>>>>>>> * Returns this <code>ConnectorHandler</code>'s underlying channel.
>>>>>>> *
>>>>>>> * @return channel
>>>>>>> */
>>>>>>> public C getUnderlyingChannel();
>>>>>>>
>>>>>>> /**
>>>>>>> * Returns this <code>ConnectorHandler</code>'s callback handler
>>>>>>> * instance, which is used to process occurring events.
>>>>>>> *
>>>>>>> * @return callback handler
>>>>>>> */
>>>>>>> public CallbackHandler getCallbackHandler();
>>>>>>>
>>>>>>> /**
>>>>>>> * Sets an existing channel for this ConnectorHandler. This can be
>>>>>>> * used to (re)use this ConnectorHandler for an already existing
>>>>>>> * connection.
>>>>>>> * <p>
>>>>>>> * The controller which controls the channel must also be set.
>>>>>>> * @param channel The channel to use.
>>>>>>> * @see #setController(Controller)
>>>>>>> */
>>>>>>> public void setUnderlyingChannel(C channel);
>>>>>>>
>>>>>>> /**
>>>>>>> * Sets this <code>ConnectorHandler</code>'s callback handler
>>>>>>> * instance, which is used to process occurring events.
>>>>>>> *
>>>>>>> * @param callbackHandler handler
>>>>>>> */
>>>>>>> public void setCallbackHandler(CallbackHandler
>>>>>>> callbackHandler);
>>>>>>> }
>>>>>>>
>>>>>>> ------------------------------------------------------------------------
>>>>>>>
>>>>>>>
>>>>>>> /*
>>>>>>> * The contents of this file are subject to the terms
>>>>>>> * of the Common Development and Distribution License
>>>>>>> * (the License). You may not use this file except in
>>>>>>> * compliance with the License.
>>>>>>> *
>>>>>>> * You can obtain a copy of the license at
>>>>>>> * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>>>>>>> * glassfish/bootstrap/legal/CDDLv1.0.txt.
>>>>>>> * See the License for the specific language governing
>>>>>>> * permissions and limitations under the License.
>>>>>>> *
>>>>>>> * When distributing Covered Code, include this CDDL
>>>>>>> * Header Notice in each file and include the License file
>>>>>>> * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>>>>>>> * If applicable, add the following below the CDDL Header,
>>>>>>> * with the fields enclosed by brackets [] replaced by
>>>>>>> * you own identifying information:
>>>>>>> * "Portions Copyrighted [year] [name of copyright owner]"
>>>>>>> *
>>>>>>> * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>>>>>>> */
>>>>>>>
>>>>>>> package com.sun.grizzly;
>>>>>>>
>>>>>>> import com.sun.grizzly.Controller;
>>>>>>> import com.sun.grizzly.util.ByteBufferInputStream;
>>>>>>> import java.io.IOException;
>>>>>>> import java.net.SocketAddress;
>>>>>>> import java.nio.ByteBuffer;
>>>>>>> import java.nio.channels.AlreadyConnectedException;
>>>>>>> import java.nio.channels.NotYetConnectedException;
>>>>>>> import java.nio.channels.SelectionKey;
>>>>>>> import java.nio.channels.DatagramChannel;
>>>>>>> import java.util.concurrent.CountDownLatch;
>>>>>>> import java.util.concurrent.TimeUnit;
>>>>>>>
>>>>>>> /**
>>>>>>> * Client side interface used to implement non blocking client
>>>>>>> * operation. Implementations of this class must make sure the
>>>>>>> * following methods are invoked in this order:
>>>>>>> *
>>>>>>> * (1) connect()
>>>>>>> * (2) read() or write().
>>>>>>> *
>>>>>>> * @author Jeanfrancois Arcand
>>>>>>> */
>>>>>>> public class UDPConnectorHandler implements
>>>>>>> ConnectorHandler<UDPSelectorHandler, DatagramChannel>{
>>>>>>> /**
>>>>>>> * The underlying UDPSelectorHandler used to manage SelectionKeys.
>>>>>>> */
>>>>>>> private UDPSelectorHandler selectorHandler;
>>>>>>> /**
>>>>>>> * A <code>CallbackHandler</code> handler invoked by the
>>>>>>> UDPSelectorHandler
>>>>>>> * when a non blocking operation is ready to be processed.
>>>>>>> */
>>>>>>> private CallbackHandler callbackHandler;
>>>>>>>
>>>>>>> /**
>>>>>>> * The connection's DatagramChannel.
>>>>>>> */
>>>>>>> private DatagramChannel datagramChannel;
>>>>>>> /**
>>>>>>> * Is the connection established.
>>>>>>> */
>>>>>>> private volatile boolean isConnected;
>>>>>>> /**
>>>>>>> * The internal Controller used (in case not specified).
>>>>>>> */
>>>>>>> private Controller controller;
>>>>>>>
>>>>>>> /**
>>>>>>> * IsConnected Latch related
>>>>>>> */
>>>>>>> private CountDownLatch isConnectedLatch;
>>>>>>>
>>>>>>> /**
>>>>>>> * Are we creating a controller every run.
>>>>>>> */
>>>>>>> private boolean isStandalone = false;
>>>>>>>
>>>>>>> /**
>>>>>>> * A blocking <code>InputStream</code> that uses a pool of
>>>>>>> * Selectors to execute a blocking read operation.
>>>>>>> */
>>>>>>> private ByteBufferInputStream inputStream;
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens
>>>>>>> * (e.g. OP_READ or OP_WRITE), the <code>Controller</code> will
>>>>>>> * invoke the CallbackHandler.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param callbackHandler the handler invoked by the Controller
>>>>>>> * when a non blocking operation is ready to be handled.
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> CallbackHandler callbackHandler) throws IOException {
>>>>>>> connect(remoteAddress,null,callbackHandler);
>>>>>>> }
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens
>>>>>>> * (e.g. OP_READ or OP_WRITE), the <code>Controller</code> will
>>>>>>> * invoke the CallbackHandler.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param localAddress local address to bind
>>>>>>> * @param callbackHandler the handler invoked by the Controller
>>>>>>> * when a non blocking operation is ready to be handled.
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> SocketAddress localAddress,
>>>>>>> CallbackHandler callbackHandler) throws IOException {
>>>>>>> if (controller == null){
>>>>>>> throw new IllegalStateException("Controller cannot be null");
>>>>>>> }
>>>>>>>
>>>>>>> connect(remoteAddress,localAddress,callbackHandler,
>>>>>>> (UDPSelectorHandler)controller.getSelectorHandler(protocol()));
>>>>>>> }
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens
>>>>>>> * (e.g. OP_READ or OP_WRITE), the <code>Controller</code> will
>>>>>>> * invoke the CallbackHandler.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param callbackHandler the handler invoked by the Controller
>>>>>>> * when a non blocking operation is ready to be handled.
>>>>>>> * @param selectorHandler an instance of SelectorHandler.
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> CallbackHandler callbackHandler,
>>>>>>> UDPSelectorHandler selectorHandler)
>>>>>>> throws IOException {
>>>>>>> connect(remoteAddress,null,callbackHandler,selectorHandler);
>>>>>>> }
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens
>>>>>>> * (e.g. OP_READ or OP_WRITE), the <code>Controller</code> will
>>>>>>> * invoke the CallbackHandler.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param localAddress local address to bind
>>>>>>> * @param callbackHandler the handler invoked by the Controller
>>>>>>> * when a non blocking operation is ready to be handled.
>>>>>>> * @param selectorHandler an instance of SelectorHandler.
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> SocketAddress localAddress,
>>>>>>> CallbackHandler callbackHandler,
>>>>>>> UDPSelectorHandler selectorHandler)
>>>>>>> throws IOException {
>>>>>>> if (isConnected){
>>>>>>> throw new AlreadyConnectedException();
>>>>>>> }
>>>>>>> if (controller == null){
>>>>>>> throw new IllegalStateException("Controller cannot be null");
>>>>>>> }
>>>>>>> if (selectorHandler == null){
>>>>>>> throw new IllegalStateException("SelectorHandler cannot be null");
>>>>>>> }
>>>>>>> this.selectorHandler = selectorHandler;
>>>>>>> this.callbackHandler = callbackHandler;
>>>>>>> // Wait for the onConnect to be invoked.
>>>>>>> isConnectedLatch = new CountDownLatch(1);
>>>>>>> selectorHandler.connect(remoteAddress,localAddress,callbackHandler);
>>>>>>>
>>>>>>> inputStream = new ByteBufferInputStream();
>>>>>>> try {
>>>>>>> isConnectedLatch.await(30, TimeUnit.SECONDS);
>>>>>>> } catch (InterruptedException ex) {
>>>>>>> throw new IOException(ex.getMessage());
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. Internally an instance of Controller
>>>>>>> * and its default SelectorHandler will be created every time this
>>>>>>> * method is called. This method should be used if and only if no
>>>>>>> * external Controller has been initialized.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress)
>>>>>>> throws IOException {
>>>>>>> connect(remoteAddress,(SocketAddress)null);
>>>>>>> }
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. Internally an instance of Controller
>>>>>>> * and its default SelectorHandler will be created every time this
>>>>>>> * method is called. This method should be used if and only if no
>>>>>>> * external Controller has been initialized.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param localAddress local address to bind
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> SocketAddress localAddress) throws IOException {
>>>>>>> if (isConnected){
>>>>>>> throw new AlreadyConnectedException();
>>>>>>> }
>>>>>>> if (controller == null){
>>>>>>> isStandalone = true;
>>>>>>> controller = new Controller();
>>>>>>> controller.setSelectorHandler(new UDPSelectorHandler(true));
>>>>>>> DefaultPipeline pipeline = new DefaultPipeline();
>>>>>>> pipeline.initPipeline();
>>>>>>> pipeline.startPipeline();
>>>>>>> controller.setPipeline(pipeline);
>>>>>>>
>>>>>>> callbackHandler = new CallbackHandler<Context>(){
>>>>>>> public void onConnect(IOEvent<Context> ioEvent) {
>>>>>>> SelectionKey key = ioEvent.attachment().getSelectionKey();
>>>>>>> finishConnect(key);
>>>>>>> }
>>>>>>> public void onRead(IOEvent<Context> ioEvent) {
>>>>>>> }
>>>>>>> public void onWrite(IOEvent<Context> ioEvent) {
>>>>>>> }
>>>>>>> };
>>>>>>>
>>>>>>> final CountDownLatch latch = new CountDownLatch(1);
>>>>>>> try{
>>>>>>> pipeline.execute(new Context(){
>>>>>>> public Object call() throws Exception {
>>>>>>> latch.countDown();
>>>>>>> controller.start();
>>>>>>> return null;
>>>>>>> }
>>>>>>> });
>>>>>>> } catch (PipelineFullException ex){
>>>>>>> throw new IOException(ex.getMessage());
>>>>>>> }
>>>>>>> try {
>>>>>>> latch.await();
>>>>>>> Thread.sleep(1000);
>>>>>>> } catch (InterruptedException ex) {
>>>>>>> // Restore the interrupt status instead of silently dropping it.
>>>>>>> Thread.currentThread().interrupt();
>>>>>>> }
>>>>>>> }
>>>>>>> connect(remoteAddress,localAddress,callbackHandler,
>>>>>>> (UDPSelectorHandler)controller.getSelectorHandler(protocol()));
>>>>>>> }
>>>>>>> /**
>>>>>>> * Read bytes. If blocking is set to <tt>true</tt>, a pool
>>>>>>> of temporary
>>>>>>> * <code>Selector</code> will be used to read bytes.
>>>>>>> * @param byteBuffer The byteBuffer to store bytes.
>>>>>>> * @param blocking <tt>true</tt> if a a pool of temporary
>>>>>>> Selector
>>>>>>> * is required to handle a blocking read.
>>>>>>> */
>>>>>>> public long read(ByteBuffer byteBuffer, boolean blocking)
>>>>>>> throws IOException {
>>>>>>> if (!isConnected){
>>>>>>> throw new NotYetConnectedException();
>>>>>>> }
>>>>>>> SelectionKey key =
>>>>>>> datagramChannel.keyFor(selectorHandler.getSelector());
>>>>>>> if (blocking){
>>>>>>> inputStream.setSelectionKey(key);
>>>>>>> inputStream.setChannelType(
>>>>>>>
>>>>>>> ByteBufferInputStream.ChannelType.DatagramChannel);
>>>>>>> int nRead = inputStream.read(byteBuffer);
>>>>>>> return nRead;
>>>>>>> } else {
>>>>>>> if (callbackHandler == null){
>>>>>>> throw new IllegalStateException
>>>>>>> ("Non blocking read needs a CallbackHandler");
>>>>>>> }
>>>>>>> int nRead = datagramChannel.read(byteBuffer);
>>>>>>>
>>>>>>> if (nRead == 0){
>>>>>>> key.attach(callbackHandler);
>>>>>>> selectorHandler.register(key,
>>>>>>> SelectionKey.OP_READ|SelectionKey.OP_WRITE);
>>>>>>> }
>>>>>>> return nRead;
>>>>>>> }
>>>>>>> }
>>>>>>> /**
>>>>>>> * Writes bytes. If blocking is set to <tt>true</tt>, a pool of temporary
>>>>>>> * <code>Selector</code>s will be used to write bytes.
>>>>>>> * Note: this UDP implementation currently rejects blocking writes
>>>>>>> * with an IllegalStateException.
>>>>>>> * @param byteBuffer The byteBuffer to write.
>>>>>>> * @param blocking <tt>true</tt> if a pool of temporary Selectors
>>>>>>> * is required to handle a blocking write.
>>>>>>> */
>>>>>>> public long write(ByteBuffer byteBuffer,
>>>>>>> boolean blocking) throws IOException {
>>>>>>> if (!isConnected){
>>>>>>> throw new NotYetConnectedException();
>>>>>>> }
>>>>>>> SelectionKey key =
>>>>>>> datagramChannel.keyFor(selectorHandler.getSelector());
>>>>>>> if (blocking){
>>>>>>> throw new IllegalStateException("Blocking mode not supported");
>>>>>>> } else {
>>>>>>> if (callbackHandler == null){
>>>>>>> throw new IllegalStateException
>>>>>>> ("Non blocking write needs a CallbackHandler");
>>>>>>> }
>>>>>>> int nWrite = datagramChannel.write(byteBuffer);
>>>>>>> if (nWrite == 0){
>>>>>>> key.attach(callbackHandler);
>>>>>>> selectorHandler.register(key,
>>>>>>> SelectionKey.OP_READ|SelectionKey.OP_WRITE);
>>>>>>> }
>>>>>>> return nWrite;
>>>>>>> }
>>>>>>> }
>>>>>>> /**
>>>>>>> * Send bytes.
>>>>>>> * @param byteBuffer The byteBuffer containing the bytes to send.
>>>>>>> * @param socketAddress the destination address
>>>>>>> * @return number of bytes sent
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public long send(ByteBuffer byteBuffer, SocketAddress
>>>>>>> socketAddress) throws IOException {
>>>>>>> if (!isConnected){
>>>>>>> throw new NotYetConnectedException();
>>>>>>> }
>>>>>>> if (callbackHandler == null){
>>>>>>> throw new IllegalStateException
>>>>>>> ("Non blocking send needs a CallbackHandler");
>>>>>>> }
>>>>>>>
>>>>>>> return datagramChannel.send(byteBuffer,socketAddress);
>>>>>>> }
>>>>>>> /**
>>>>>>> * Receive bytes.
>>>>>>> * @param byteBuffer The byteBuffer to store bytes.
>>>>>>> * @return the <code>SocketAddress</code> of the datagram's source
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public SocketAddress receive(ByteBuffer byteBuffer) throws
>>>>>>> IOException {
>>>>>>> if (!isConnected){
>>>>>>> throw new NotYetConnectedException();
>>>>>>> }
>>>>>>> SelectionKey key =
>>>>>>> datagramChannel.keyFor(selectorHandler.getSelector());
>>>>>>> if (callbackHandler == null){
>>>>>>> throw new IllegalStateException
>>>>>>> ("Non blocking read needs a CallbackHandler");
>>>>>>> }
>>>>>>> SocketAddress socketAddress =
>>>>>>> datagramChannel.receive(byteBuffer);
>>>>>>> return socketAddress;
>>>>>>> }
>>>>>>> /**
>>>>>>> * Close the underlying connection.
>>>>>>> */
>>>>>>> public void close() throws IOException{
>>>>>>> if (datagramChannel != null){
>>>>>>> if (selectorHandler != null){
>>>>>>> SelectionKey key =
>>>>>>> datagramChannel.keyFor(selectorHandler.getSelector());
>>>>>>> // Cancel the key if there is one, but always close the channel.
>>>>>>> if (key != null){
>>>>>>> key.cancel();
>>>>>>> key.attach(null);
>>>>>>> }
>>>>>>> }
>>>>>>> datagramChannel.close();
>>>>>>> }
>>>>>>> if (controller != null && isStandalone){
>>>>>>> controller.stop();
>>>>>>> controller = null;
>>>>>>> }
>>>>>>> isStandalone = false;
>>>>>>> }
>>>>>>>
>>>>>>> /**
>>>>>>> * Finish handling the OP_CONNECT interest ops.
>>>>>>> */
>>>>>>> public void finishConnect(SelectionKey key){
>>>>>>> datagramChannel = (DatagramChannel)key.channel();
>>>>>>> isConnected = datagramChannel.isConnected();
>>>>>>> isConnectedLatch.countDown();
>>>>>>> }
>>>>>>> /**
>>>>>>> * A token describing the protocol supported by an implementation of this
>>>>>>> * interface
>>>>>>> */
>>>>>>> public Controller.Protocol protocol(){
>>>>>>> return Controller.Protocol.UDP;
>>>>>>> }
>>>>>>> /**
>>>>>>> * Is the underlying DatagramChannel connected.
>>>>>>> * @return true if connected, otherwise false
>>>>>>> */
>>>>>>> public boolean isConnected(){
>>>>>>> return isConnected;
>>>>>>> }
>>>>>>>
>>>>>>> public Controller getController() {
>>>>>>> return controller;
>>>>>>> }
>>>>>>>
>>>>>>> public void setController(Controller controller) {
>>>>>>> this.controller = controller;
>>>>>>> }
>>>>>>>
>>>>>>> public DatagramChannel getUnderlyingChannel() {
>>>>>>> return datagramChannel;
>>>>>>> }
>>>>>>>
>>>>>>> public CallbackHandler getCallbackHandler() {
>>>>>>> return callbackHandler;
>>>>>>> }
>>>>>>>
>>>>>>> public void setCallbackHandler(CallbackHandler
>>>>>>> callbackHandler) {
>>>>>>> this.callbackHandler = callbackHandler;
>>>>>>> }
>>>>>>>
>>>>>>> public UDPSelectorHandler getSelectorHandler() {
>>>>>>> return selectorHandler;
>>>>>>> }
>>>>>>>
>>>>>>> public void setChannel(DatagramChannel channel,
>>>>>>> UDPSelectorHandler selectorHandler) {
>>>>>>> this.selectorHandler = selectorHandler;
>>>>>>> this.datagramChannel = channel;
>>>>>>> }
>>>>>>>
>>>>>>> public void setUnderlyingChannel(DatagramChannel channel) {
>>>>>>> this.datagramChannel = channel;
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> ------------------------------------------------------------------------
>>>>>>>
>>>>>>>
>>>>>>> /*
>>>>>>> * The contents of this file are subject to the terms
>>>>>>> * of the Common Development and Distribution License
>>>>>>> * (the License). You may not use this file except in
>>>>>>> * compliance with the License.
>>>>>>> *
>>>>>>> * You can obtain a copy of the license at
>>>>>>> * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>>>>>>> * glassfish/bootstrap/legal/CDDLv1.0.txt.
>>>>>>> * See the License for the specific language governing
>>>>>>> * permissions and limitations under the License.
>>>>>>> *
>>>>>>> * When distributing Covered Code, include this CDDL
>>>>>>> * Header Notice in each file and include the License file
>>>>>>> * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>>>>>>> * If applicable, add the following below the CDDL Header,
>>>>>>> * with the fields enclosed by brackets [] replaced by
>>>>>>> * you own identifying information:
>>>>>>> * "Portions Copyrighted [year] [name of copyright owner]"
>>>>>>> *
>>>>>>> * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>>>>>>> */
>>>>>>> package com.sun.grizzly;
>>>>>>>
>>>>>>> import com.sun.grizzly.Controller.Protocol;
>>>>>>> import com.sun.grizzly.util.ByteBufferInputStream;
>>>>>>> import com.sun.grizzly.util.OutputWriter;
>>>>>>> import java.io.IOException;
>>>>>>> import java.net.SocketAddress;
>>>>>>> import java.nio.ByteBuffer;
>>>>>>> import java.nio.channels.AlreadyConnectedException;
>>>>>>> import java.nio.channels.NotYetConnectedException;
>>>>>>> import java.nio.channels.SelectableChannel;
>>>>>>> import java.nio.channels.SelectionKey;
>>>>>>> import java.nio.channels.SocketChannel;
>>>>>>> import java.util.concurrent.CountDownLatch;
>>>>>>> import java.util.concurrent.TimeUnit;
>>>>>>>
>>>>>>> /**
>>>>>>> * Non blocking TCP Connector Handler. The recommended way to use this class
>>>>>>> * is to create an external Controller and share the same SelectorHandler
>>>>>>> * instance.
>>>>>>> *
>>>>>>> * Recommended
>>>>>>> * -----------
>>>>>>> * Controller controller = new Controller();
>>>>>>> * // new TCPSelectorHandler(true) means the Selector will be used only
>>>>>>> * // for client operations (OP_READ, OP_WRITE, OP_CONNECT).
>>>>>>> * TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler(true);
>>>>>>> * controller.setSelectorHandler(tcpSelectorHandler);
>>>>>>> * TCPConnectorHandler tcpConnectorHandler = new TCPConnectorHandler();
>>>>>>> * tcpConnectorHandler.connect(localhost,port, new CallbackHandler(){...},
>>>>>>> * tcpSelectorHandler);
>>>>>>> * TCPConnectorHandler tcpConnectorHandler2 = new TCPConnectorHandler();
>>>>>>> * tcpConnectorHandler2.connect(localhost,port, new CallbackHandler(){...},
>>>>>>> * tcpSelectorHandler);
>>>>>>> *
>>>>>>> * Not recommended (but still works)
>>>>>>> * ---------------------------------
>>>>>>> * TCPConnectorHandler tcpConnectorHandler = new TCPConnectorHandler();
>>>>>>> * tcpConnectorHandler.connect(localhost,port);
>>>>>>> *
>>>>>>> * Internally, a new Controller will be created every time
>>>>>>> * connect(localhost,port) is invoked, which has an impact on performance.
>>>>>>> *
>>>>>>> * @author Jeanfrancois Arcand
>>>>>>> */
>>>>>>> public class TCPConnectorHandler implements
>>>>>>> ConnectorHandler<TCPSelectorHandler, SocketChannel>{
>>>>>>> /**
>>>>>>> * The underlying TCPSelectorHandler used to manage SelectionKeys.
>>>>>>> */
>>>>>>> private TCPSelectorHandler selectorHandler;
>>>>>>> /**
>>>>>>> * A <code>CallbackHandler</code> handler invoked by the
>>>>>>> TCPSelectorHandler
>>>>>>> * when a non blocking operation is ready to be processed.
>>>>>>> */
>>>>>>> private CallbackHandler callbackHandler;
>>>>>>> /**
>>>>>>> * A blocking <code>InputStream</code> that uses a pool of Selectors
>>>>>>> * to execute a blocking read operation.
>>>>>>> */
>>>>>>> private ByteBufferInputStream inputStream;
>>>>>>> /**
>>>>>>> * The connection's SocketChannel.
>>>>>>> */
>>>>>>> private SocketChannel socketChannel;
>>>>>>> /**
>>>>>>> * Is the connection established.
>>>>>>> */
>>>>>>> private volatile boolean isConnected;
>>>>>>> /**
>>>>>>> * The internal Controller used (in case not specified).
>>>>>>> */
>>>>>>> private Controller controller;
>>>>>>>
>>>>>>> /**
>>>>>>> * Latch counted down by finishConnect() once the connect attempt completes.
>>>>>>> */
>>>>>>> private CountDownLatch isConnectedLatch;
>>>>>>> /**
>>>>>>> * Are we creating a controller every run.
>>>>>>> */
>>>>>>> private boolean isStandalone = false;
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens (e.g.
>>>>>>> * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
>>>>>>> * the CallbackHandler.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param callbackHandler the handler invoked by the Controller when
>>>>>>> * a non blocking operation is ready to be handled.
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> CallbackHandler callbackHandler) throws
>>>>>>> IOException {
>>>>>>> connect(remoteAddress,null,callbackHandler);
>>>>>>> }
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens (e.g.
>>>>>>> * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
>>>>>>> * the CallbackHandler.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param localAddress local address to bind
>>>>>>> * @param callbackHandler the handler invoked by the Controller when
>>>>>>> * a non blocking operation is ready to be handled.
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> SocketAddress localAddress,
>>>>>>> CallbackHandler callbackHandler) throws IOException {
>>>>>>> if (controller == null){
>>>>>>> throw new IllegalStateException("Controller cannot be null");
>>>>>>> }
>>>>>>>
>>>>>>> connect(remoteAddress,localAddress,callbackHandler,
>>>>>>>
>>>>>>> (TCPSelectorHandler)controller.getSelectorHandler(protocol()));
>>>>>>> }
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens (e.g.
>>>>>>> * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
>>>>>>> * the CallbackHandler.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param callbackHandler the handler invoked by the Controller when
>>>>>>> * a non blocking operation is ready to be handled.
>>>>>>> * @param selectorHandler an instance of SelectorHandler.
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> CallbackHandler callbackHandler,
>>>>>>> TCPSelectorHandler selectorHandler)
>>>>>>> throws IOException {
>>>>>>> connect(remoteAddress,null,callbackHandler,selectorHandler);
>>>>>>> }
>>>>>>> /**
>>>>>>> * Connect to hostname:port. When an asynchronous event happens (e.g.
>>>>>>> * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
>>>>>>> * the CallbackHandler.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param localAddress local address to bind
>>>>>>> * @param callbackHandler the handler invoked by the Controller when
>>>>>>> * a non blocking operation is ready to be handled.
>>>>>>> * @param selectorHandler an instance of SelectorHandler.
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> SocketAddress localAddress,
>>>>>>> CallbackHandler callbackHandler,
>>>>>>> TCPSelectorHandler selectorHandler)
>>>>>>> throws IOException {
>>>>>>> if (isConnected){
>>>>>>> throw new AlreadyConnectedException();
>>>>>>> }
>>>>>>> if (controller == null){
>>>>>>> throw new IllegalStateException("Controller cannot be null");
>>>>>>> }
>>>>>>> if (selectorHandler == null){
>>>>>>> throw new IllegalStateException("SelectorHandler cannot be null");
>>>>>>> }
>>>>>>> this.selectorHandler = selectorHandler;
>>>>>>> this.callbackHandler = callbackHandler;
>>>>>>> // Wait for the onConnect to be invoked.
>>>>>>> isConnectedLatch = new CountDownLatch(1);
>>>>>>> selectorHandler.connect(remoteAddress,localAddress,callbackHandler);
>>>>>>>
>>>>>>> inputStream = new ByteBufferInputStream();
>>>>>>>
>>>>>>> try {
>>>>>>> isConnectedLatch.await(30, TimeUnit.SECONDS);
>>>>>>> } catch (InterruptedException ex) {
>>>>>>> throw new IOException(ex.getMessage());
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> /**
>>>>>>> * Connect to hostname:port. Internally an instance of Controller and
>>>>>>> * its default SelectorHandler will be created every time this method is
>>>>>>> * called. This method should be used only if no external
>>>>>>> * Controller has been initialized.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress)
>>>>>>> throws IOException {
>>>>>>> connect(remoteAddress,(SocketAddress)null);
>>>>>>> }
>>>>>>> /**
>>>>>>> * Connect to hostname:port. Internally an instance of Controller and
>>>>>>> * its default SelectorHandler will be created every time this method is
>>>>>>> * called. This method should be used only if no external
>>>>>>> * Controller has been initialized.
>>>>>>> * @param remoteAddress remote address to connect
>>>>>>> * @param localAddress local address to bind
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public void connect(SocketAddress remoteAddress,
>>>>>>> SocketAddress localAddress) throws IOException {
>>>>>>> if (isConnected){
>>>>>>> throw new AlreadyConnectedException();
>>>>>>> }
>>>>>>> if (controller == null){
>>>>>>> isStandalone = true;
>>>>>>> controller = new Controller();
>>>>>>> controller.setSelectorHandler(new
>>>>>>> TCPSelectorHandler(true));
>>>>>>> DefaultPipeline pipeline = new DefaultPipeline();
>>>>>>> pipeline.initPipeline();
>>>>>>> pipeline.startPipeline();
>>>>>>> controller.setPipeline(pipeline);
>>>>>>>
>>>>>>> callbackHandler = new CallbackHandler<Context>(){
>>>>>>> public void onConnect(IOEvent<Context> ioEvent) {
>>>>>>> SelectionKey key =
>>>>>>> ioEvent.attachment().getSelectionKey();
>>>>>>> socketChannel = (SocketChannel)key.channel();
>>>>>>> finishConnect(key);
>>>>>>>
>>>>>>> getController().registerKey(key,SelectionKey.OP_WRITE,
>>>>>>> Protocol.TCP);
>>>>>>> }
>>>>>>> public void onRead(IOEvent<Context> ioEvent) {
>>>>>>> }
>>>>>>> public void onWrite(IOEvent<Context> ioEvent) {
>>>>>>> }
>>>>>>> };
>>>>>>>
>>>>>>> final CountDownLatch latch = new CountDownLatch(1);
>>>>>>> try{
>>>>>>> pipeline.execute(new Context(){
>>>>>>> public Object call() throws Exception {
>>>>>>> latch.countDown();
>>>>>>> controller.start();
>>>>>>> return null;
>>>>>>> } });
>>>>>>> } catch (PipelineFullException ex){
>>>>>>> throw new IOException(ex.getMessage());
>>>>>>> }
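>>>>>>> try {
>>>>>>> latch.await();
>>>>>>> Thread.sleep(1000);
>>>>>>> } catch (InterruptedException ex) {
>>>>>>> // Preserve the interrupt status instead of swallowing it.
>>>>>>> Thread.currentThread().interrupt();
>>>>>>> }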
>>>>>>> }
>>>>>>> connect(remoteAddress,localAddress,callbackHandler,
>>>>>>>
>>>>>>> (TCPSelectorHandler)controller.getSelectorHandler(protocol()));
>>>>>>> }
>>>>>>> /**
>>>>>>> * Read bytes. If blocking is set to <tt>true</tt>, a pool of temporary
>>>>>>> * <code>Selector</code>s will be used to read bytes.
>>>>>>> * @param byteBuffer The byteBuffer to store bytes.
>>>>>>> * @param blocking <tt>true</tt> if a pool of temporary Selectors
>>>>>>> * is required to handle a blocking read.
>>>>>>> * @return number of bytes read
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public long read(ByteBuffer byteBuffer, boolean blocking)
>>>>>>> throws IOException {
>>>>>>> if (!isConnected){
>>>>>>> throw new NotYetConnectedException();
>>>>>>> }
>>>>>>> SelectionKey key =
>>>>>>> socketChannel.keyFor(selectorHandler.getSelector());
>>>>>>> if (blocking){
>>>>>>> inputStream.setSelectionKey(key);
>>>>>>> return inputStream.read(byteBuffer);
>>>>>>> } else {
>>>>>>> if (callbackHandler == null){
>>>>>>> throw new IllegalStateException
>>>>>>> ("Non blocking read needs a CallbackHandler");
>>>>>>> }
>>>>>>> int nRead = socketChannel.read(byteBuffer);
>>>>>>>
>>>>>>> if (nRead == 0){
>>>>>>> key.attach(callbackHandler);
>>>>>>> selectorHandler.register(key,SelectionKey.OP_READ);
>>>>>>> }
>>>>>>> return nRead;
>>>>>>> }
>>>>>>> }
>>>>>>> /**
>>>>>>> * Writes bytes. If blocking is set to <tt>true</tt>, a pool of temporary
>>>>>>> * <code>Selector</code>s will be used to write bytes.
>>>>>>> * @param byteBuffer The byteBuffer to write.
>>>>>>> * @param blocking <tt>true</tt> if a pool of temporary Selectors
>>>>>>> * is required to handle a blocking write.
>>>>>>> * @return number of bytes written
>>>>>>> * @throws java.io.IOException
>>>>>>> */
>>>>>>> public long write(ByteBuffer byteBuffer, boolean blocking) throws
>>>>>>> IOException {
>>>>>>> if (!isConnected){
>>>>>>> throw new NotYetConnectedException();
>>>>>>> }
>>>>>>> SelectionKey key =
>>>>>>> socketChannel.keyFor(selectorHandler.getSelector());
>>>>>>> if (blocking){
>>>>>>> return
>>>>>>> OutputWriter.flushChannel(socketChannel,byteBuffer);
>>>>>>> } else {
>>>>>>> if (callbackHandler == null){
>>>>>>> throw new IllegalStateException
>>>>>>> ("Non blocking write needs a CallbackHandler");
>>>>>>> }
>>>>>>> int nWrite = 1;
>>>>>>> while (nWrite > 0 && byteBuffer.hasRemaining()){
>>>>>>> nWrite = socketChannel.write(byteBuffer);
>>>>>>> }
>>>>>>> if (nWrite == 0 && byteBuffer.hasRemaining()){
>>>>>>> key.attach(callbackHandler);
>>>>>>> selectorHandler.register(key,SelectionKey.OP_WRITE);
>>>>>>> }
>>>>>>> return nWrite;
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> /**
>>>>>>> * Close the underlying connection.
>>>>>>> */
>>>>>>> public void close() throws IOException{
>>>>>>> if (socketChannel != null){
>>>>>>> if (selectorHandler != null){
>>>>>>> SelectionKey key =
>>>>>>> socketChannel.keyFor(selectorHandler.getSelector());
>>>>>>> // Cancel the key if there is one, but always close the channel.
>>>>>>> if (key != null){
>>>>>>> key.cancel();
>>>>>>> key.attach(null);
>>>>>>> }
>>>>>>> }
>>>>>>> socketChannel.close();
>>>>>>> }
>>>>>>> if (controller != null && isStandalone){
>>>>>>> controller.stop();
>>>>>>> controller = null;
>>>>>>> }
>>>>>>> isStandalone = false;
>>>>>>> }
>>>>>>>
>>>>>>> /**
>>>>>>> * Finish handling the OP_CONNECT interest ops.
>>>>>>> * @param key - a <code>SelectionKey</code>
>>>>>>> */
>>>>>>> public void finishConnect(SelectionKey key){
>>>>>>> try{
>>>>>>> socketChannel = (SocketChannel)key.channel();
>>>>>>>
>>>>>>> socketChannel.finishConnect();
>>>>>>> isConnected = socketChannel.isConnected();
>>>>>>> } catch (IOException ex){
>>>>>>> // XXX LOG ME
>>>>>>> ex.printStackTrace();
>>>>>>> } finally {
>>>>>>> isConnectedLatch.countDown();
>>>>>>> }
>>>>>>> }
>>>>>>> public void setUnderlyingChannel(SocketChannel
>>>>>>> socketChannel) {
>>>>>>> this.socketChannel = socketChannel;
>>>>>>> }
>>>>>>> /**
>>>>>>> * A token describing the protocol supported by an implementation of this
>>>>>>> * interface
>>>>>>> * @return this <code>ConnectorHandler</code>'s protocol
>>>>>>> */
>>>>>>> public Controller.Protocol protocol(){
>>>>>>> return Controller.Protocol.TCP;
>>>>>>> }
>>>>>>> /**
>>>>>>> * Is the underlying SocketChannel connected.
>>>>>>> * @return <tt>true</tt> if connected, otherwise <tt>false</tt>
>>>>>>> */
>>>>>>> public boolean isConnected(){
>>>>>>> return isConnected;
>>>>>>> }
>>>>>>>
>>>>>>> public Controller getController() {
>>>>>>> return controller;
>>>>>>> }
>>>>>>>
>>>>>>> public void setController(Controller controller) {
>>>>>>> this.controller = controller;
>>>>>>> }
>>>>>>>
>>>>>>> public SocketChannel getUnderlyingChannel() {
>>>>>>> return socketChannel;
>>>>>>> }
>>>>>>>
>>>>>>> public CallbackHandler getCallbackHandler() {
>>>>>>> return callbackHandler;
>>>>>>> }
>>>>>>>
>>>>>>> public void setCallbackHandler(CallbackHandler
>>>>>>> callbackHandler) {
>>>>>>> this.callbackHandler = callbackHandler;
>>>>>>> }
>>>>>>>
>>>>>>> public TCPSelectorHandler getSelectorHandler() {
>>>>>>> return selectorHandler;
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> ------------------------------------------------------------------------
>>>>>>>
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>>
>>>>>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>>>>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>>>>>
>
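
For reference, the non-blocking branch of TCPConnectorHandler.write() quoted above writes until the channel stops accepting bytes and then registers the key for OP_WRITE. Below is a minimal sketch of that pattern using only java.nio, with no Grizzly classes. It assumes a Pipe as a stand-in for the socket, and the names NonBlockingWriteSketch, writeAvailable and writeFully are hypothetical, chosen only for this illustration. Unlike the quoted method, which returns the count from the last write call, the sketch returns the total number of bytes written.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class NonBlockingWriteSketch {

    // Write until the buffer is empty or the channel accepts no more bytes.
    static int writeAvailable(Pipe.SinkChannel sink, ByteBuffer buf) throws IOException {
        int total = 0;
        int n = 1;
        while (n > 0 && buf.hasRemaining()) {
            n = sink.write(buf);
            total += n;
        }
        return total;
    }

    // Push `size` bytes through a non-blocking Pipe (a stand-in for a socket)
    // and return the total number of bytes written.
    static int writeFully(int size) {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source();
                 Selector selector = Selector.open()) {
                sink.configureBlocking(false);
                source.configureBlocking(false);
                // Register with no interest ops; OP_WRITE is enabled only when needed.
                SelectionKey key = sink.register(selector, 0);
                ByteBuffer out = ByteBuffer.allocate(size);
                ByteBuffer in = ByteBuffer.allocate(64 * 1024);
                int written = 0;
                while (out.hasRemaining()) {
                    written += writeAvailable(sink, out);
                    if (out.hasRemaining()) {
                        // Channel is full: register interest in OP_WRITE.
                        key.interestOps(SelectionKey.OP_WRITE);
                        // Play the role of the remote peer and drain the pipe.
                        in.clear();
                        while (source.read(in) > 0) {
                            in.clear();
                        }
                        // Wait (at most one second) until the channel is writable again.
                        selector.select(1000);
                        selector.selectedKeys().clear();
                        key.interestOps(0);
                    }
                }
                return written;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes written: " + writeFully(1 << 20));
    }
}
```

In the quoted handler the wait is not done inline like this; the key is handed to the SelectorHandler and the CallbackHandler's onWrite is expected to continue the write. The inline select() here only keeps the sketch self-contained.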