>> Complete Source Code for the Tutorial

Using Grizzly Connection Caching. 

Contributed and maintained by Wenbo Zhu
August 2007
[Revision number: V0-1]
This publication is applicable to Grizzly 1.6.x releases. 


This document provides a complete tutorial for understanding and using the Grizzly Connection Caching framework. The example code also demonstrates the general use of Grizzly and its underlying NIO technologies. 

For more information about Grizzly, see the Grizzly Home page on the java.net web site.

Expected duration: 45 minutes

Contents

Tutorial Requirements


Before you proceed, make sure you review the requirements in this section.

Prerequisites

This tutorial assumes that you have some basic knowledge of, or programming experience with, the following technologies.

Software Needed for This Tutorial

Before you begin, you need to install the following software on your computer:

Introduction


This tutorial shows, step by step, how to build a very simple "messaging ORB" using the Grizzly framework on the server side and plain NIO on the client side. The underlying design is briefly discussed first: 

Server side

  1. The server object invocation is purely asynchronous, in that requests are sent to servers asynchronously.
  2. Responses are sent back to clients asynchronously as well, via a client listener endpoint whose address comes with the original request.
  3. We call this a "messaging" ORB (morb) to emphasize the asynchronous nature of the resulting programming model, as opposed to the underlying wire protocol, which relies on Grizzly either way.
  4. This is not a real ORB, of course. To minimize distraction (this being a Grizzly tutorial), remote object invocation semantics have been largely ignored.


Client side

  1. Clients send requests asynchronously and register with their local runtime a callback handler for "future" responses.
  2. A typical ORB plays the dual role of server and client. In this tutorial, however, we assume that the client runtime only generates requests (hence a pure client).
  3. Raw NIO is used to build the simple client runtime that interacts with the server ORB instance, following the NIO tricks and guidelines from the Grizzly documentation. In a way, the client implementation tries to show the "troubles" of not using an NIO framework, or what may be going on under the hood of one.
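The callback-registration idea from step 1 can be sketched in a few lines of plain Java. This is not the tutorial's actual client runtime; the names CallbackRegistry and ResponseHandler here are hypothetical stand-ins for illustration only:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Conceptual sketch: the application registers a per-server callback, and the
// dispatch thread invokes it when a "future" response arrives on the local
// listener endpoint.
public class CallbackRegistry {

    public interface ResponseHandler {
        void handle(String response);
    }

    private final Map<String, ResponseHandler> handlers =
            new ConcurrentHashMap<String, ResponseHandler>();

    // called by the application before sending requests to serverId
    public void register(String serverId, ResponseHandler handler) {
        handlers.put(serverId, handler);
    }

    // called by the dispatch thread when a response message arrives
    public void dispatch(String serverId, String response) {
        ResponseHandler h = handlers.get(serverId);
        if (h != null) {
            h.handle(response);
        }
    }
}
```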

Server-side Implementation


In the following, we describe how the server-side function (morb) is implemented. The server uses the Grizzly framework both for receiving request messages (as a server) and for sending response messages (as a client). As a result, both the Grizzly server API and client API are used, as well as their respective connection caching mechanisms. 
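Before diving into the Grizzly APIs, the idea behind a connection cache can be sketched in plain Java. This is not Grizzly's CacheableConnectorHandlerPool, just an illustration of the concept: released connections are parked per remote address, up to a limit, and handed back out on the next acquire instead of opening a new socket.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Conceptual connection cache, keyed by remote address. The type parameter C
// stands in for whatever represents a live connection.
public class SimpleConnectionCache<C> {

    private final int maxPerAddress;
    private final Map<String, Queue<C>> idle = new HashMap<String, Queue<C>>();

    public SimpleConnectionCache(int maxPerAddress) {
        this.maxPerAddress = maxPerAddress;
    }

    // returns a cached connection for the address, or null if the caller
    // must create a new one
    public synchronized C acquire(String address) {
        Queue<C> q = idle.get(address);
        return (q == null) ? null : q.poll();
    }

    // parks the connection unless the per-address limit is reached;
    // returns false when the caller should close it for real
    public synchronized boolean release(String address, C connection) {
        Queue<C> q = idle.get(address);
        if (q == null) {
            q = new ArrayDeque<C>();
            idle.put(address, q);
        }
        if (q.size() >= maxPerAddress) {
            return false;
        }
        q.add(connection);
        return true;
    }
}
```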

  1. First, we define the following interface to represent a server object instance that a client can invoke. As pointed out earlier, the API is made "type-less" for simplicity.

public interface RequestHandler {
    String handle(String request) throws Exception;
}


  2. The actual server implementation is provided by the following class. Exception handling is much simplified, except where it is needed for resource reclamation or error recovery.

public class ServerRequestController {

    public ServerRequestController(String serverId) { ... }  // serverId is an app-defined identifier, for tracing purposes only

    public String getServerId() { ... }

    public InetSocketAddress getLocalAddr() { ... }  // the local address that the controller (i.e. the morb instance) binds to

    public void register(String objectId, RequestHandler handler) { ... }  // registers a server object instance; objectId essentially provides "naming"

    public void unregister(String objectId) { ... }

    public void open(String addr) throws Exception { ... }

    public void close() { ... }
}


Now we will describe the initialization (open()) and shutdown (close()) routines. The code is annotated as follows:

public void open(String addr) throws Exception {
        this.localAddr = resolveAddressParam(addr);                // defined with the Utils class

        this.grizzlyController = new Controller();                       //  the Grizzly controller, for both sending and receiving

        ConnectorHandlerPool cacheableHandlerPool = new CacheableConnectorHandlerPool(                // enable connection caching for outbound messages (sender)
                grizzlyController, 5, 2, 2);
        grizzlyController.setConnectorHandlerPool(cacheableHandlerPool);

        SelectionKeyHandler cacheableKeyHandler = new CacheableSelectionKeyHandler(                     // enable connection caching for inbound messages (receiver)
                5, 2);
        grizzlyController.setSelectionKeyHandler(cacheableKeyHandler);

        TCPSelectorHandler tcpHandler = new TCPSelectorHandler();                                                   // typical setup for a Grizzly TCP endpoint
        final MessageDecoder filter = new MessageDecoder();

        tcpHandler.setInet(localAddr.getAddress());
        tcpHandler.setPort(localAddr.getPort());

        grizzlyController
                .setProtocolChainInstanceHandler(new DefaultProtocolChainInstanceHandler() {
                    public ProtocolChain poll() {
                        ProtocolChain protocolChain = protocolChains.poll();

                        if (protocolChain == null) {
                            protocolChain = new DefaultProtocolChain();
                            protocolChain.addFilter(new ReadFilter());
                            protocolChain.addFilter(filter);
                        }

                        return protocolChain;
                    }
                });
        grizzlyController.addSelectorHandler(tcpHandler);
        grizzlyController.start();                                                                                          // Grizzly runs from here ...
}


  3. As you can see from the above code, enabling connection caching in Grizzly is mostly transparent. The above code refers to MessageDecoder, which defines the protocol filter that does the actual processing of the incoming request messages (over TCP). This filter class is defined as an inner class, as follows:

private class MessageDecoder implements ProtocolFilter {
        public boolean execute(Context context) throws IOException {
            final WorkerThread workerThread = ((WorkerThread) Thread
                    .currentThread());
            ByteBuffer buffer = workerThread.getByteBuffer();
            buffer.flip();

            if (buffer.hasRemaining()) {
                handleRequest(readBuffer(buffer), context);
            }
            buffer.clear();
            return false;
        }

        public boolean postExecute(Context context) throws IOException {
            return true;
        }

        private void handleRequest(String msg, Context context)
                throws IOException {

            String msgBuffer = (String) context.getAttribute("msgBuffer");                              // earlier partial content
            if (msgBuffer == null) {
                msgBuffer = "";
            }

            msgBuffer = msgBuffer + msg;

            if (msgBuffer.indexOf(PAYLOAD_DELIMITER) != -1) {
                String[] messages = msgBuffer.split(PAYLOAD_DELIMITER);

                for (int i = 0; i < messages.length - 1; i++) {
                    handleAtomicRequest(messages[i]);                                                           //  a complete message body is received
                }

                String lastMessage = messages[messages.length - 1];
                if (msgBuffer.endsWith(PAYLOAD_DELIMITER)) {
                    handleAtomicRequest(lastMessage);
                    msgBuffer = "";
                } else {
                    msgBuffer = lastMessage;
                }
            }

            context.setAttribute("msgBuffer", msgBuffer);                                                       // save the partial message for subsequent processing
        }
       
        // message format ::=
        //     <client id - inet address>"\n"
        //     <serverObjectId - app. defined>"\n"
        //     <message content - representing parameters><payload delimiter>
        // <payload delimiter> ::= "###\n"
        private void handleAtomicRequest(String msg) {

            try {
                int pos = msg.indexOf("\n");
                String clientId = msg.substring(0, pos);

                String content = msg.substring(pos + 1);
                pos = content.indexOf("\n");
                String serverObjectId = content.substring(0, pos);
                String msgBody = content.substring(pos + 1);

                RequestHandler handler = handlers.get(serverObjectId);
                if (handler != null) {
                    String rsp = handler.handle(msgBody);
                    sendResponse(clientId, rsp);
                }
            } catch (Exception ex) {
                trace(ex);
            }
        }

        private void sendResponse(String clientId, String msg) { ... }      // shown below
}


The message format is quite rudimentary, but it has the essential parts of client-server invocation semantics. The receiver thread (as provided by Grizzly) is also responsible for sending the response out to the wire (asynchronously). ARP could also be used here, to make it fancier. 

Overall, processing a received message is straightforward (with the use of Grizzly).
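The delimiter-based framing used by handleRequest can be exercised in isolation. The sketch below reimplements the same accumulate-and-split logic outside Grizzly, assuming PAYLOAD_DELIMITER is "###\n" as shown in the message-format comments; the MessageFramer class is a stand-in for this tutorial only:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of delimiter-based framing: partial input is accumulated
// in a buffer and complete messages (terminated by the payload delimiter)
// are emitted as they become available.
public class MessageFramer {

    static final String PAYLOAD_DELIMITER = "###\n";

    private String msgBuffer = "";

    // feeds a chunk of input; returns the complete messages it contained
    public List<String> feed(String chunk) {
        List<String> complete = new ArrayList<String>();
        msgBuffer = msgBuffer + chunk;

        if (msgBuffer.indexOf(PAYLOAD_DELIMITER) != -1) {
            String[] messages = msgBuffer.split(PAYLOAD_DELIMITER);

            if (messages.length == 0) {                 // the input was delimiters only
                msgBuffer = "";
                return complete;
            }

            for (int i = 0; i < messages.length - 1; i++) {
                complete.add(messages[i]);              // a complete message body
            }

            String lastMessage = messages[messages.length - 1];
            if (msgBuffer.endsWith(PAYLOAD_DELIMITER)) {
                complete.add(lastMessage);
                msgBuffer = "";
            } else {
                msgBuffer = lastMessage;                // keep the partial tail
            }
        }
        return complete;
    }
}
```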

The following code shows the details of sendResponse. The more obvious use of Grizzly connection caching actually happens here. As you can see from the code, within the predefined cache pool limits, an outbound connection (to a remote client) is acquired directly from the connection cache pool (if one is available). This saves the cost of establishing a new connection, which involves expensive native socket operations.


private void sendResponse(String clientId, String msg) {
            String rspMsg = serverId + "\n" + msg + PAYLOAD_DELIMITER;                //  the response message format

            InetSocketAddress clientIdAddr = resolveAddress(clientId);

            ConnectorHandler clientConnector = grizzlyController                      // initiates a new connection, or reuses one from the cache
                    .acquireConnectorHandler(Controller.Protocol.TCP);

            try {
                while (true) {                                                                                     // intermittent - expected ??
                    try {
                        clientConnector
                                .connect(clientIdAddr, new BlockingCallbackHandler(
                                        clientConnector, rspMsg));
                        break;
                    } catch (AlreadyConnectedException ex) {
                        trace("partial failure: already connected");
                    }
                }

                try {                                                                                                    // If limit is not reached - connection will be put back to the cache
                    clientConnector.close();
                } finally {
                    grizzlyController.releaseConnectorHandler(clientConnector);
                }

            } catch (IOException ex) {
                trace(ex);
            }
        }



The actual handling of sending the response is again asynchronous, via the Grizzly callback API.

private static class BlockingCallbackHandler implements CallbackHandler<Context> {
        private ConnectorHandler connector;

        private String msg;

        public BlockingCallbackHandler(ConnectorHandler connector, String msg) {
            this.connector = connector;
            this.msg = msg;
        }

        public void onConnect(IOEvent<Context> ioEvent) {

            SelectionKey key = ioEvent.attachment().getSelectionKey();
            connector.finishConnect(key);

            connector.getController().registerKey(key, SelectionKey.OP_WRITE,
                    Controller.Protocol.TCP);
        }

        public void onRead(IOEvent<Context> ioEvent) {
        }

        public void onWrite(IOEvent<Context> ioEvent) {
            try {
                connector.write(ByteBuffer.wrap(msg.getBytes()), true);                                      // a blocking write that uses the Grizzly "sender" API
            } catch (IOException ex) {
                trace(ex);
            }
        }
}


Client-side Implementation


Due to space limitations, we will not show the client code here; the client-side design is briefly described below. It is of course possible to implement the client runtime (which interacts with a remote MORB) with Grizzly as well, or with any other implementation, as long as it conforms to the server morb protocol. 

The provided client implementation uses basic NIO and adopts the following guidelines, as recommended in the Grizzly documentation:

  1. There is a single channel for sending request messages to a specific remote server, via RequestChannel. The client application runtime may choose to cache this channel (as in this tutorial example) to facilitate the server-side inbound connection caching.

  2. The client runtime is supposed to instantiate a singleton response processor, via ResponseDispatchChannel. It binds to a pre-specified port, which is included with each outbound client request message, along with the client inet address.

  3. The client application registers with the client runtime (which interacts with remote MORBs) a per-server-id handler: ResponseHandler. The actual implementation of ResponseHandler decides how the application-level API may be exposed. For now, it is just a dummy callback.
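As an illustration of what such a client channel involves, here is a minimal raw-NIO sketch in the spirit of RequestChannel (whose source is not shown in this tutorial; the class and method names below are hypothetical). It shows the selector bookkeeping that a framework like Grizzly normally hides: a non-blocking connect followed by writes that wait for the socket to become writable.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Illustrative raw-NIO outbound channel: non-blocking connect, then writes
// that back off on a selector whenever the kernel socket buffer is full.
public class SimpleRequestChannel {

    private final InetSocketAddress server;
    private SocketChannel channel;

    public SimpleRequestChannel(InetSocketAddress server) {
        this.server = server;
    }

    public void open() throws IOException {
        channel = SocketChannel.open();
        channel.configureBlocking(false);

        try (Selector selector = Selector.open()) {
            channel.register(selector, SelectionKey.OP_CONNECT);
            if (!channel.connect(server)) {          // connect is pending
                selector.select(5000);               // wait for OP_CONNECT
            }
            if (!channel.finishConnect()) {
                throw new IOException("connect timed out: " + server);
            }
        }
    }

    // writes the whole message, waiting whenever the socket buffer is full
    public void send(String msg) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
        try (Selector selector = Selector.open()) {
            channel.register(selector, SelectionKey.OP_WRITE);
            while (buf.hasRemaining()) {
                if (channel.write(buf) == 0) {       // kernel buffer full
                    selector.select(5000);           // wait until writable
                    selector.selectedKeys().clear();
                }
            }
        }
    }

    public void close() throws IOException {
        channel.close();
    }
}
```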

Putting things together


In this section, we go through the server and client setup, describe what to expect from the example test program, and show how to observe the connection caching behavior.

  1. On the server side, a MORB is instantiated as follows:
public class MessagingORB {
    private ServerRequestController serverController;

    public static void main(String[] args) throws Exception {

        if (args.length < 1) {
            trace("Usage: MessagingORB <local ip>:<local port>");
            return;
        }

        MessagingORB morb = new MessagingORB();

        morb.init(args[0]);
        morb.deploy();  
        morb.open(args[0]);
        morb.close();  
    }

    public void init(String localAddr) throws Exception {
        serverController = new ServerRequestController("server-" + localAddr);
    }

    public void open(String addr) throws Exception {
        serverController.open(addr);
    }

    public void close() {
        serverController.close();
    }

    // a simple server object instance, named "server", which echoes each request
    public void deploy() {
        serverController.register("server", new RequestHandler() {
            public String handle(String request) throws Exception {
                trace("invoke " + request);
                return "request handled: " + request;
            }
        });
    }
}

  2. On the client side, a standalone client application is created for demonstration purposes. It uses the client runtime support we introduced earlier. The client code sequentially invokes a list of server objects, asynchronously. A single dispatch thread manages the inbound connections (responses). The main application routine, under user control, loops through the list of server ids (an input parameter) and sends each one a request containing an auto-generated sequence id. To deal with the server-side inbound connection cache, a failed outbound connection (request) is retried once if the cached one fails the first time. Upon successfully sending a request, the outbound connection is cached via a simple per-server-id hashmap.
public class SimpleMORBClientApp {
    private ResponseDispatchChannel in;
    
    private Map<InetSocketAddress, RequestChannel> requestChannelCache = new HashMap<InetSocketAddress, RequestChannel>();

    private List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();

    private Thread dispatchThread;

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            trace("Usage: SimpleClientApp <local ip>:<local port> <server ip>:<server port> [<server ip>:<server port>]");
            return;
        }

        SimpleMORBClientApp app = new SimpleMORBClientApp();

        app.init(args[0]);

        for (int i = 1; i < args.length; i++) {
            app.register(resolveAddressParam(args[i]));
        }

        app.start();       
        app.doSomething();
        app.end();
    }

    public void init(String localAddr) throws Exception {
        in = new ResponseDispatchChannel(resolveAddressParam(localAddr));
        in.open();
    }

    // registers a simple client response handler that prints out the response text
    public void register(final InetSocketAddress serverId) throws Exception {
        servers.add(serverId);
        in.register(serverId, new ResponseHandler() {
            public void handle(String response) throws Exception {
                trace("response from " + serverId + ":" + response);
            }
        });
    }

    public void start() {

        dispatchThread = new Thread(new Runnable() {
            public void run() {
                try {
                    in.dispatch();
                } catch (InterruptedException ex) {
                    trace("dispatch thread interrupted");
                } catch (Exception ex) {
                    trace(ex);
                }
            }
        });

        dispatchThread.start();
    }

    public void end() {

        dispatchThread.interrupt();
        try {
            dispatchThread.join();
        } catch (InterruptedException ex) {
        }
        
        for (RequestChannel channel : requestChannelCache.values()) {
            channel.close();
        }

        in.close();
    }

    public void doSomething() {

        RequestChannel out;
        int i = 0;
        String msg;
        do {
            for (InetSocketAddress server : servers) {
                
                msg = in.getClientId().toString() + "[" + i++ + "]" + "any message";         // set the String length > 256 to test multiple read/write (buffer=1024b)
                out = requestChannelCache.get(server);
                try {
                    if (out == null) {
                        out = new RequestChannel(server);
                        out.open();
                    }
                    out.sendRequest("server", msg, in.getClientId());
                } catch (Exception ex) {
                    trace(ex);
                    trace("retry: " + server);
                    out.close();
                    out = new RequestChannel(server);
                    
                    try {
                        out.open();
                        out.sendRequest("server", msg, in.getClientId());
                    }
                    catch (Exception exx) {
                        trace(exx);
                        trace("failed: " + server);
                        out.close();
                        continue;
                    }
                }
                requestChannelCache.put(server, out);
            }
        } while (step("type c to continue") == 'c');
    }
}


With the above example test application, both the inbound and the outbound connection caches of the Grizzly framework are demonstrated. Interested readers may try adjusting the number of client and server instances, and use tools like "netstat -nt" to monitor the open connections between clients and servers and how they change over time.
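The client's retry-once-then-cache pattern from doSomething above can also be studied in isolation. The sketch below strips away the NIO details; Channel and RetryingSender are hypothetical stand-ins for RequestChannel and the surrounding loop, introduced here only for illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Conceptual sketch: reuse a cached per-server channel; if the cached one is
// stale (the send fails), open a fresh channel and retry exactly once, then
// cache whichever channel worked.
public class RetryingSender<A> {

    public interface Channel {
        void send(String msg) throws Exception;
        void close();
    }

    private final Map<A, Channel> cache = new HashMap<A, Channel>();

    // factory opens a fresh channel to the server when needed
    public boolean send(A server, String msg, Supplier<Channel> factory) {
        Channel out = cache.get(server);
        try {
            if (out == null) {
                out = factory.get();
            }
            out.send(msg);
        } catch (Exception ex) {
            if (out != null) {
                out.close();                   // drop the stale connection
            }
            out = factory.get();               // retry once with a fresh one
            try {
                out.send(msg);
            } catch (Exception exx) {
                out.close();
                cache.remove(server);
                return false;                  // give up on this server
            }
        }
        cache.put(server, out);                // cache the working channel
        return true;
    }
}
```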

Conclusion


I hope you find this tutorial useful for understanding Grizzly and its connection caching framework. Comments on making the overall tutorial simpler (i.e. shorter) or more elaborate (i.e. revealing more of Grizzly) are more than welcome!

There are still intermittent issues (AlreadyConnectedException) in sending responses from the server, for which I have yet to find the root cause. The current implementation, however, works as expected even under such (unexpected) failure conditions.


