users@grizzly.java.net

Re: Implementing custom StreamAlgorithm

From: D. J. Hagberg (Sun) <"D.>
Date: Tue, 19 Jun 2007 17:28:24 -0600

Eric Heaton wrote:
> I wish to implement a custom StreamAlgorithm whose parse() method can
> potentially return false if the incoming request has not been received
> completely into the input buffer. (For the curious, I'm trying to
> write a custom protocol that handles the Typed Parameter Language
> (tpl.sourceforge.net)).
>
> However, when my StreamAlgorithm's parse() method returns false, the
> request is not processed. I tried inspecting the
> ContentLengthAlgorithm's parse() method, but found that this does not
> seem to work either. I create a simple SelectorThread for the
> ContentLengthAlgorithm and POST a request that's greater than the
> default buffer size of 8192 bytes. The request is never processed.

Eric --

I hope to post my design work up on my blog soon
(http://blogs.sun.com/djhagberg) but I decided to cut out the
StreamAlgorithm pattern in my application (it seemed like it was more
tightly coupled to http-type protocols) and went with the lower-level
ProtocolFilter API. I have the SSLReadFilter in front of this to decode
SSL into cleartext bytes...

My Messages have a fixed-length MessageHeader followed by a
variable-length byte[] content.

There are a few methods here that deal with partial message reads:

- prepareParseBuffer prepends any remaining bytes from the last read
   onto the current ByteBuffer.

- checkRemainderHeader checks for a previously-read MessageHeader

- clearRemainder does what it says

- preserveRemainder preserves any leftover MessageHeader or partially-
   read bytes.

I can't say this code has passed all regression/unit tests yet but it
might help.

Note it also depends on an SSLSession to act as a Map for partial reads.
  If you don't have an SSLSession, you may have something else attached
to or associated with your SelectionKey that could be used to stow
remaining stuff...

public class MessageProtocolFilter implements ProtocolFilter {
     . . . statics, logger, etc. . . .
     public static final String REMBUF = "REMBUF";
     public static final Stirng REMHEAD = "REMHEAD";

     /**
      * Decode raw cleartext bytes into one or more Message's
      * if possible, and hand off to dispatcher to do real work.
      */
     public boolean execute(Context ctx) {
         WorkerThreadImpl workerThread = (WorkerThreadImpl)Thread.
                 currentThread();
         SSLSession session = workerThread.getSSLEngine().getSession();
         ByteBuffer buf = prepareParseBuffer(workerThread, session);

         // Keep parsing while there is a possibility of decoding msgs
         int remain;
         boolean hadError = false;
         Message m = null;
         MessageHeader mh = checkRemainderHeader(session);
         while( (remain=buf.limit()-buf.position()) > 0 ) {

             // Have enough bytes to read a new header?
             if( mh == null && remain < MessageHeader.SIZE ) {
                 break;

             // Read a new header if we need it
             } else if( tmh == null ) {
                 mh = MessageHeader.readHeader(buf);
                 // check if un-parseable header
                 if( !isValidHeader(mh) ) {
                     log.severe("Invalid msg header: "+mh);
                     hadError = true;
                     break;
                 }
             }

             // check if remainder is too small for message contents
             remain = buf.limit()-buf.position();
             if( remain < mh.getContentLength() ) {
                 break;
             }

             // build message with header content bytes, dispatch msg
             byte[] body = new byte[mh.getContentLength()];
             buf.get(body);
             m = new Message(mh, body);
             if( !dispatcher.handleMessage(ctx.getSelectionKey(), m) ) {
                 hadError = true;
                 break;
             }

             m = null;
             mh = null;
         }

         // If we had an error, need to shut down socket connection
         if( hadError ) {
             ctx.setKeyRegistrationState(Context.
                     KeyRegistrationState.CANCEL);
             clearRemainder(session);
         } else {
             ctx.setKeyRegistrationState(Context.
                     KeyRegistrationState.REGISTER);
             preserveRemainder(session, buf, mh);
         }

         // Ensure the "working" buffer is cleared before returning
         workerThread.getByteBuffer().clear();

         return !hadError;
     }

     public boolean postExecute(Context ctx){
         return true;
     }

     /**
      * Check if there are any remaining bytes from a previous read
      * that just had a partial message header or message content.
      */
     private ByteBuffer prepareParseBuffer(WorkerThreadImpl workerThread,
             SSLSession session) {
         ByteBuffer bb = workerThread.getByteBuffer();
         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
         if( rem != null && rem.position() > 0 ) {
             // concatenate remainder with new bytes to be parsed
             bb.flip();
             int cnt = bb.limit() - bb.position();
             int space = rem.limit() - rem.position();
             // Make remainder buffer bigger if necessary
             if( space < cnt ) {
                 ByteBuffer rem2 = ByteBuffer.allocate(bb.capacity() +
                         cnt + 512);
                 rem.flip();
                 rem2.put(rem);
                 rem.clear();
                 rem = rem2;
                 session.putValue(REMBUF, rem);
             }
             rem.put(bb);
             bb.clear();
             bb = rem;
         }
         bb.flip(); // prepare for read
         return bb;
     }

     /**
      * Check if last time we were able to decode a header but not
      * able to read the full message contents.
      */
     private MessageHeader checkRemainderHeader(SSLSession session) {
         MessageHeader mh;
         mh = (MessageHeader)session.getValue(REMHEAD);
         if( mh != null ) {
             session.removeValue(REMHEAD);
         }
         return mh;
     }

     /**
      * Remove any "remainder" objects from the SSL session,
      * in preparation for shutting down the connection.
      */
     private void clearRemainder(SSLSession session) {
         session.removeValue(REMHEAD);
         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
         if( rem != null ) {
             rem.clear();
             session.removeValue(REMBUF);
         }
     }

     /**
      * Preserve any partially-read bytes or header and partial content.
      */
     private void preserveRemainder(SSLSession session, ByteBuffer bb,
             MessageHeader mh) {
         if( mh == null ) {
             session.removeValue(REMHEAD);
         } else {
             session.putValue(REMHEAD, mh);
         }
         int remainCnt = bb.limit()-bb.position();
         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
         if( rem != null && rem == bb ) {
             bb.compact(); // automatically prepares for next append
         } else if( rem != null && remainCnt == 0 ) {
             rem.clear();
         } else if( rem != null ) {
             if( rem.limit() != rem.capacity() ) {
                 rem.flip(); // prepare for appending
             }
             if( remainCnt > (rem.limit()-rem.position()) ) {
                 ByteBuffer rem2 = ByteBuffer.allocate(rem.capacity() +
                         remainCnt + 1024);
                 rem.flip(); // switch to read mode
                 rem2.put(rem);
                 rem.clear();
                 rem = rem2;
             }
             rem.put(bb);
         } else if( remainCnt > 0 ) {
             int alloc = 16 * 1024;
             if( alloc < remainCnt ) {
                 alloc = remainCnt + 2*1024;
             }
             rem = ByteBuffer.allocate(alloc);
             rem.put(bb);
         }
         if( rem != null ) {
             session.putValue(REMBUF, rem);
         }
     }

     . . . More housekeeping methods, logging, etc ...
}