Hi Eric,
Eric Heaton wrote:
> D.J.,
>
> This is exactly the response I needed! I'll start with the Controller
> from the 'grizzly' framework and implement a custom ProtocolFiler as you
> describe below. I don't need SSL support, so for partial reads, I think
> I'll instead try a custom Pipeline that creates a custom
> WorkerThreadImpl that contains the partially read message header/body.
Let us know the result and if you think your Filter can be donated to
the community, feel free to send it :-)
Thanks!
-- Jeanfrancois
>
> Thanks!
> -Eric
>
> ----- Original Message ----- From: "D. J. Hagberg (Sun)"
> <Dj.Hagberg_at_Sun.COM>
> To: <users_at_grizzly.dev.java.net>
> Sent: Tuesday, June 19, 2007 4:28 PM
> Subject: Re: Implementing custom StreamAlgorithm
>
>
>> Eric Heaton wrote:
>> > I wish to implement a custom StreamAlgorithm whose parse() method can
>> > potentially return false if the incoming request has not been received
>> > completely into the input buffer. (For the curious, I'm trying to
>> > write a custom protocol that handles the Typed Parameter Language
>> > (tpl.sourceforge.net)).
>> >
>> > However, when my StreamAlgorithm's parse() method returns false, the
>> > request is not processed. I tried inspecting the
>> > ContentLengthAlgorithm's parse() method, but found that this does not
>> > seem to work either. I create a simple SelectorThread for the
>> > ContentLengthAlgorithm and POST a request that's greater than the
>> > default buffer size of 8192 bytes. The request is never processed.
>>
>> Eric --
>>
>> I hope to post my design work up on my blog soon
>> (http://blogs.sun.com/djhagberg) but I decided to cut out the
>> StreamAlgorithm pattern in my application (it seemed like it was more
>> tightly coupled to http-type protocols) and went with the lower-level
>> ProtocolFilter API. I have the SSLReadFilter in front of this to
>> decode SSL into cleartext bytes...
>>
>> My Message's have a fixed-length MessageHeader followed by a
>> variable-length byte[] content.
>>
>> There are a few methods here that deal with partial message reads:
>>
>> - prepareParseBuffer prepends any remaining bytes from the last read
>> onto the current ByteBuffer.
>>
>> - checkRemainderHeader checks for a previously-read MessageHeader
>>
>> - clearRemainder does what it says
>>
>> - preserveRemainder preserves any leftover MessageHeader or partially-
>> read bytes.
>>
>> I can't say this code has passed all regression/unit tests yet but it
>> might help.
>>
>> Note it also depends on an SSLSession to act as a Map for partial
>> reads. If you don't have an SSLSession, you may have something else
>> attached to or associated with your SelectionKey that could be used to
>> stow remaining stuff...
>>
>> public class MessageProtocolFilter implements ProtocolFilter {
>> . . . statics, logger, etc. . . .
>> public static final String REMBUF = "REMBUF";
>> public static final Stirng REMHEAD = "REMHEAD";
>>
>> /**
>> * Decode raw cleartext bytes into one or more Message's
>> * if possible, and hand off to dispatcher to do real work.
>> */
>> public boolean execute(Context ctx) {
>> WorkerThreadImpl workerThread = (WorkerThreadImpl)Thread.
>> currentThread();
>> SSLSession session = workerThread.getSSLEngine().getSession();
>> ByteBuffer buf = prepareParseBuffer(workerThread, session);
>>
>> // Keep parsing while there is a possibility of decoding msgs
>> int remain;
>> boolean hadError = false;
>> Message m = null;
>> MessageHeader mh = checkRemainderHeader(session);
>> while( (remain=buf.limit()-buf.position()) > 0 ) {
>>
>> // Have enough bytes to read a new header?
>> if( mh == null && remain < MessageHeader.SIZE ) {
>> break;
>>
>> // Read a new header if we need it
>> } else if( tmh == null ) {
>> mh = MessageHeader.readHeader(buf);
>> // check if un-parseable header
>> if( !isValidHeader(mh) ) {
>> log.severe("Invalid msg header: "+mh);
>> hadError = true;
>> break;
>> }
>> }
>>
>> // check if remainder is too small for message contents
>> remain = buf.limit()-buf.position();
>> if( remain < mh.getContentLength() ) {
>> break;
>> }
>>
>> // build message with header content bytes, dispatch msg
>> byte[] body = new byte[mh.getContentLength()];
>> buf.get(body);
>> m = new Message(mh, body);
>> if( !dispatcher.handleMessage(ctx.getSelectionKey(), m) ) {
>> hadError = true;
>> break;
>> }
>>
>> m = null;
>> mh = null;
>> }
>>
>> // If we had an error, need to shut down socket connection
>> if( hadError ) {
>> ctx.setKeyRegistrationState(Context.
>> KeyRegistrationState.CANCEL);
>> clearRemainder(session);
>> } else {
>> ctx.setKeyRegistrationState(Context.
>> KeyRegistrationState.REGISTER);
>> preserveRemainder(session, buf, mh);
>> }
>>
>> // Ensure the "working" buffer is cleared before returning
>> workerThread.getByteBuffer().clear();
>>
>> return !hadError;
>> }
>>
>> public boolean postExecute(Context ctx){
>> return true;
>> }
>>
>> /**
>> * Check if there are any remaining bytes from a previous read
>> * that just had a partial message header or message content.
>> */
>> private ByteBuffer prepareParseBuffer(WorkerThreadImpl workerThread,
>> SSLSession session) {
>> ByteBuffer bb = workerThread.getByteBuffer();
>> ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>> if( rem != null && rem.position() > 0 ) {
>> // concatenate remainder with new bytes to be parsed
>> bb.flip();
>> int cnt = bb.limit() - bb.position();
>> int space = rem.limit() - rem.position();
>> // Make remainder buffer bigger if necessary
>> if( space < cnt ) {
>> ByteBuffer rem2 = ByteBuffer.allocate(bb.capacity() +
>> cnt + 512);
>> rem.flip();
>> rem2.put(rem);
>> rem.clear();
>> rem = rem2;
>> session.putValue(REMBUF, rem);
>> }
>> rem.put(bb);
>> bb.clear();
>> bb = rem;
>> }
>> bb.flip(); // prepare for read
>> return bb;
>> }
>>
>> /**
>> * Check if last time we were able to decode a header but not
>> * able to read the full message contents.
>> */
>> private MessageHeader checkRemainderHeader(SSLSession session) {
>> MessageHeader mh;
>> mh = (MessageHeader)session.getValue(REMHEAD);
>> if( mh != null ) {
>> session.removeValue(REMHEAD);
>> }
>> return mh;
>> }
>>
>> /**
>> * Remove any "remainder" objects from the SSL session,
>> * in preparation for shutting down the connection.
>> */
>> private void clearRemainder(SSLSession session) {
>> session.removeValue(REMHEAD);
>> ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>> if( rem != null ) {
>> rem.clear();
>> session.removeValue(REMBUF);
>> }
>> }
>>
>> /**
>> * Preserve any partially-read bytes or header and partial content.
>> */
>> private void preserveRemainder(SSLSession session, ByteBuffer bb,
>> MessageHeader mh) {
>> if( mh == null ) {
>> session.removeValue(REMHEAD);
>> } else {
>> session.putValue(REMHEAD, mh);
>> }
>> int remainCnt = bb.limit()-bb.position();
>> ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>> if( rem != null && rem == bb ) {
>> bb.compact(); // automatically prepares for next append
>> } else if( rem != null && remainCnt == 0 ) {
>> rem.clear();
>> } else if( rem != null ) {
>> if( rem.limit() != rem.capacity() ) {
>> rem.flip(); // prepare for appending
>> }
>> if( remainCnt > (rem.limit()-rem.position()) ) {
>> ByteBuffer rem2 = ByteBuffer.allocate(rem.capacity() +
>> remainCnt + 1024);
>> rem.flip(); // switch to read mode
>> rem2.put(rem);
>> rem.clear();
>> rem = rem2;
>> }
>> rem.put(bb);
>> } else if( remainCnt > 0 ) {
>> int alloc = 16 * 1024;
>> if( alloc < remainCnt ) {
>> alloc = remainCnt + 2*1024;
>> }
>> rem = ByteBuffer.allocate(alloc);
>> rem.put(bb);
>> }
>> if( rem != null ) {
>> session.putValue(REMBUF, rem);
>> }
>> }
>>
>> . . . More housekeeping methods, logging, etc ...
>> }
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>
>>
>>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>