dev@grizzly.java.net

Re: [Fwd: tutorial update ...]

From: Ken Cavanaugh <Ken.Cavanaugh_at_Sun.COM>
Date: Mon, 22 Oct 2007 10:14:39 -0700
charlie hunt wrote:
Ken / Alexey,

Wenbo Zhu has updated the connection cache tutorial with Ken's feedback.  The update is in the attached e-mail.

There are some outstanding questions/issues identified by Ken that need resolution.  I think that information will need to come from Alexey.
Yes. Hopefully we can put this on this Wednesday's meeting agenda if Alexey has time to look at the tutorial.

You'll notice in Wenbo's update, he's integrated Ken's comments.  I think Wenbo may be thinking he needs to include some information about how CORBA will use connection caching.  I don't think that was necessarily Ken's intention? 
CORBA could be used as an example (specifically the impact of a multiplexed protocol on connection caching), but CORBA connection
caching itself should not be covered in the tutorial.

So, we may need to clarify that with Wenbo.

Feel free to continue the conversation on the "dev" Grizzly mailing list.

Btw, I told Wenbo I would check the status of building the latest bits from Grizzly.

charlie ...

-------- Original Message --------
Subject:     tutorial update ...
Date:     Sun, 21 Oct 2007 21:21:07 -0700
From:     Wenbo Zhu <wenbozhu@gmail.com>
To:     Charles.J.Hunt@Sun.COM
CC:     wenbozhu@gmail.com



Hi Charlie,

I have updated the tutorial with your earlier comments and Ken's.
Specifically for Ken's comments, maybe you or Alexey would like to have
a look too, w.r.t. the Grizzly CORBA integration (I suppose).

I also updated the source a bit to accommodate the latest Grizzly
update. (I actually saw a compile error from the current repository,
due to a mismatching "throws IOException" from the method finishConnect().
It could be just a problem of mine ...)

Cheers,
Wenbo







>> Complete Source Code for the Tutorial

Using Grizzly Connection Caching


This document provides a complete tutorial for understanding and using the Grizzly Connection Caching framework. The example code also demonstrates the general use of Grizzly and its underlying NIO technologies. 

For more information about Grizzly, see the Grizzly Home page on the java.net web site.

Expected duration: 45 minutes

Contents

  • Tutorial Requirements
  • General Introduction
  • Tutorial Introduction
  • Server-side Implementation
  • Client-side Implementation
  • Putting Things Together
  • Conclusion

Tutorial Requirements


Before you proceed, make sure you review the requirements in this section.

Prerequisites

This tutorial assumes that you have some basic knowledge of, or programming experience with, the following technologies.

  • TCP Socket
  • NIO

Software Needed for This Tutorial

Before you begin, you need to install the following software on your computer:

  • Grizzly 1.6.x framework distribution (download)


General Introduction


In the following, we briefly introduce the relevant concepts behind Grizzly and its Connection Caching function. 

What is important about Grizzly: 

  1. Grizzly is a framework that has been developed with a clear distinction between transport and protocol.
  2. Grizzly, as a framework, doesn't provide protocol support (directly). 

About connection caching:  (Ken Cavanaugh)

  1. Grizzly provides connection caching for those protocols that require it.
  2. We'll examine the issues for the inbound and outbound caches separately. First, we need to look at some common characteristics of protocols supported by the connection cache.

Protocol Issues

Many protocols have a number of similar requirements:
  1. A single request may require sending several separate message fragments on a connection. This allows more efficient buffering when sending large requests.
  2. A connection may be shared by several simultaneous requests. In the most complex case, the protocol may allow interleaving fragments from several messages.
  3. A protocol may expect that a response for a request uses the same connection that the request used. This is certainly not always the case: message-oriented middleware generally just sends a message, and there may not even be a direct response.
These requirements have a strong impact on the design of the cache. In particular, CORBA IIOP has all three requirements.

The Outbound Connection Cache

The outbound connection cache needs to provide a means to obtain a connection to a transport endpoint (internally referred to as a ContactInfo), and it also needs to manage the connection cache to avoid holding on to too many open connections. But closing connections can be dangerous: the cache cannot close a connection that is still in use. This leads to the following basic API:
  1. Connection get( ContactInfo cinfo )
  2. void release( Connection conn, int numResponseExpected )
  3. void responseReceived( Connection conn ).
The get method is used to obtain a connection. It may reuse an existing connection or create a new one, depending on the cache's contents and its configured limits.
In addition, get will always return a valid connection (even if this causes the configuration parameters to be temporarily violated), UNLESS it cannot open a connection for the ContactInfo, in which case an IOException is thrown.

The basic per-request use of the cache is as follows:
  1. Call get to obtain the Connection. In Grizzly, this is done in CacheableConnectorHandler.connect.
  2. Send messages for the request.
  3. Call release on the connection, specifying the number of expected responses (usually 0 or 1).
  4. After all responses have been received, call responseReceived.
The calls to release and responseReceived MUST be handled in the Grizzly client, because Grizzly knows nothing about the protocol details. Failure to call these methods properly will result either in connections accumulating in the cache or in premature release of connections.

In addition, it is possible for multiple threads in the client to share the same connection. In this case, it is the client's responsibility to make sure that two threads do not attempt to call write simultaneously. However, the release and responseReceived methods are guaranteed to be thread safe.
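To make the four steps above concrete, here is a minimal sketch of the per-request sequence. The names cache, cinfo, and sendRequestMessages are illustrative assumptions, not identifiers from the tutorial source:

// Hypothetical per-request use of the outbound connection cache.
Connection conn = cache.get(cinfo);            // step 1: reuse a cached connection, or open a new one
try {
    sendRequestMessages(conn);                 // step 2: write the request message fragment(s)
} finally {
    cache.release(conn, 1);                    // step 3: done sending; exactly one response expected
}
// ... later, once the complete response has been read:
cache.responseReceived(conn);                  // step 4: the cache may now reclaim the connection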

<< already prevent this? This will be an issue for CORBA integration of the connection handler>>>

<< and responseReceived method calls>>>

<< details of the connection cache configuration>>>

The Inbound Connection Cache

The inbound connection cache is similar but somewhat simpler, because server-side connections are passively accepted rather than created at the client's request. This results in a slightly different API, still with three methods:
  1. requestReceived( Connection conn )
  2. requestProcessed( Connection conn, int numResponseExpected )
  3. responseSent( Connection conn )
When a request is received, the requestReceived method must be called to inform the cache about the connection (which may or may not already be in the cache).

<<exactly how this gets called from the Grizzly user's perspective>>>

Once the user's code finishes reading the message from the connection, the user needs to call requestProcessed, indicating the number of responses we expect to send (again, usually 0 or 1). Once the user's code finishes sending the responses, it must call responseSent.
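A minimal sketch of this server-side bookkeeping, with illustrative helper names (readFullRequest and writeResponse stand in for the user's protocol code), might be:

// Hypothetical inbound-cache bookkeeping around a single request.
cache.requestReceived(conn);                   // a request arrived; make sure the cache tracks this connection
readFullRequest(conn);                         // user code: read all fragments of the request
cache.requestProcessed(conn, 1);               // finished reading; we expect to send one response
writeResponse(conn);                           // user code: possibly later, on another thread
cache.responseSent(conn);                      // finished sending; the connection may now be reclaimed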

<<CacheableSelectionKeyHandler.postProcess. Is that actually correct? In the CORBA case, we don't want to call requestProcessed until we have read all of the message fragments, and we can't call responseSent until we have asynchronously processed the request and sent the response, so I can't see how this can be called from postProcess.>>>

<<We may be able to eliminate one method here.>>>

Tutorial Introduction


This tutorial shows, step by step, how to build a very simple "messaging ORB" using the Grizzly framework on the server side and plain NIO on the client side. The underlying design is briefly discussed here first:

Server side

  1. The server object invocation is purely asynchronous, in that requests are sent to servers asynchronously.   
  2. Responses are sent back to clients asynchronously too, via a client listener endpoint whose address comes with the original request.
  3. We call this a "messaging" ORB (morb) to emphasize the asynchronous nature of the resulting programming model, as opposed to the underlying wire protocol, which one way or another is built on the same Grizzly machinery.
  4. This is not a real ORB, of course. To minimize distraction (this being a Grizzly tutorial), remote object invocation semantics have been largely ignored.


Client side

  1. Clients send requests asynchronously and register with their local runtime a callback handler for "future" responses.
  2. A typical ORB plays the dual role of server and client. In this tutorial, however, we assume that the client runtime only generates requests (hence a pure client).
  3. Raw NIO is used to build the simple client runtime that interacts with the server ORB instance. NIO tricks/guidelines (see the Grizzly documentation) are followed. In a way, the client implementation tries to show the "troubles" of not using an NIO framework, or what may be under the hood of one.

Server-side Implementation


In the following, we describe how the server-side function (morb) is implemented. The server uses the Grizzly framework both for receiving request messages (as a server) and for sending response messages (as a client). As a result, both the Grizzly server API and client API are used, along with their respective connection caching mechanisms.

  1. First, we define the following interface to represent a server object instance that a client can invoke. As pointed out earlier, the API is made "type-less" for simplicity.
public interface RequestHandler {
    String handle(String request) throws Exception;
}


  2. The actual server implementation is provided by the following class. Exception handling is much simplified, except where it is needed for resource reclamation or error recovery.
public class ServerRequestController {

    public ServerRequestController(String serverId) { ... }      // serverId is an app-defined identifier, for tracing purposes only

    public String getServerId() { ... }

    public InetSocketAddress getLocalAddr() { ... }              // the local address that the controller (i.e. the morb instance) binds to

    public void register(String objectId, RequestHandler handler) { ... }    // registers a server object instance; objectId essentially provides "naming"

    public void unregister(String objectId) { ... }

    public void open(String addr) throws Exception { ... }

    public void close() { ... }
}


Now we describe the initialization routine open() and the shutdown routine close().  The code is annotated as follows:

public void open(String addr) throws Exception {
        this.localAddr = resolveAddressParam(addr);                // defined with the Utils class

        this.grizzlyController = new Controller();                       //  the Grizzly controller, for both sending and receiving

        ConnectorHandlerPool cacheableHandlerPool = new CacheableConnectorHandlerPool(                // enable connection caching for outbound messages (sender)
                grizzlyController, 5, 2, 2);
        grizzlyController.setConnectorHandlerPool(cacheableHandlerPool);

        SelectionKeyHandler cacheableKeyHandler = new CacheableSelectionKeyHandler(                     // enable connection caching for inbound messages (receiver)
                5, 2);
        grizzlyController.setSelectionKeyHandler(cacheableKeyHandler);

        TCPSelectorHandler tcpHandler = new TCPSelectorHandler();                                                   // typical setup for a Grizzly TCP endpoint
        final MessageDecoder filter = new MessageDecoder();

        tcpHandler.setInet(localAddr.getAddress());
        tcpHandler.setPort(localAddr.getPort());

        grizzlyController
                .setProtocolChainInstanceHandler(new DefaultProtocolChainInstanceHandler() {
                    public ProtocolChain poll() {
                        ProtocolChain protocolChain = protocolChains.poll();

                        if (protocolChain == null) {
                            protocolChain = new DefaultProtocolChain();
                            protocolChain.addFilter(new ReadFilter());
                            protocolChain.addFilter(filter);
                        }

                        return protocolChain;
                    }
                });
        grizzlyController.addSelectorHandler(tcpHandler);
        grizzlyController.start();                                                                                                          // Grizzly runs from here ...
}
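The shutdown routine close() is not spelled out in this draft. A minimal sketch, assuming Grizzly's Controller.stop() is the appropriate shutdown call (an assumption, not code from the tutorial source), could look like this:

public void close() {
    try {
        grizzlyController.stop();                   // assumed API: stops the selector thread(s) and closes the endpoint
    } catch (IOException ex) {
        trace(ex);
    }
}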


  3. As you can see from the code above, enabling connection caching in Grizzly is mostly transparent.  The code refers to MessageDecoder, the protocol filter that does the actual processing of the incoming request messages (over TCP). This filter class is defined as an inner class, as follows:

private class MessageDecoder implements ProtocolFilter {
        public boolean execute(Context context) throws IOException {
            final WorkerThread workerThread = ((WorkerThread) Thread
                    .currentThread());
            ByteBuffer buffer = workerThread.getByteBuffer();
            buffer.flip();

            if (buffer.hasRemaining()) {
                handleRequest(readBuffer(buffer), context);
            }
            buffer.clear();
            return false;
        }

        public boolean postExecute(Context context) throws IOException {
            return true;
        }

        private void handleRequest(String msg, Context context)
                throws IOException {

            String msgBuffer = (String) context.getAttribute("msgBuffer");                              // earlier partial content
            if (msgBuffer == null) {
                msgBuffer = "";
            }

            msgBuffer = msgBuffer + msg;

            if (msgBuffer.indexOf(PAYLOAD_DELIMITER) != -1) {
                String[] messages = msgBuffer.split(PAYLOAD_DELIMITER);

                for (int i = 0; i < messages.length - 1; i++) {
                    handleAtomicRequest(messages[i]);                                                           //  a complete message body is received
                }

                String lastMessage = messages[messages.length - 1];
                if (msgBuffer.endsWith(PAYLOAD_DELIMITER)) {
                    handleAtomicRequest(lastMessage);
                    msgBuffer = "";
                } else {
                    msgBuffer = lastMessage;
                }
            }

            context.setAttribute("msgBuffer", msgBuffer);                                                       // save the partial message for subsequent processing
        }
       
        private void handleAtomicRequest(String msg) {                                                       //  message format ::=

            try {                                                                                                                  //  <client id - inet address>"\n"
                int pos = msg.indexOf("\n");                                                                            //  <serverObjectId - app. defined>"\n"
                String clientId = msg.substring(0, pos);                                                            //  <message content - representing parameters><payload delimiter>

                String content = msg.substring(pos + 1);                                                          //  <payload delimiter> ::= "###\n"
                pos = content.indexOf("\n");
                String serverObjectId = content.substring(0, pos);
                String msgBody = content.substring(pos + 1);

                RequestHandler handler = handlers.get(serverObjectId);
                if (handler != null) {
                    String rsp = handler.handle(msgBody);
                    sendResponse(clientId, rsp);
                }
            } catch (Exception ex) {
                trace(ex);
            }
        }

        private void sendResponse(String clientId, String msg) { ... }
}


The message format is rather crude, but it has the essential parts of client-server invocation semantics. The receiver thread (as provided by Grizzly) will also be responsible for sending the response out on the wire (asynchronously). ARP (Grizzly's Asynchronous Request Processing) could also be used here to make it fancier.
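For concreteness, the following shows how a single request message could be assembled under this format; the address, object id, and content values are hypothetical:

// Hypothetical request message, following the format comments above.
String request = "10.0.0.5:9000" + "\n"        // <client id - inet address>
        + "server" + "\n"                      // <serverObjectId - app. defined>
        + "hello morb"                         // <message content - representing parameters>
        + "###\n";                             // <payload delimiter>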

Overall, processing the received messages is straightforward with the use of Grizzly.

The following code shows the details of sendResponse. The more obvious use of Grizzly connection caching actually happens here. As you can see from the code, as long as the predefined cache pool limits are respected, an outbound connection (to a remote client) will be acquired directly from the connection cache pool (if one is available). This saves the cost of creating a new connection, which involves expensive native socket operations.


private void sendResponse(String clientId, String msg) {
            String rspMsg = serverId + "\n" + msg + PAYLOAD_DELIMITER;                //  the response message format

            InetSocketAddress clientIdAddr = resolveAddress(clientId);

           ConnectorHandler clientConnector = grizzlyController                                 // Initiates new connection or reuses one from cache
                    .acquireConnectorHandler(Controller.Protocol.TCP);

            try {
                while (true) {                                                                                     // intermittent - expected ??
                    try {
                        clientConnector
                                .connect(clientIdAddr, new BlockingCallbackHandler(
                                        clientConnector, rspMsg));
                        break;
                    } catch (AlreadyConnectedException ex) {
                        trace("partial failure: already connected");
                    }
                }

                try {                                                                                                    // If limit is not reached - connection will be put back to the cache
                    clientConnector.close();
                } finally {
                    grizzlyController.releaseConnectorHandler(clientConnector);
                }

            } catch (IOException ex) {
                trace(ex);
            }
        }



The actual handling of sending the response is again asynchronous, via Grizzly's callback API.

private static class BlockingCallbackHandler implements CallbackHandler<Context> {
        private ConnectorHandler connector;

        private String msg;

        public BlockingCallbackHandler(ConnectorHandler connector, String msg) {
            this.connector = connector;
            this.msg = msg;
        }

        public void onConnect(IOEvent<Context> ioEvent) {

            SelectionKey key = ioEvent.attachment().getSelectionKey();
            try {
                connector.finishConnect(key);                                                // finishConnect now declares "throws IOException" (see the note in the cover e-mail)
            } catch (IOException ex) {
                trace(ex);
                return;
            }

            connector.getController().registerKey(key, SelectionKey.OP_WRITE,
                    Controller.Protocol.TCP);
        }

        public void onRead(IOEvent<Context> ioEvent) {
        }

        public void onWrite(IOEvent<Context> ioEvent) {
            try {
                connector.write(ByteBuffer.wrap(msg.getBytes()), true);                                      // a blocking write that uses the Grizzly "sender" API
            } catch (IOException ex) {
                trace(ex);
            }
        }
}


Client-side Implementation


Due to space limitations, we will not show the client code here; instead, the client-side design is briefly described below. It is of course possible to implement the client runtime (interacting with a remote MORB) with Grizzly too, or with any other implementation, as long as it conforms to the server's morb protocol.

The provided client implementation uses basic NIO and adopts the following guidelines, as recommended in the Grizzly documentation:

There is a single channel, RequestChannel, for sending request messages to a specific remote server.  The client application runtime may choose to cache it (as this tutorial's example does) to take advantage of the server-side inbound connection caching.

The client runtime is expected to instantiate a singleton response processor, ResponseDispatchChannel. It binds to a pre-specified port, which is included with each outbound client request message, along with the client inet address.

The client application registers with the client runtime (which interacts with remote MORBs) a per-server-id handler, ResponseHandler. The actual implementation of ResponseHandler decides how the application-level API may be exposed. For now, it's just a dummy callback.
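Although the client source is omitted here, a minimal sketch of what RequestChannel.sendRequest might look like with raw NIO is shown below. This is purely illustrative (the channel field and the exact wire formatting are assumptions); the real implementation ships with the tutorial source:

// Illustrative only: a blocking variant of RequestChannel.sendRequest.
public void sendRequest(String objectId, String msg, InetSocketAddress clientId)
        throws IOException {
    String wire = clientId.getAddress().getHostAddress() + ":" + clientId.getPort() + "\n"
            + objectId + "\n"
            + msg + "###\n";                   // "###\n" is the payload delimiter
    ByteBuffer buf = ByteBuffer.wrap(wire.getBytes());
    while (buf.hasRemaining()) {               // an NIO write may be partial; loop until the buffer drains
        channel.write(buf);                    // channel: a connected SocketChannel held by this RequestChannel
    }
}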

Putting Things Together


In this section, we will go through the server and client setup, and describe what's expected from the example testing program, and how to observe the connection caching behavior.

  1. On the server side, an MORB is instantiated as follows:
public class MessagingORB {
    private ServerRequestController serverController;

    public static void main(String[] args) throws Exception {

        if (args.length < 1) {
            trace("Usage: MessagingORB <local ip>:<local port>");
            return;
        }

        MessagingORB morb = new MessagingORB();

        morb.init(args[0]);
        morb.deploy();  
        morb.open(args[0]);
        morb.close();  
    }

    public void init(String localAddr) throws Exception {
        serverController = new ServerRequestController("server-" + localAddr);
    }

    public void open(String addr) throws Exception {
        serverController.open(addr);
    }

    public void close() {
        serverController.close();
    }

    public void deploy() {                                                                                     // a simple server object instance - named as "server", which echoes each request
        serverController.register("server", new RequestHandler() {
            public String handle(String request) throws Exception {
                trace("invoke " + request);
                return "request handled: " + request;
            }
        });
    }
}

  2. On the client side, a standalone client app is created for demo purposes. It uses the client runtime support introduced earlier. The client code sequentially invokes a list of server objects, asynchronously. A single dispatch thread manages the inbound connections (responses). The main application routine, under user control, loops through the list of server ids (an input parameter) and sends each one a request containing an auto-generated sequence id.  To cope with the server-side inbound connection cache, a failed request over a cached outbound connection is retried once over a fresh connection. Upon successfully sending a request, the outbound connection is cached in a simple per-server-id hash map.
public class SimpleMORBClientApp {
    private ResponseDispatchChannel in;
    
    private Map<InetSocketAddress, RequestChannel> requestChannelCache = new HashMap<InetSocketAddress, RequestChannel>();

    private List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();

    private Thread dispatchThread;

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            trace("Usage: SimpleClientApp <local ip>:<local port> <server ip>:<server port> [<server ip>:<server port>]");
            return;
        }

        SimpleMORBClientApp app = new SimpleMORBClientApp();

        app.init(args[0]);

        for (int i = 1; i < args.length; i++) {
            app.register(resolveAddressParam(args[i]));
        }

        app.start();       
        app.doSomething();
        app.end();
    }

    public void init(String localAddr) throws Exception {
        in = new ResponseDispatchChannel(resolveAddressParam(localAddr));
        in.open();
    }

    public void register(final InetSocketAddress serverId) throws Exception {                               // a simple client response handler that prints out the request text
        servers.add(serverId);
        in.register(serverId, new ResponseHandler() {
            public void handle(String response) throws Exception {
                trace("response from " + serverId + ":" + response);
            }
        });
    }

    public void start() {

        dispatchThread = new Thread(new Runnable() {
            public void run() {
                try {
                    in.dispatch();
                } catch (InterruptedException ex) {
                    trace("dispatch thread interrupted");
                } catch (Exception ex) {
                    trace(ex);
                }
            }
        });

        dispatchThread.start();
    }

    public void end() {

        dispatchThread.interrupt();
        try {
            dispatchThread.join();
        } catch (InterruptedException ex) {
        }
        
        for (RequestChannel channel : requestChannelCache.values()) {
            channel.close();
        }

        in.close();
    }

    public void doSomething() {

        RequestChannel out;
        int i = 0;
        String msg;
        do {
            for (InetSocketAddress server : servers) {
                
                msg = in.getClientId().toString() + "[" + i++ + "]" + "any message";         // set the String length > 256 to test multiple read/write (buffer=1024b)
                out = requestChannelCache.get(server);
                try {
                    if (out == null) {
                        out = new RequestChannel(server);
                        out.open();
                    }
                    out.sendRequest("server", msg, in.getClientId());
                } catch (Exception ex) {
                    trace(ex);
                    trace("retry: " + server);
                    out.close();
                    out = new RequestChannel(server);
                    
                    try {
                        out.open();
                        out.sendRequest("server", msg, in.getClientId());
                    }
                    catch (Exception exx) {
                        trace(exx);
                        trace("failed: " + server);
                        out.close();
                        continue;
                    }
                }
                requestChannelCache.put(server, out);
            }
        } while (step("type c to continue") == 'c');
    }
}


With the above example test application, both the inbound and outbound connection caches of the Grizzly framework are exercised. Interested readers may try adjusting the number of client and server instances, and use tools like "netstat -nt" to monitor the open connections between clients and servers and how they vary.

Conclusion


I hope you find this tutorial useful in understanding Grizzly and its connection caching framework. Comments on making the overall tutorial simpler (i.e. shorter) or more elaborate (i.e. revealing more of Grizzly) are more than welcome!

There are still intermittent issues (AlreadyConnectedException) in sending responses from the server, for which I have yet to find the root cause. The current implementation, however, works as expected even under such (unexpected) failure conditions.






>> Complete Source Code for the Tutorial