users@grizzly.java.net

Re: How to implement the client's message flow based on async read/write?

From: JianXing Yi <jianxing.yi_at_gmail.com>
Date: Wed, 28 May 2008 10:17:06 +0800

Many thanks for your time and help!

2008/5/28, Jeanfrancois Arcand <Jeanfrancois.Arcand_at_sun.com>:
> Salut,
>
> JianXing Yi wrote:
>> Hi there,
>>
>> Undoubtedly, Grizzly is a good NIO framework.
>
> Thanks!
>
>> For practice purposes, I'm
>> writing an NIO client, based on Grizzly, that sends an HTTP request
>> and receives its response.
>
> Interesting...
>
>>
>> 1. Finish connecting to the server in the ConnectorHandler's
>> onConnect() method and, in the same method, register OP_WRITE
>> interest so that the HTTP GET request can be written. I think this
>> registration will trigger the ConnectorHandler's
>> onWrite() method.
>
> Only if there are some write ops ready.
>
>> 2. Write the request in the ConnectorHandler's onWrite() method,
>> asynchronously, like this:
>>     @Override
>>     public void onWrite(IOEvent<Context> ioEvent) {
>>         System.out.println("onWrite()");
>>         try {
>>             connectorHandler.writeToAsyncQueue(ByteBuffer
>>                     .wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
>>                             .getBytes()), asyncWriteHandler);
>>         } catch (IOException e) {
>>             // TODO Auto-generated catch block
>>             e.printStackTrace();
>>         }
>>     }
>> 3. Register OP_READ interest in the AsyncWriteCallbackHandler's
>> onWriteCompleted() method. By doing this, I think the
>> ConnectorHandler's onRead() method will be triggered, so I can read
>> the HTTP response. If I read the response in a blocking way, there
>> is no problem.
>> MY PROBLEM IS: IF I USE THE ConnectorHandler's (in fact a
>> TCPConnectorHandler's) readFromAsyncQueue() METHOD TO READ THE RESPONSE,
>> I CANNOT GET THE RESPONSE CORRECTLY. WHAT'S MORE, THE
>> AsyncReadCallbackHandler's onReadCompleted() METHOD IS NEVER
>> CALLED!
>>
>> Below is the testing client code. Thanks for your help!
>
> Where are you registering back the OP_READ? From the code below (I
> haven't yet had a chance to run it...in an airport right now :-)), the
> registration seems to be commented out.
I register the OP_READ interest again in the AsyncWriteCallbackHandler's onWriteCompleted() method, but it does not seem to work, and neither does the commented-out registration in the connector CallbackHandler's onWrite() method.

>
> Have you tried to register for OP_READ just after connect? Since HTTP is a
> request->response protocol, registering for OP_READ earlier should work,
> as the server will only reply once your client has finished sending the
> request.
After reading your reply, I tried registering for OP_READ just after connect, like this:
    @Override
    public void onConnect(IOEvent<Context> ioEvent) {
        try {
            connectorHandler.finishConnect(ioEvent.attachment()
                    .getSelectionKey());
            System.out.println("connected.");
            ioEvent.attachment().getSelectionKey().interestOps(
                    SelectionKey.OP_WRITE | SelectionKey.OP_READ);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
It seems that this doesn't work. :-(
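
For reference, here is a minimal sketch of how interest sets behave in plain java.nio (not Grizzly). interestOps(int) replaces the whole set, so an op only survives a later call if it is OR-ed back in. Whether Grizzly's selector thread later overwrites the interest set on this key is an assumption that is not verified here. The class name InterestOpsSketch and the method demo() are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical demo class: plain java.nio only, no Grizzly types.
final class InterestOpsSketch {

    /** Returns the interest set after a replace, an add, and a remove. */
    static int[] demo() {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            // An unconnected channel can still be registered; nothing is sent.
            SelectionKey key = channel.register(selector, SelectionKey.OP_WRITE);

            // Replaces the set: OP_WRITE is gone afterwards.
            key.interestOps(SelectionKey.OP_READ);
            int replaced = key.interestOps();

            // Adds OP_WRITE while keeping OP_READ.
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            int added = key.interestOps();

            // Removes OP_WRITE only, leaving OP_READ registered.
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            int removed = key.interestOps();

            return new int[] { replaced, added, removed };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If something else on the selector thread calls interestOps(...) on the same key after onConnect() returns, the OP_READ bit set here would be lost, which is one thing worth ruling out.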

What I have in mind is this flow:
1. ConnectorHandler.connect()
2. ConnectorCallbackHandler.onConnect(): register OP_WRITE to trigger ConnectorCallbackHandler.onWrite()
3. ConnectorCallbackHandler.onWrite():
   * first, write the data asynchronously
   * in AsyncWriteCallbackHandler.onWriteCompleted(), register OP_READ to trigger ConnectorCallbackHandler.onRead()
4. ConnectorCallbackHandler.onRead():
   * read the data asynchronously
   * get the complete data in AsyncReadCallbackHandler.onReadCompleted()
AM I ON THE RIGHT TRACK, OR HAVE I MISUNDERSTOOD SOMETHING?
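
The flow above can be sketched as a small state machine over selector interest ops. This is plain Java with hypothetical names (ClientFlowSketch, Stage, interestFor, onReady). It only models the order of events and is not Grizzly's dispatch logic.

```java
import java.nio.channels.SelectionKey;

// Hypothetical model of the flow: which op each stage waits for,
// and which stage follows once that op is reported ready.
final class ClientFlowSketch {

    enum Stage { CONNECTING, WRITING, READING, DONE }

    /** Interest op a stage should have registered on the key. */
    static int interestFor(Stage stage) {
        switch (stage) {
            case CONNECTING: return SelectionKey.OP_CONNECT;
            case WRITING:    return SelectionKey.OP_WRITE;
            case READING:    return SelectionKey.OP_READ;
            default:         return 0; // DONE: nothing left to wait for
        }
    }

    /**
     * Advances only when the ready ops include the op this stage is
     * interested in; any other readiness leaves the stage unchanged.
     */
    static Stage onReady(Stage stage, int readyOps) {
        if ((readyOps & interestFor(stage)) == 0) {
            return stage; // e.g. data arrived but OP_READ is not registered yet
        }
        switch (stage) {
            case CONNECTING: return Stage.WRITING; // onConnect(): finish connect, register OP_WRITE
            case WRITING:    return Stage.READING; // onWriteCompleted(): register OP_READ
            case READING:    return Stage.DONE;    // onReadCompleted(): response available
            default:         return Stage.DONE;
        }
    }
}
```

The model makes one point explicit: each callback is only reached if the matching op is in the key's interest set at the moment the selector runs, so a missing or overwritten registration stalls the flow at that stage.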

Thanks again!

>
> Did I miss something?
>
> Thanks!
>
> -- Jeanfrancois
>
>
>>
>>
>> package tests.grizzly;
>>
>> import java.io.IOException;
>> import java.net.InetAddress;
>> import java.net.InetSocketAddress;
>> import java.net.SocketAddress;
>> import java.nio.ByteBuffer;
>> import java.nio.channels.SelectionKey;
>> import java.nio.charset.Charset;
>> import java.util.Queue;
>> import java.util.concurrent.CountDownLatch;
>>
>> import com.sun.grizzly.CallbackHandler;
>> import com.sun.grizzly.Context;
>> import com.sun.grizzly.Controller;
>> import com.sun.grizzly.ControllerStateListenerAdapter;
>> import com.sun.grizzly.DefaultProtocolChain;
>> import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
>> import com.sun.grizzly.IOEvent;
>> import com.sun.grizzly.ProtocolChain;
>> import com.sun.grizzly.ProtocolFilter;
>> import com.sun.grizzly.TCPConnectorHandler;
>> import com.sun.grizzly.TCPSelectorHandler;
>> import com.sun.grizzly.Controller.Protocol;
>> import com.sun.grizzly.async.AsyncReadCallbackHandler;
>> import com.sun.grizzly.async.AsyncReadQueueRecord;
>> import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>> import com.sun.grizzly.async.AsyncWriteQueueRecord;
>> import com.sun.grizzly.util.WorkerThread;
>>
>> public class TestNonblockingClient10 {
>>
>>     public class MyProtocolFilter implements ProtocolFilter {
>>
>>         @Override
>>         public boolean execute(Context ctx) throws IOException {
>>             System.out.println("execute()");
>>             ByteBuffer byteBuffer = ((WorkerThread) Thread.currentThread())
>>                     .getByteBuffer();
>>             System.out.println(Charset.defaultCharset().decode(byteBuffer));
>>             return true;
>>         }
>>
>>         @Override
>>         public boolean postExecute(Context ctx) throws IOException {
>>             System.out.println("postExecute()");
>>             return true;
>>         }
>>
>>     }
>>
>>     /**
>>      * @param args
>>      * @throws IOException
>>      * @throws InterruptedException
>>      */
>>     public static void main(String[] args) throws InterruptedException,
>>             IOException {
>>         TestNonblockingClient10 client = new TestNonblockingClient10();
>>         client.start();
>>     }
>>
>>     private Controller controller;
>>     private TCPConnectorHandler connectorHandler;
>>     protected CountDownLatch controllerReadyLatch = new CountDownLatch(1);
>>     ByteBuffer readBuffer = ByteBuffer.allocateDirect(10240);
>>     CallbackHandler<Context> connectorCallbackHandler = new
>>             CallbackHandler<Context>() {
>>
>>         @Override
>>         public void onConnect(IOEvent<Context> ioEvent) {
>>             try {
>>                 connectorHandler.finishConnect(ioEvent.attachment()
>>                         .getSelectionKey());
>>                 System.out.println("connected.");
>>                 // trigger the ConnectorHandler's onWrite() method.
>>                 ioEvent.attachment().getSelectionKey().interestOps(
>>                         SelectionKey.OP_WRITE);
>>             } catch (IOException e) {
>>                 // TODO Auto-generated catch block
>>                 e.printStackTrace();
>>             }
>>         }
>>
>>         @Override
>>         public void onRead(IOEvent<Context> ioEvent) {
>>             System.out.println("onRead()");
>>             Context context = ioEvent.attachment();
>>             // context.getSelectionKey().interestOps(SelectionKey.OP_READ);
>>             // try {
>>             // context.getProtocolChain().execute(context);
>>             // } catch (Exception e) {
>>             // // TODO Auto-generated catch block
>>             // e.printStackTrace();
>>             // }
>>             try {
>>                 connectorHandler.read(readBuffer, true);
>>                 System.out.println(Charset.defaultCharset().decode(readBuffer));
>>             } catch (IOException e) {
>>                 // TODO Auto-generated catch block
>>                 e.printStackTrace();
>>             }
>>         }
>>
>>         @Override
>>         public void onWrite(IOEvent<Context> ioEvent) {
>>             System.out.println("onWrite()");
>>             try {
>>                 connectorHandler.writeToAsyncQueue(ByteBuffer
>>                         .wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
>>                                 .getBytes()), asyncWriteHandler);
>>                 // ioEvent.attachment().getSelectionKey().interestOps(
>>                 // SelectionKey.OP_READ);
>>             } catch (IOException e) {
>>                 // TODO Auto-generated catch block
>>                 e.printStackTrace();
>>             }
>>         }
>>     };
>>     protected AsyncReadCallbackHandler asyncReadHandler = new
>>             AsyncReadCallbackHandler() {
>>
>>         @Override
>>         public void onIOException(IOException ioException, SelectionKey key,
>>                 ByteBuffer buffer, Queue<AsyncReadQueueRecord> remainingQueue) {
>>             System.out.println("onIOException()");
>>         }
>>
>>         @Override
>>         public void onReadCompleted(SelectionKey key, SocketAddress srcAddress,
>>                 ByteBuffer buffer) {
>>             System.out.println("onReadCompleted()");
>>             System.out.println(Charset.defaultCharset().decode(buffer));
>>         }
>>     };
>>     protected AsyncWriteCallbackHandler asyncWriteHandler = new
>>             AsyncWriteCallbackHandler() {
>>
>>         @Override
>>         public void onIOException(IOException ioException, SelectionKey key,
>>                 ByteBuffer buffer, Queue<AsyncWriteQueueRecord> remainingQueue) {
>>             // TODO Auto-generated method stub
>>
>>         }
>>
>>         @Override
>>         public void onWriteCompleted(SelectionKey key, ByteBuffer buffer) {
>>             System.out.println("onWriteCompleted()");
>>             key.interestOps(SelectionKey.OP_READ);
>>         }
>>     };
>>
>>     private void start() throws InterruptedException, IOException {
>>         controller = createController();
>>         new Thread(controller).start();
>>
>>         controllerReadyLatch.await();
>>
>>         connectorHandler = (TCPConnectorHandler) controller
>>                 .acquireConnectorHandler(Protocol.TCP);
>>
>>         // trigger the connector handler's onConnect() method.
>>         connectorHandler.connect(new InetSocketAddress(InetAddress
>>                 .getLocalHost(), 8080), connectorCallbackHandler);
>>     }
>>
>>     private Controller createController() {
>>         Controller tmpController = new Controller();
>>
>>         TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler(true);
>>         tmpController.setSelectorHandler(tcpSelectorHandler);
>>
>>         DefaultProtocolChainInstanceHandler instanceHandler = new
>>                 DefaultProtocolChainInstanceHandler() {
>>
>>             private ProtocolChain protocolChain;
>>
>>             @Override
>>             public ProtocolChain poll() {
>>                 if (protocolChain == null) {
>>                     protocolChain = new DefaultProtocolChain();
>>                 }
>>                 return protocolChain;
>>             }
>>         };
>>         // instanceHandler.poll().addFilter(new ReadFilter());
>>         // instanceHandler.poll().addFilter(new MyProtocolFilter());
>>
>>         tmpController.setProtocolChainInstanceHandler(instanceHandler);
>>
>>         tmpController.addStateListener(new ControllerStateListenerAdapter() {
>>             @Override
>>             public void onReady() {
>>                 controllerReadyLatch.countDown();
>>                 super.onReady();
>>             }
>>         });
>>         return tmpController;
>>     }
>>
>> }
>>
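
A side observation on the listing above: the request string "GET /\r\nHTTP 1.0/1.1\r\n..." puts the version on its own line, so a server may parse "GET /" as a version-less (HTTP/0.9 style) request. Whether that contributes to the read problem is not established in this thread. Below is a minimal sketch of a well-formed HTTP/1.0 GET. The class name HttpRequestSketch and the helper get() are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds the request bytes only, performs no I/O.
final class HttpRequestSketch {

    /** Builds "GET <path> HTTP/1.0" plus a Host header, ended by an empty line. */
    static ByteBuffer get(String path, String host) {
        String request = "GET " + path + " HTTP/1.0\r\n"
                + "Host: " + host + "\r\n"
                + "\r\n";
        // Header bytes are ASCII; this avoids the platform default charset
        // that a bare getBytes() call would use.
        return ByteBuffer.wrap(request.getBytes(StandardCharsets.US_ASCII));
    }
}
```

The buffer returned by HttpRequestSketch.get("/", "127.0.0.1") could be passed to writeToAsyncQueue(...) in place of the hand-written string.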
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net