users@grizzly.java.net

Re: ProtocolParser Advice

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Mon, 09 Jun 2008 11:17:55 -0400

Salut,

it took a while, but I've added you class under the samples directory:

https://grizzly.dev.java.net/issues/show_bug.cgi?id=157

Next steps is to add documentation (and blogs if you can :-)) and
evaluate if we want to add it to:

+ the core classes

or

+ a new module called grizzly-extra

What peoples's think?

A+

-- Jeanfrancois

John ROM wrote:
> Hi,
> just found some bugs in my parser.
> here is a better version:
>
> package util;
>
> import com.sun.grizzly.ProtocolParser;
> import com.sun.grizzly.util.ByteBufferFactory;
> import com.sun.grizzly.util.WorkerThread;
>
> import java.nio.ByteBuffer;
>
>
>
> /**
> * ProtocolParser Filter which parses a Custom Protocol
> * into a MessageParser. These IncomingMessages can then be used
> * by other Protocolchain Filters
> *
> * So basicly implement abstract method
> * "public boolean parseHeader(ByteBuffer bb)" of
> * IncommingMessage and retrieve these message up by another above Filter in chain.
> *
> * The Messages can be arbitrary long.
> * The Protocol has the following format
> * (n params + Data Size + Data):
>
> * n bytes : n number of header params
> * ----------------------------------------------
> * 4 bytes Data length - The length of Data
> * n bytes Data - Message
> *
>
> * The Protocol asumes that on average Messages are not too large (Gigs) otherwise
> * this implementation might not be effective.
>
> * Tries to avoid bytearray copying and therfore maps the IncomingMessageeee onto
> * the orginal Grizzly Bytebuffer. If Messages do not fit into an default sized Grizzly
> * ByteBuffer a larger one with enough capacity is temporarly given to the framework.
> *
> * This class is still under construction and for example Error Handling,
> * Timeouts, Recovery has to be greatly improved!!!
> * John Vieten
> */
>
>
> public abstract class MyProtocolParser implements ProtocolParser<MyProtocolParser.MessageParser> {
> private boolean startBufferExecuted = false;
> private boolean modeWithoutReadFilter;
>
>
> protected MyProtocolParser(boolean modeWithoutReadFilter) {
> this.state = modeWithoutReadFilter ? State.HAS_MORE_BYTES_TO_PARSE : State.START;
> this.modeWithoutReadFilter = modeWithoutReadFilter;
> }
>
> public static final int DEFAULT_BUFFER_SIZE = 8192;
> /**
> * Handle to byteBuffer which gets filled in by grizzly
> */
> private ByteBuffer grizzlyBuffer;
> /**
> * If Message are larger than DEFAULT_BUFFER_SIZE a new larger Grizzly ByteBuffer is created
> * This handle is used to give Grizzly back its original default ByteBuffer .
> */
> private ByteBuffer restoreHandle;
>
> /**
> * The length of the data depicted by Protocol Header
> */
> private int dataLength = 0;
>
> /**
> * Just dataLength + Header Length
> */
> private int messageLength = 0;
> /*
> Keeps track of the bytes read by the Grizzly ReadFilter
> */
> private int trackBytesRead = 0;
>
>
> /**
> * Keeps track of the start position of the current message in the Grizzly ByteBuffer.
> * Needed because an Grizzly ByteBuffer can contain several Messages at once
> * The messageStart always reflects the start Boundary of the current Message
> * And is either zero or set when a message was fully parsed and following one is still
> * in the buffer
> */
> int messageStart = 0;
>
> /**
> * Holds the state of this Statemachine
> */
> private State state = null;
>
> /**
> * The Message given to other Filters up the chain
> */
>
> private MessageParser messageParser;
>
> public abstract MessageParser newInstance();
>
> public static enum State {
> START,
> EXPECTING_MORE_HEADER_BYTES,
> EXPECTING_MORE_DATA_BYTES,
> HAS_MORE_BYTES_TO_PARSE,
> MEASSAGE_PARSED,
> MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE,
> }
>
>
> public boolean isExpectingMoreData() {
> boolean result = state == State.EXPECTING_MORE_DATA_BYTES ||
> state == State.EXPECTING_MORE_HEADER_BYTES;
>
> return result;
>
> }
>
>
> public boolean hasMoreBytesToParse() {
> return state == State.HAS_MORE_BYTES_TO_PARSE;
> }
>
> public MessageParser getNextMessage() {
>
> switch (state) {
> case MEASSAGE_PARSED:
> resetState();
> break;
> case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
> state = State.HAS_MORE_BYTES_TO_PARSE;
>
> }
> MessageParser tmp = messageParser;
> messageParser.unattach();
> messageParser = null;
> return tmp;
> }
>
>
> public boolean hasNextMessage() {
> if (!startBufferExecuted) {
> startBuffer(((WorkerThread) Thread.currentThread()).getByteBuffer());
> }
> parseMessage();
> switch (state) {
> case MEASSAGE_PARSED:
> case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
> return true;
> default:
> return false;
> }
> }
>
>
> public void parseMessage() {
> switch (state) {
> case MEASSAGE_PARSED:
> case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
> return;
> }
>
>
> switch (state) {
>
> case HAS_MORE_BYTES_TO_PARSE:
>
> case START:
> messageParser = newInstance();
>
> case EXPECTING_MORE_HEADER_BYTES:
> if ((trackBytesRead - messageStart) < messageParser.getHeaderLength()) {
> state = State.EXPECTING_MORE_HEADER_BYTES;
> grizzlyBuffer.position(messageStart);
> // doesn't happen often so we can compact
> grizzlyBuffer.compact();
> trackBytesRead=trackBytesRead-messageStart;
> messageStart=0;
> grizzlyBuffer.position(trackBytesRead);
> return;
> }
> grizzlyBuffer.position(messageStart);
> try {
> // expects a position messageStart
> boolean isKMProtocol = messageParser.parseHeader(grizzlyBuffer);
> if (!isKMProtocol) {
> messageParser.setException(new Exception("Bad Startmark"));
> state = State.MEASSAGE_PARSED;
> return;
>
> }
> } catch (Exception e) {
> // System.out.println(e.getMessage());
> messageParser.setException(e);
> state = State.MEASSAGE_PARSED;
> return;
> }
>
> dataLength = messageParser.getDataSize();
> messageLength = dataLength + MyIncomingMessage.HEADER_LENGTH;
>
> // position should by at data start
> if ((grizzlyBuffer.capacity() - grizzlyBuffer.position()) < dataLength) {
> // Not enough room to store message so we need a bigger buffer and also some extra room for
> // any new message chunks
>
> // maybe I should do a compact to use a smaller buffer here but I am not sure about performance
>
> int newCapacity = messageLength + grizzlyBuffer.position() + DEFAULT_BUFFER_SIZE;
> grizzlyBuffer.position(0);
> ByteBuffer newBuffer = ByteBufferFactory.allocateView(newCapacity, grizzlyBuffer.isDirect());
> newBuffer.put(grizzlyBuffer);
> WorkerThread workerThread = (WorkerThread) Thread.currentThread();
> workerThread.setByteBuffer(grizzlyBuffer = newBuffer);
>
> }
>
> // Now create a sliced copy of GrizzlyBuffer so that
> // we have start and end pointers on our wanted message
> grizzlyBuffer.position(MyIncomingMessage.HEADER_LENGTH + messageStart);
>
>
> grizzlyBuffer.limit(messageLength + messageStart);
>
> ByteBuffer slicedHandle = grizzlyBuffer.slice();
> grizzlyBuffer.limit(grizzlyBuffer.capacity());
> // Now Buffer has enough space for message
>
> messageParser.setByteBuffer(slicedHandle);
>
>
> case EXPECTING_MORE_DATA_BYTES:
> int remaining = trackBytesRead - messageLength - messageStart;
> if (remaining == 0) {
> // Ok we found a message
> state = State.MEASSAGE_PARSED;
>
> } else if (remaining > 0) {
>
> state = State.MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE;
> messageStart = messageStart + messageLength;
>
>
> } else {
> // we have to keep on ready bytes from the net
> state = State.EXPECTING_MORE_DATA_BYTES;
> grizzlyBuffer.position(trackBytesRead);
>
>
>
> }
> }
>
> }
>
> public boolean releaseBuffer() {
> boolean saveCurrentParserState = false;
> if (isExpectingMoreData()) {
> saveCurrentParserState = true;
> } else {
> if (restoreHandle == null) {
>
> Exception e = new Exception();
> e.printStackTrace();
> return false;
> }
> WorkerThread workerThread = (WorkerThread) Thread.currentThread();
> restoreHandle.clear();
>
> workerThread.setByteBuffer(restoreHandle);
> grizzlyBuffer = null;
> restoreHandle = null;
> }
> return saveCurrentParserState;
>
> }
>
>
> public void startBuffer(ByteBuffer bb) {
> restoreHandle = bb;
> // reflects the actual physically read byte by ReadFilter
> trackBytesRead = bb.position();
>
>
> grizzlyBuffer = bb;
> if (!isExpectingMoreData()) {
> grizzlyBuffer.position(0);
> resetState();
> }
> startBufferExecuted = true;
> }
>
>
> private void resetState() {
> state = State.START;
> dataLength = 0;
> messageLength = 0;
> messageStart = 0;
> }
>
> /**
> * Used with ProtocolParser to parse Messages and
> * fill an byteBuffer with the message data.
> */
> public abstract class MessageParser {
>
> private int dataSize;
> private ByteBuffer byteBuffer;
>
> public abstract boolean parseHeader(ByteBuffer bb) throws Exception;
>
> private byte[] bytes;
>
>
>
> final public ByteBuffer getByteBuffer() {
> return byteBuffer;
> }
>
> final public byte[] toByteArray() {
> if(hasException()) return new byte[0];
> return bytes;
> }
>
>
> final public void setByteBuffer(ByteBuffer byteBuffer) {
> // System.out.println("in s"+this.hashCode()+" "+(byteBuffer!=null));
> this.byteBuffer = byteBuffer;
> }
>
> public abstract int getHeaderLength();
>
> final public void setDataSize(int dataSize) {
> this.dataSize = dataSize;
> }
>
> final public int getDataSize() {
> return dataSize;
> }
>
> public byte[] getBytes() {
> return bytes;
> }
>
> public void setBytes(byte[] bytes) {
> this.bytes = bytes;
> }
>
>
>
> private Exception exception;
>
> final public Exception getException() {
> return exception;
> }
>
> final public void setException(Exception exception) {
> this.exception = exception;
>
> }
>
> final public boolean hasException() {
> return exception != null;
> }
>
> final public void unattach() {
>
> //System.out.println("in u"+this.hashCode()+" "+(byteBuffer!=null));
> if(hasException()) return;
>
> bytes = new byte[dataSize];
> byteBuffer.get(bytes);
> byteBuffer.clear();
> byteBuffer=null;
>
> }
>
>
>
> }
> public class MyIncomingMessage extends MessageParser {
> final static int HEADER_LENGTH=12;
> int START_MARK = 0x77335434;
> private boolean statusOk;
>
>
>
> public boolean parseHeader(ByteBuffer bb) throws Exception {
>
>
> int startmark = bb.getInt();
> int status = bb.getInt();
>
> int size = bb.getInt();
> if (startmark !=START_MARK) {
> return false;
>
> }
> statusOk =status== 1;
>
> setDataSize(size);
>
> if (getDataSize() <= 0) throw new Exception("Bad Message Length");
> return true;
>
> }
>
>
>
> public int getHeaderLength() {
> return HEADER_LENGTH;
> }
>
>
> public boolean isStatusOk() {
> return statusOk;
> }
> }
>
> }