users@grizzly.java.net

Re: How to implement the client's message flow based on async read/write?

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Tue, 27 May 2008 16:04:04 -0400

Salut,

JianXing Yi wrote:
> Hi there,
>
> Undoutly, Grizzly is a good NIO framework.

Thanks!

  For practice purpose, I'm
> writing a NIO client sending a HTTP request and receiving its response
> based on Grizzly.

Interesting...

>
> 1. Performing connecting to server in the ConnectorHandler's
> onConnect() method, at the same time, for writing HTTP GET request
> purpose, the OP_WRITE interest registered in this method. I think this
> registration will be resulted in triggering the ConnectorHandler's
> onWrite() method.

Only if there is some write ops ready.

> 2. Performing writing in the ConnectorHandler's onWrite() method,
> through the asynchronous way like this:
> @Override
> public void onWrite(IOEvent<Context> ioEvent) {
> System.out.println("onWrite()");
> try {
> connectorHandler.writeToAsyncQueue(ByteBuffer
> .wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
> .getBytes()), asyncWriteHandler);
> } catch (IOException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
> 3. Registering OP_READ interest in the AsyncWriteCallbackHandler's
> onWriteCompleted() method. By doing this, I think the
> ConnectorHandler's onRead() method will be triggered, so I can read
> the HTTP response. If reading the response in blocking way, then there
> is no problems.
> MY PROBLEM IS: IF I USE ConnectorHandler(in fact a
> TcpConnectorHandler).readFromAsyncQueue() METHOD TO READ THE RESPONSE,
> THEN I CANNOT GET THE RESPONSE CORRECTLY. WHAT'S MORE, THE
> AsyncReadCallbackHandler's onReadCompleted() METHOD NEVER BEING
> CALLED!
>
> Below is the testing client code. Thanks for your help!

Where are you registering's back the OP_READ? From the code below (I
haven't yet had a chance to run it...in an airport right now :-)), the
registration seems to be commented out.

Have you try to register for OP_READ just after connect? Since HTTP is a
request->response object, registering for OP_READ earlier should works
as the server will only reply once your client has finished sending the
response.

Did I've missed something?

Thanks!

-- Jeanfrancois


>
>
> package tests.grizzly;
>
> import java.io.IOException;
> import java.net.InetAddress;
> import java.net.InetSocketAddress;
> import java.net.SocketAddress;
> import java.nio.ByteBuffer;
> import java.nio.channels.SelectionKey;
> import java.nio.charset.Charset;
> import java.util.Queue;
> import java.util.concurrent.CountDownLatch;
>
> import com.sun.grizzly.CallbackHandler;
> import com.sun.grizzly.Context;
> import com.sun.grizzly.Controller;
> import com.sun.grizzly.ControllerStateListenerAdapter;
> import com.sun.grizzly.DefaultProtocolChain;
> import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
> import com.sun.grizzly.IOEvent;
> import com.sun.grizzly.ProtocolChain;
> import com.sun.grizzly.ProtocolFilter;
> import com.sun.grizzly.TCPConnectorHandler;
> import com.sun.grizzly.TCPSelectorHandler;
> import com.sun.grizzly.Controller.Protocol;
> import com.sun.grizzly.async.AsyncReadCallbackHandler;
> import com.sun.grizzly.async.AsyncReadQueueRecord;
> import com.sun.grizzly.async.AsyncWriteCallbackHandler;
> import com.sun.grizzly.async.AsyncWriteQueueRecord;
> import com.sun.grizzly.util.WorkerThread;
>
> public class TestNonblockingClient10 {
>
> public class MyProtocolFilter implements ProtocolFilter {
>
> @Override
> public boolean execute(Context ctx) throws IOException {
> System.out.println("execute()");
> ByteBuffer byteBuffer = ((WorkerThread) Thread.currentThread())
> .getByteBuffer();
> System.out.println(Charset.defaultCharset().decode(byteBuffer));
> return true;
> }
>
> @Override
> public boolean postExecute(Context ctx) throws IOException {
> System.out.println("postExecute()");
> return true;
> }
>
> }
>
> /**
> * @param args
> * @throws IOException
> * @throws InterruptedException
> */
> public static void main(String[] args) throws InterruptedException,
> IOException {
> TestNonblockingClient10 client = new TestNonblockingClient10();
> client.start();
> }
>
> private Controller controller;
> private TCPConnectorHandler connectorHandler;
> protected CountDownLatch controllerReadyLatch = new CountDownLatch(1);
> ByteBuffer readBuffer = ByteBuffer.allocateDirect(10240);
> CallbackHandler<Context> connectorCallbackHandler = new
> CallbackHandler<Context>() {
>
> @Override
> public void onConnect(IOEvent<Context> ioEvent) {
> try {
> connectorHandler.finishConnect(ioEvent.attachment()
> .getSelectionKey());
> System.out.println("connected.");
> // trigger the ConnectorHandler's onWrite() method.
> ioEvent.attachment().getSelectionKey().interestOps(
> SelectionKey.OP_WRITE);
> } catch (IOException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
>
> @Override
> public void onRead(IOEvent<Context> ioEvent) {
> System.out.println("onRead()");
> Context context = ioEvent.attachment();
> // context.getSelectionKey().interestOps(SelectionKey.OP_READ);
> // try {
> // context.getProtocolChain().execute(context);
> // } catch (Exception e) {
> // // TODO Auto-generated catch block
> // e.printStackTrace();
> // }
> try {
> connectorHandler.read(readBuffer, true);
> System.out.println(Charset.defaultCharset().decode(readBuffer));
> } catch (IOException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
>
> @Override
> public void onWrite(IOEvent<Context> ioEvent) {
> System.out.println("onWrite()");
> try {
> connectorHandler.writeToAsyncQueue(ByteBuffer
> .wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
> .getBytes()), asyncWriteHandler);
> // ioEvent.attachment().getSelectionKey().interestOps(
> // SelectionKey.OP_READ);
> } catch (IOException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
> };
> protected AsyncReadCallbackHandler asyncReadHandler = new
> AsyncReadCallbackHandler() {
>
> @Override
> public void onIOException(IOException ioException, SelectionKey key,
> ByteBuffer buffer, Queue<AsyncReadQueueRecord> remainingQueue) {
> System.out.println("onIOException()");
> }
>
> @Override
> public void onReadCompleted(SelectionKey key, SocketAddress srcAddress,
> ByteBuffer buffer) {
> System.out.println("onReadCompleted()");
> System.out.println(Charset.defaultCharset().decode(buffer));
> }
> };
> protected AsyncWriteCallbackHandler asyncWriteHandler = new
> AsyncWriteCallbackHandler() {
>
> @Override
> public void onIOException(IOException ioException, SelectionKey key,
> ByteBuffer buffer, Queue<AsyncWriteQueueRecord> remainingQueue) {
> // TODO Auto-generated method stub
>
> }
>
> @Override
> public void onWriteCompleted(SelectionKey key, ByteBuffer buffer) {
> System.out.println("onWriteCompleted()");
> key.interestOps(SelectionKey.OP_READ);
> }
> };
>
> private void start() throws InterruptedException, IOException {
> controller = createController();
> new Thread(controller).start();
>
> controllerReadyLatch.await();
>
> connectorHandler = (TCPConnectorHandler) controller
> .acquireConnectorHandler(Protocol.TCP);
>
> // trigger the connector handler's onConnect() method.
> connectorHandler.connect(new InetSocketAddress(InetAddress
> .getLocalHost(), 8080), connectorCallbackHandler);
> }
>
> private Controller createController() {
> Controller tmpController = new Controller();
>
> TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler(true);
> tmpController.setSelectorHandler(tcpSelectorHandler);
>
> DefaultProtocolChainInstanceHandler instanceHandler = new
> DefaultProtocolChainInstanceHandler() {
>
> private ProtocolChain protocolChain;
>
> @Override
> public ProtocolChain poll() {
> if (protocolChain == null) {
> protocolChain = new DefaultProtocolChain();
> }
> return protocolChain;
> }
> };
> // instanceHandler.poll().addFilter(new ReadFilter());
> // instanceHandler.poll().addFilter(new MyProtocolFilter());
>
> tmpController.setProtocolChainInstanceHandler(instanceHandler);
>
> tmpController.addStateListener(new ControllerStateListenerAdapter() {
> @Override
> public void onReady() {
> controllerReadyLatch.countDown();
> super.onReady();
> }
> });
> return tmpController;
> }
>
> }
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>