users@grizzly.java.net

Re: state machine in grizzly

From: Oleksiy Stashok <oleksiy.stashok_at_oracle.com>
Date: Thu, 30 Jun 2011 11:55:17 +0200

Hi Jon,
>
> How would I go about building a state machine in Grizzly that
> could handle incremental processing? Assuming I'm processing
> HTTP requests, I can create some object and extract some
> information when the request starts. Then I don't want to have
> to do all that work again every time handleRead gets called
> with a new chunk of content.
>
> Where do I save my stateful objects so that subsequent chunks
> of a request can be accumulated until the HttpContent.isLast()
> is seen?
>
> You can try HttpHeader.getAttributes() (which might be either
> HttpRequestPacket or HttpResponsePacket), which will return
> AttributeHolder for this HttpHeader.
>
> AttributeHolder might be use to store custom attributes.
>
> For example create static final Attribute:
> private static final Attribute<MyType> MY_ATTR =
>
> Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(MY_ATTRIBUTE_NAME);
>
> and then get/set attribute on the HttpHeader:
>
> MY_ATTR.set(httpHeader, value);
>
> or
>
> MyType value = MY_ATTR.get(httpHeader);
>
> Hope this will help.
>
> This should work.
>
> This segregates the data based on request, right? So if I put an
> attribute onto the request, any other method call that gets this
> request will have access to that same object, right?
Yes.
Please note this method (AttributeHolder HttpHeader.getAttributes()),
has nothing to do with HttpRequestPacket.get/setAttribute(), those
storages are independent (naming issue on our side).
HttpHeader.getAttributes() is general and might be used by all
HttpHeader subtypes, including HttpRequest.

Though both storages might be used to store HttpHeader/HttpRequest
associated attributes and both have the same life-cycle.

Coming back to the test failure, pls check the diff bellow to fix the
RpbFilter code.

Thanks.

WBR,
Alexey.


diff --git a/src/main/java/com/jbrisbin/riak/pbc/RpbFilter.java
b/src/main/java/com/jbrisbin/riak/pbc/RpbFilter.java
index 00df644..a493c70 100644
--- a/src/main/java/com/jbrisbin/riak/pbc/RpbFilter.java
+++ b/src/main/java/com/jbrisbin/riak/pbc/RpbFilter.java
@@ -14,7 +14,7 @@

  package com.jbrisbin.riak.pbc;

-import static com.basho.riak.pbc.RiakMessageCodes.*;
+import static com.jbrisbin.riak.pbc.RiakMessageCodes.*;

  import java.io.DataOutputStream;
  import java.io.IOException;
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
   * @author Jon Brisbin <jon_at_jbrisbin.com>
   */
  public class RpbFilter extends BaseFilter {
+ private final static int HEADER_SIZE = 4;

         private Logger log = LoggerFactory.getLogger(getClass());
         private HeapMemoryManager heap;
@@ -48,14 +49,26 @@ public class RpbFilter extends BaseFilter {

         @Override public NextAction handleRead(FilterChainContext ctx)
throws IOException {
                 Buffer buffer = ctx.getMessage();
- int size = buffer.getInt();
- int code = buffer.get();
- if (buffer.remaining() < (size - 1)) {
- buffer.rewind();
+ if (buffer.remaining() < HEADER_SIZE) {
+ if (log.isDebugEnabled())
+ log.debug("Not enough data to read header:
" + buffer.remaining());
+ return ctx.getStopAction(buffer);
+ }
+
+ int pos = buffer.position();
+
+ int size = buffer.getInt();
+ if (buffer.remaining() < size) {
+ buffer.position(pos);
                         if (log.isDebugEnabled())
                                 log.debug("Not enough data yet: " +
buffer.remaining() + " (need " + size + ")");
                         return ctx.getStopAction(buffer);
                 }
+
+ Buffer remainder = (buffer.remaining() > size) ?
+ buffer.split(pos + HEADER_SIZE + size) : null;
+
+ int code = buffer.get();

                 BufferInputStream bin = new BufferInputStream(buffer);
                 switch (code) {
@@ -105,7 +118,7 @@ public class RpbFilter extends BaseFilter {
                 log.debug("read buffer on thread " +
Thread.currentThread().getName());
                 log.debug(String.format("req=%s, resp=%s",
requests.get(), responses.incrementAndGet()));

- return ctx.getInvokeAction();
+ return ctx.getInvokeAction(remainder);
         }

         @Override public NextAction handleWrite(FilterChainContext ctx)
throws IOException {


>
>
> Thanks!
>
> Jon Brisbin
> http//jbrisbin.com
>
>
>
> Thanks.
> WBR,
> Alexey.
>
> PS: sorry don't have time to check the failing test today, will do
> that asap.
>
>
> I need a state machine, basically, but it's not immediately
> apparent to me how to do that in Grizzly...
>
>
> Thanks!
>
> Jon Brisbin
> http//jbrisbin.com
>
>
>
>