users@grizzly.java.net

Re: Implementing custom StreamAlgorithm

From: Eric Heaton <eheaton_at_iii.com>
Date: Wed, 20 Jun 2007 08:32:26 -0700

D.J.,

This is exactly the response I needed! I'll start with the Controller from
the 'grizzly' framework and implement a custom ProtocolFilter as you describe
below. I don't need SSL support, so for partial reads, I think I'll instead
try a custom Pipeline that creates a custom WorkerThreadImpl that contains
the partially read message header/body.

Thanks!
-Eric

----- Original Message -----
From: "D. J. Hagberg (Sun)" <Dj.Hagberg_at_Sun.COM>
To: <users_at_grizzly.dev.java.net>
Sent: Tuesday, June 19, 2007 4:28 PM
Subject: Re: Implementing custom StreamAlgorithm


> Eric Heaton wrote:
> > I wish to implement a custom StreamAlgorithm whose parse() method can
> > potentially return false if the incoming request has not been received
> > completely into the input buffer. (For the curious, I'm trying to
> > write a custom protocol that handles the Typed Parameter Language
> > (tpl.sourceforge.net)).
> >
> > However, when my StreamAlgorithm's parse() method returns false, the
> > request is not processed. I tried inspecting the
> > ContentLengthAlgorithm's parse() method, but found that this does not
> > seem to work either. I create a simple SelectorThread for the
> > ContentLengthAlgorithm and POST a request that's greater than the
> > default buffer size of 8192 bytes. The request is never processed.
>
> Eric --
>
> I hope to post my design work up on my blog soon
> (http://blogs.sun.com/djhagberg) but I decided to cut out the
> StreamAlgorithm pattern in my application (it seemed like it was more
> tightly coupled to http-type protocols) and went with the lower-level
> ProtocolFilter API. I have the SSLReadFilter in front of this to decode
> SSL into cleartext bytes...
>
> My Messages have a fixed-length MessageHeader followed by a
> variable-length byte[] content.
>
> There are a few methods here that deal with partial message reads:
>
> - prepareParseBuffer prepends any remaining bytes from the last read
> onto the current ByteBuffer.
>
> - checkRemainderHeader checks for a previously-read MessageHeader
>
> - clearRemainder does what it says
>
> - preserveRemainder preserves any leftover MessageHeader or partially-
> read bytes.
>
> I can't say this code has passed all regression/unit tests yet but it
> might help.
>
> Note it also depends on an SSLSession to act as a Map for partial reads.
> If you don't have an SSLSession, you may have something else attached to
> or associated with your SelectionKey that could be used to stow remaining
> stuff...
>
> public class MessageProtocolFilter implements ProtocolFilter {
> . . . statics, logger, etc. . . .
> /** Session key under which the leftover-bytes ByteBuffer is stored. */
> public static final String REMBUF = "REMBUF";
> /** Session key under which an already-decoded MessageHeader is stored. */
> public static final String REMHEAD = "REMHEAD";
>
> /**
> * Decode raw cleartext bytes into one or more Message's
> * if possible, and hand off to dispatcher to do real work.
> */
> public boolean execute(Context ctx) {
> WorkerThreadImpl workerThread = (WorkerThreadImpl)Thread.
> currentThread();
> SSLSession session = workerThread.getSSLEngine().getSession();
> ByteBuffer buf = prepareParseBuffer(workerThread, session);
>
> // Keep parsing while there is a possibility of decoding msgs
> int remain;
> boolean hadError = false;
> Message m = null;
> MessageHeader mh = checkRemainderHeader(session);
> while( (remain=buf.limit()-buf.position()) > 0 ) {
>
> // Have enough bytes to read a new header?
> if( mh == null && remain < MessageHeader.SIZE ) {
> break;
>
> // Read a new header if we need it
> } else if( tmh == null ) {
> mh = MessageHeader.readHeader(buf);
> // check if un-parseable header
> if( !isValidHeader(mh) ) {
> log.severe("Invalid msg header: "+mh);
> hadError = true;
> break;
> }
> }
>
> // check if remainder is too small for message contents
> remain = buf.limit()-buf.position();
> if( remain < mh.getContentLength() ) {
> break;
> }
>
> // build message with header content bytes, dispatch msg
> byte[] body = new byte[mh.getContentLength()];
> buf.get(body);
> m = new Message(mh, body);
> if( !dispatcher.handleMessage(ctx.getSelectionKey(), m) ) {
> hadError = true;
> break;
> }
>
> m = null;
> mh = null;
> }
>
> // If we had an error, need to shut down socket connection
> if( hadError ) {
> ctx.setKeyRegistrationState(Context.
> KeyRegistrationState.CANCEL);
> clearRemainder(session);
> } else {
> ctx.setKeyRegistrationState(Context.
> KeyRegistrationState.REGISTER);
> preserveRemainder(session, buf, mh);
> }
>
> // Ensure the "working" buffer is cleared before returning
> workerThread.getByteBuffer().clear();
>
> return !hadError;
> }
>
> /**
>  * Nothing to do after execution for this filter.
>  *
>  * @param ctx the context of the current read (unused)
>  * @return always true
>  */
> public boolean postExecute(Context ctx) {
>     return true;
> }
>
> /**
>  * Check if there are any remaining bytes from a previous read
>  * that just had a partial message header or message content, and
>  * return the buffer to parse, flipped and ready for reading.
>  *
>  * If the session holds a non-empty remainder buffer under REMBUF, the
>  * newly read bytes are appended to it (growing it when needed), the
>  * worker thread's buffer is cleared, and the remainder buffer is
>  * returned. Otherwise the worker thread's own buffer is returned.
>  *
>  * @param workerThread thread whose ByteBuffer holds the newly read bytes
>  * @param session      session used to look up (and, when the remainder
>  *                     buffer is replaced, update) the REMBUF value
>  * @return the buffer to parse, in read mode
>  */
> private ByteBuffer prepareParseBuffer(WorkerThreadImpl workerThread,
>                                       SSLSession session) {
>     ByteBuffer bb = workerThread.getByteBuffer();
>     ByteBuffer rem = (ByteBuffer) session.getValue(REMBUF);
>     if (rem != null && rem.position() > 0) {
>         // concatenate remainder with new bytes to be parsed
>         bb.flip();
>         int cnt = bb.remaining();
>         // Make remainder buffer bigger if necessary
>         if (rem.remaining() < cnt) {
>             // Size from the remainder's own capacity so the new buffer
>             // always fits the old bytes plus the new ones; sizing from
>             // bb.capacity() could overflow when the remainder already
>             // held more than bb.capacity() + 512 bytes.
>             ByteBuffer rem2 =
>                     ByteBuffer.allocate(rem.capacity() + cnt + 512);
>             rem.flip();
>             rem2.put(rem);
>             rem.clear();
>             rem = rem2;
>             session.putValue(REMBUF, rem);
>         }
>         rem.put(bb);
>         bb.clear();
>         bb = rem;
>     }
>     bb.flip(); // prepare for read
>     return bb;
> }
>
> /**
> * Check if last time we were able to decode a header but not
> * able to read the full message contents.
> */
> private MessageHeader checkRemainderHeader(SSLSession session) {
> MessageHeader mh;
> mh = (MessageHeader)session.getValue(REMHEAD);
> if( mh != null ) {
> session.removeValue(REMHEAD);
> }
> return mh;
> }
>
> /**
> * Remove any "remainder" objects from the SSL session,
> * in preparation for shutting down the connection.
> */
> private void clearRemainder(SSLSession session) {
> session.removeValue(REMHEAD);
> ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
> if( rem != null ) {
> rem.clear();
> session.removeValue(REMBUF);
> }
> }
>
> /**
>  * Preserve any partially-read bytes or header and partial content
>  * in the SSL session so the next read can continue from them.
>  *
>  * @param session session whose REMHEAD and REMBUF values are updated
>  * @param bb      the buffer that was just parsed, in read mode; its
>  *                unread bytes (position up to limit) are preserved
>  * @param mh      header already decoded whose content has not fully
>  *                arrived, or null if there is none
>  */
> private void preserveRemainder(SSLSession session, ByteBuffer bb,
>                                MessageHeader mh) {
>     if (mh == null) {
>         session.removeValue(REMHEAD);
>     } else {
>         session.putValue(REMHEAD, mh);
>     }
>     int remainCnt = bb.remaining();
>     ByteBuffer rem = (ByteBuffer) session.getValue(REMBUF);
>     if (rem != null && rem == bb) {
>         // parsed directly out of the remainder buffer
>         bb.compact(); // automatically prepares for next append
>     } else if (rem != null && remainCnt == 0) {
>         rem.clear();
>     } else if (rem != null) {
>         if (rem.limit() != rem.capacity()) {
>             // A limit below capacity is taken to mean the buffer was
>             // left in read mode. compact() keeps its unread bytes and
>             // positions it for appending; flip() would discard them.
>             rem.compact();
>         }
>         if (remainCnt > rem.remaining()) {
>             ByteBuffer rem2 = ByteBuffer.allocate(rem.capacity() +
>                                                   remainCnt + 1024);
>             rem.flip(); // switch to read mode
>             rem2.put(rem);
>             rem.clear();
>             rem = rem2;
>         }
>         rem.put(bb);
>     } else if (remainCnt > 0) {
>         // no remainder buffer yet: allocate one large enough
>         int alloc = 16 * 1024;
>         if (alloc < remainCnt) {
>             alloc = remainCnt + 2 * 1024;
>         }
>         rem = ByteBuffer.allocate(alloc);
>         rem.put(bb);
>     }
>     if (rem != null) {
>         session.putValue(REMBUF, rem);
>     }
> }
>
> . . . More housekeeping methods, logging, etc ...
> }
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>
>
>