users@grizzly.java.net

Re: How to implement the client's message flow based on async read/write?

From: JianXing Yi <jianxing.yi_at_gmail.com>
Date: Wed, 28 May 2008 10:30:13 +0800

Many thanks for your help!

2008/5/27, Oleksiy Stashok <Oleksiy.Stashok_at_sun.com>:
> Hi,
>
> try calling connectorHandler.readFromAsyncQueue() before writing
> anything, like this:
>
>> @Override
>> public void onWrite(IOEvent<Context> ioEvent) {
>>     System.out.println("onWrite()");
>>     try {
>
>         connectorHandler.readFromAsyncQueue(......);
>>
>>         connectorHandler.writeToAsyncQueue(ByteBuffer
>>                 .wrap("GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n"
>>                         .getBytes()), asyncWriteHandler);
>>         // ioEvent.attachment().getSelectionKey().interestOps(
>>         //         SelectionKey.OP_READ);
>>     } catch (IOException e) {
>>         // TODO Auto-generated catch block
>>         e.printStackTrace();
>>     }
>> }
>
> The trick with the async read/write queues is that you can use them
> without the CallbackHandler's notifications. The framework takes care
> of sending and receiving the data asynchronously by itself.
After reading your mail I tried this, but it does not seem to work. What a pity :-(.
But the approach you mentioned is exactly what I want. I found some APIs for registering I/O event handlers with the Grizzly framework, but my attempts with them failed. It seems that Grizzly does not work in a direct event-handler way... Am I misunderstanding something?
If I work in the combined way you mentioned, then the SelectionKey interest ops have to be registered in the right places. I do this as follows:
1. ConnectorHandler.connect()
2. ConnectorCallbackHandler.onConnect(): register OP_WRITE to trigger ConnectorCallbackHandler.onWrite()
3. ConnectorCallbackHandler.onWrite():
* first, write the data asynchronously
* in AsyncWriteCallbackHandler.onWriteCompleted(), register OP_READ to trigger ConnectorCallbackHandler.onRead()
4. ConnectorCallbackHandler.onRead():
* read the data asynchronously
* get the complete data in AsyncReadCallbackHandler.onReadCompleted()
Am I on the right track, or have I misunderstood something else?
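
For comparison, the four steps above can be sketched with the plain java.nio Selector API instead of Grizzly. This is only an illustration of the interest-op sequence (OP_CONNECT, then OP_WRITE, then OP_READ), not Grizzly's implementation. The class name SelectorFlowSketch, the exchange() method and the one-shot loopback server are hypothetical helpers made up so the example runs on its own.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Plain java.nio sketch; none of these names come from Grizzly.
class SelectorFlowSketch {

    /** Connects, writes the request, then reads until the peer closes. */
    public static String exchange(InetSocketAddress server, String request) throws IOException {
        ByteBuffer out = ByteBuffer.wrap(request.getBytes(StandardCharsets.US_ASCII));
        ByteBuffer in = ByteBuffer.allocate(10240);
        StringBuilder response = new StringBuilder();
        try (Selector selector = Selector.open(); SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            // Step 1: start the connect; a loopback connect may finish immediately.
            boolean connected = channel.connect(server);
            channel.register(selector, connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT);
            boolean done = false;
            while (!done) {
                selector.select();
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isConnectable()) {
                        // Step 2: finish the connect, then ask for OP_WRITE.
                        if (channel.finishConnect()) {
                            key.interestOps(SelectionKey.OP_WRITE);
                        }
                    } else if (key.isWritable()) {
                        // Step 3: write; switch to OP_READ once everything is sent.
                        channel.write(out);
                        if (!out.hasRemaining()) {
                            key.interestOps(SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        // Step 4: read; -1 means the server closed the connection.
                        if (channel.read(in) < 0) {
                            done = true;
                        } else {
                            in.flip(); // switch the buffer from filling to draining
                            response.append(StandardCharsets.US_ASCII.decode(in));
                            in.clear();
                        }
                    }
                }
                selector.selectedKeys().clear();
            }
        }
        return response.toString();
    }

    /** Hypothetical helper: accepts one connection, sends a fixed reply, closes. */
    public static int startOneShotServer(String reply) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        Thread t = new Thread(() -> {
            try (ServerSocketChannel s = server; SocketChannel c = s.accept()) {
                ByteBuffer buf = ByteBuffer.allocate(1024);
                StringBuilder req = new StringBuilder();
                // Consume the request up to the blank line that ends the headers.
                while (req.indexOf("\r\n\r\n") < 0 && c.read(buf) >= 0) {
                    buf.flip();
                    req.append(StandardCharsets.US_ASCII.decode(buf));
                    buf.clear();
                }
                ByteBuffer out = ByteBuffer.wrap(reply.getBytes(StandardCharsets.US_ASCII));
                while (out.hasRemaining()) {
                    c.write(out);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        t.setDaemon(true);
        t.start();
        return port;
    }

    public static void main(String[] args) throws IOException {
        int port = startOneShotServer("HTTP/1.0 200 OK\r\n\r\nhello");
        System.out.println(exchange(
                new InetSocketAddress(InetAddress.getLoopbackAddress(), port),
                "GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n"));
    }
}
```

Note that in this sketch OP_READ is registered only after the whole request has been written, which corresponds to registering it from onWriteCompleted() in step 3.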

Thanks again!

> So once the ConnectorHandler is connected, you can start to use the
> async read/write queues; don't wait for the CallbackHandler's
> onRead/onWrite notification calls.
> But it is certainly possible to combine the two, as you do.
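
The "queue the read first, then write, without waiting for readiness notifications" idea has a rough analogue in the JDK's own AsynchronousSocketChannel, sketched below. This is not Grizzly's readFromAsyncQueue() (its arguments are elided in the mail above and are not guessed here). The class name ReadBeforeWriteSketch, the exchange() method and the one-shot loopback server are hypothetical and exist only to make the example self-contained.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// JDK-only sketch; none of these names come from Grizzly.
class ReadBeforeWriteSketch {

    /** Posts an asynchronous read right after connecting, then writes the request. */
    public static String exchange(InetSocketAddress server, String request) throws Exception {
        try (AsynchronousSocketChannel channel = AsynchronousSocketChannel.open()) {
            channel.connect(server).get();
            CompletableFuture<String> result = new CompletableFuture<>();
            ByteBuffer in = ByteBuffer.allocate(10240);
            StringBuilder response = new StringBuilder();
            // The read is posted before anything is written; the handler runs
            // whenever data arrives, with no OP_READ registration by the caller.
            channel.read(in, null, new CompletionHandler<Integer, Void>() {
                @Override
                public void completed(Integer n, Void attachment) {
                    if (n < 0) { // peer closed: the response is complete
                        result.complete(response.toString());
                        return;
                    }
                    in.flip();
                    response.append(StandardCharsets.US_ASCII.decode(in));
                    in.clear();
                    channel.read(in, null, this); // post the next read
                }

                @Override
                public void failed(Throwable t, Void attachment) {
                    result.completeExceptionally(t);
                }
            });
            ByteBuffer out = ByteBuffer.wrap(request.getBytes(StandardCharsets.US_ASCII));
            while (out.hasRemaining()) {
                channel.write(out).get(); // one outstanding write at a time
            }
            return result.get(10, TimeUnit.SECONDS);
        }
    }

    /** Hypothetical helper: accepts one connection, sends a fixed reply, closes. */
    public static int startOneShotServer(String reply) throws IOException {
        ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
        Thread t = new Thread(() -> {
            try (ServerSocket s = server; Socket c = s.accept()) {
                InputStream is = c.getInputStream();
                StringBuilder req = new StringBuilder();
                int b;
                // Consume the request up to the blank line that ends the headers.
                while (req.indexOf("\r\n\r\n") < 0 && (b = is.read()) >= 0) {
                    req.append((char) b);
                }
                c.getOutputStream().write(reply.getBytes(StandardCharsets.US_ASCII));
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        t.setDaemon(true);
        t.start();
        return server.getLocalPort();
    }

    public static void main(String[] args) throws Exception {
        int port = startOneShotServer("HTTP/1.0 200 OK\r\n\r\nhello");
        System.out.println(exchange(
                new InetSocketAddress(InetAddress.getLoopbackAddress(), port),
                "GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n"));
    }
}
```

The design point is the ordering: the read is already pending when the request goes out, so no interest-op bookkeeping is needed between the write and the read.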
>
> Thanks.
>
> WBR,
> Alexey.
>
> On May 27, 2008, at 10:47 , JianXing Yi wrote:
>
>> Hi there,
>>
>> Undoubtedly, Grizzly is a good NIO framework. For practice, I'm
>> writing an NIO client based on Grizzly that sends an HTTP request and
>> receives its response.
>>
>> 1. The connection is completed in the connector CallbackHandler's
>> onConnect() method. In the same method I register OP_WRITE interest so
>> that the HTTP GET request can be written; I expect this registration
>> to trigger the CallbackHandler's onWrite() method.
>> 2. The request is written asynchronously in the onWrite() method,
>> like this:
>> @Override
>> public void onWrite(IOEvent<Context> ioEvent) {
>>     System.out.println("onWrite()");
>>     try {
>>         connectorHandler.writeToAsyncQueue(ByteBuffer
>>                 .wrap("GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n"
>>                         .getBytes()), asyncWriteHandler);
>>     } catch (IOException e) {
>>         // TODO Auto-generated catch block
>>         e.printStackTrace();
>>     }
>> }
>> 3. OP_READ interest is registered in the AsyncWriteCallbackHandler's
>> onWriteCompleted() method. I expect this to trigger the
>> CallbackHandler's onRead() method, where I can read the HTTP
>> response. If I read the response in blocking mode, there are no
>> problems.
>> MY PROBLEM IS: IF I USE THE ConnectorHandler's (in fact a
>> TCPConnectorHandler's) readFromAsyncQueue() METHOD TO READ THE
>> RESPONSE, I DO NOT GET THE RESPONSE CORRECTLY. WHAT'S MORE, THE
>> AsyncReadCallbackHandler's onReadCompleted() METHOD IS NEVER CALLED!
>>
>> Below is the testing client code. Thanks for your help!
>>
>>
>> package tests.grizzly;
>>
>> import java.io.IOException;
>> import java.net.InetAddress;
>> import java.net.InetSocketAddress;
>> import java.net.SocketAddress;
>> import java.nio.ByteBuffer;
>> import java.nio.channels.SelectionKey;
>> import java.nio.charset.Charset;
>> import java.util.Queue;
>> import java.util.concurrent.CountDownLatch;
>>
>> import com.sun.grizzly.CallbackHandler;
>> import com.sun.grizzly.Context;
>> import com.sun.grizzly.Controller;
>> import com.sun.grizzly.ControllerStateListenerAdapter;
>> import com.sun.grizzly.DefaultProtocolChain;
>> import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
>> import com.sun.grizzly.IOEvent;
>> import com.sun.grizzly.ProtocolChain;
>> import com.sun.grizzly.ProtocolFilter;
>> import com.sun.grizzly.TCPConnectorHandler;
>> import com.sun.grizzly.TCPSelectorHandler;
>> import com.sun.grizzly.Controller.Protocol;
>> import com.sun.grizzly.async.AsyncReadCallbackHandler;
>> import com.sun.grizzly.async.AsyncReadQueueRecord;
>> import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>> import com.sun.grizzly.async.AsyncWriteQueueRecord;
>> import com.sun.grizzly.util.WorkerThread;
>>
>> public class TestNonblockingClient10 {
>>
>>     public class MyProtocolFilter implements ProtocolFilter {
>>
>>         @Override
>>         public boolean execute(Context ctx) throws IOException {
>>             System.out.println("execute()");
>>             ByteBuffer byteBuffer = ((WorkerThread) Thread.currentThread())
>>                     .getByteBuffer();
>>             System.out.println(Charset.defaultCharset().decode(byteBuffer));
>>             return true;
>>         }
>>
>>         @Override
>>         public boolean postExecute(Context ctx) throws IOException {
>>             System.out.println("postExecute()");
>>             return true;
>>         }
>>
>>     }
>>
>>     /**
>>      * @param args
>>      * @throws IOException
>>      * @throws InterruptedException
>>      */
>>     public static void main(String[] args) throws InterruptedException,
>>             IOException {
>>         TestNonblockingClient10 client = new TestNonblockingClient10();
>>         client.start();
>>     }
>>
>>     private Controller controller;
>>     private TCPConnectorHandler connectorHandler;
>>     protected CountDownLatch controllerReadyLatch = new CountDownLatch(1);
>>     ByteBuffer readBuffer = ByteBuffer.allocateDirect(10240);
>>     CallbackHandler<Context> connectorCallbackHandler =
>>             new CallbackHandler<Context>() {
>>
>>         @Override
>>         public void onConnect(IOEvent<Context> ioEvent) {
>>             try {
>>                 connectorHandler.finishConnect(ioEvent.attachment()
>>                         .getSelectionKey());
>>                 System.out.println("connected.");
>>                 // trigger the ConnectorHandler's onWrite() method.
>>                 ioEvent.attachment().getSelectionKey().interestOps(
>>                         SelectionKey.OP_WRITE);
>>             } catch (IOException e) {
>>                 // TODO Auto-generated catch block
>>                 e.printStackTrace();
>>             }
>>         }
>>
>>         @Override
>>         public void onRead(IOEvent<Context> ioEvent) {
>>             System.out.println("onRead()");
>>             Context context = ioEvent.attachment();
>>             // context.getSelectionKey().interestOps(SelectionKey.OP_READ);
>>             // try {
>>             //     context.getProtocolChain().execute(context);
>>             // } catch (Exception e) {
>>             //     // TODO Auto-generated catch block
>>             //     e.printStackTrace();
>>             // }
>>             try {
>>                 connectorHandler.read(readBuffer, true);
>>                 System.out.println(Charset.defaultCharset().decode(readBuffer));
>>             } catch (IOException e) {
>>                 // TODO Auto-generated catch block
>>                 e.printStackTrace();
>>             }
>>         }
>>
>>         @Override
>>         public void onWrite(IOEvent<Context> ioEvent) {
>>             System.out.println("onWrite()");
>>             try {
>>                 connectorHandler.writeToAsyncQueue(ByteBuffer
>>                         .wrap("GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n"
>>                                 .getBytes()), asyncWriteHandler);
>>                 // ioEvent.attachment().getSelectionKey().interestOps(
>>                 //         SelectionKey.OP_READ);
>>             } catch (IOException e) {
>>                 // TODO Auto-generated catch block
>>                 e.printStackTrace();
>>             }
>>         }
>>     };
>>     protected AsyncReadCallbackHandler asyncReadHandler =
>>             new AsyncReadCallbackHandler() {
>>
>>         @Override
>>         public void onIOException(IOException ioException, SelectionKey key,
>>                 ByteBuffer buffer, Queue<AsyncReadQueueRecord> remainingQueue) {
>>             System.out.println("onIOException()");
>>         }
>>
>>         @Override
>>         public void onReadCompleted(SelectionKey key, SocketAddress srcAddress,
>>                 ByteBuffer buffer) {
>>             System.out.println("onReadCompleted()");
>>             System.out.println(Charset.defaultCharset().decode(buffer));
>>         }
>>     };
>>     protected AsyncWriteCallbackHandler asyncWriteHandler =
>>             new AsyncWriteCallbackHandler() {
>>
>>         @Override
>>         public void onIOException(IOException ioException, SelectionKey key,
>>                 ByteBuffer buffer, Queue<AsyncWriteQueueRecord> remainingQueue) {
>>             // TODO Auto-generated method stub
>>
>>         }
>>
>>         @Override
>>         public void onWriteCompleted(SelectionKey key, ByteBuffer buffer) {
>>             System.out.println("onWriteCompleted()");
>>             key.interestOps(SelectionKey.OP_READ);
>>         }
>>     };
>>
>>     private void start() throws InterruptedException, IOException {
>>         controller = createController();
>>         new Thread(controller).start();
>>
>>         controllerReadyLatch.await();
>>
>>         connectorHandler = (TCPConnectorHandler) controller
>>                 .acquireConnectorHandler(Protocol.TCP);
>>
>>         // trigger the connector handler's onConnect() method.
>>         connectorHandler.connect(new InetSocketAddress(InetAddress
>>                 .getLocalHost(), 8080), connectorCallbackHandler);
>>     }
>>
>>     private Controller createController() {
>>         Controller tmpController = new Controller();
>>
>>         TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler(true);
>>         tmpController.setSelectorHandler(tcpSelectorHandler);
>>
>>         DefaultProtocolChainInstanceHandler instanceHandler =
>>                 new DefaultProtocolChainInstanceHandler() {
>>
>>             private ProtocolChain protocolChain;
>>
>>             @Override
>>             public ProtocolChain poll() {
>>                 if (protocolChain == null) {
>>                     protocolChain = new DefaultProtocolChain();
>>                 }
>>                 return protocolChain;
>>             }
>>         };
>>         // instanceHandler.poll().addFilter(new ReadFilter());
>>         // instanceHandler.poll().addFilter(new MyProtocolFilter());
>>
>>         tmpController.setProtocolChainInstanceHandler(instanceHandler);
>>
>>         tmpController.addStateListener(new ControllerStateListenerAdapter() {
>>             @Override
>>             public void onReady() {
>>                 controllerReadyLatch.countDown();
>>                 super.onReady();
>>             }
>>         });
>>         return tmpController;
>>     }
>>
>> }
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>