users@grizzly.java.net

Re: ProtocolParser Advice

From: John ROM <snake-john_at_gmx.de>
Date: Tue, 03 Jun 2008 00:26:09 +0200

Hi,
just found some bugs in my parser.
here is a better version:

package util;

import com.sun.grizzly.ProtocolParser;
import com.sun.grizzly.util.ByteBufferFactory;
import com.sun.grizzly.util.WorkerThread;

import java.nio.ByteBuffer;



/**
 * ProtocolParser filter which parses a custom protocol
 * into a MessageParser. These incoming messages can then be used
 * by other ProtocolChain filters.
 *
 * So basically: implement the abstract method
 * "public boolean parseHeader(ByteBuffer bb)" of
 * MessageParser and pick the messages up in another filter further up the chain.
 *
 * The messages can be arbitrarily long.
 * The protocol has the following format
 * (n params + data size + data):
 *
 * n bytes : n number of header params
 * ----------------------------------------------
 * 4 bytes Data length - The length of Data
 * n bytes Data - Message
 *
 * The protocol assumes that on average messages are not too large (gigabytes); otherwise
 * this implementation might not be effective.
 *
 * Tries to avoid byte array copying and therefore maps the incoming message onto
 * the original Grizzly ByteBuffer. If a message does not fit into a default-sized Grizzly
 * ByteBuffer, a larger one with enough capacity is temporarily given to the framework.
 *
 * This class is still under construction and, for example, error handling,
 * timeouts and recovery have to be greatly improved!
 * John Vieten
 */


public abstract class MyProtocolParser implements ProtocolParser<MyProtocolParser.MessageParser> {
    private boolean startBufferExecuted = false;
    private boolean modeWithoutReadFilter;


    protected MyProtocolParser(boolean modeWithoutReadFilter) {
        this.state = modeWithoutReadFilter ? State.HAS_MORE_BYTES_TO_PARSE : State.START;
        this.modeWithoutReadFilter = modeWithoutReadFilter;
    }

    /**
     * Extra room, in bytes, that {@link #parseMessage()} adds on top of the message
     * size when it has to allocate a larger buffer.
     */
    public static final int DEFAULT_BUFFER_SIZE = 8192;
    /**
     * Handle to the ByteBuffer which gets filled in by Grizzly.
     */
    private ByteBuffer grizzlyBuffer;
    /**
     * If a message does not fit into the current buffer, a new larger Grizzly ByteBuffer is created.
     * This handle is used to give Grizzly back the ByteBuffer that was passed to
     * {@link #startBuffer(ByteBuffer)}.
     */
    private ByteBuffer restoreHandle;

    /**
     * The length of the data as reported by the protocol header.
     */
    private int dataLength = 0;

    /**
     * Just dataLength + header length.
     */
    private int messageLength = 0;
    /**
     * Buffer position captured in {@link #startBuffer(ByteBuffer)}; presumably the number
     * of bytes the Grizzly ReadFilter has read into the buffer so far.
     */
    private int trackBytesRead = 0;


    /**
     * Keeps track of the start position of the current message in the Grizzly ByteBuffer.
     * Needed because a Grizzly ByteBuffer can contain several messages at once.
     * The messageStart always reflects the start boundary of the current message
     * and is either zero or set when a message was fully parsed and a following one is still
     * in the buffer.
     */
    int messageStart = 0;

    /**
     * Holds the state of this state machine. Assigned by the constructor.
     */
    private State state = null;

    /**
     * The message given to other filters up the chain.
     */

    private MessageParser messageParser;

    /**
     * Supplies the message object for the next message. {@link #parseMessage()} calls this
     * whenever it starts on a new message (states {@link State#START} and
     * {@link State#HAS_MORE_BYTES_TO_PARSE}).
     *
     * @return the MessageParser that will parse the next header and hold the message data
     */
    public abstract MessageParser newInstance();

    /**
     * States of the parser state machine.
     * NOTE(review): "MEASSAGE" is a misspelling of "MESSAGE"; the constants are public,
     * so they are left as they are.
     */
    public static enum State {
        /** Nothing parsed yet; the next parse step creates a new MessageParser. */
        START,
        /** Fewer bytes than the header length are available for the current message. */
        EXPECTING_MORE_HEADER_BYTES,
        /** The header has been parsed but not all data bytes have arrived yet. */
        EXPECTING_MORE_DATA_BYTES,
        /**
         * Set by getNextMessage() after a message was handed out while bytes of a following
         * message are already in the buffer; also the initial state when the parser is
         * constructed with modeWithoutReadFilter == true.
         */
        HAS_MORE_BYTES_TO_PARSE,
        /**
         * A message is ready for getNextMessage(). Also set when header parsing failed and
         * the message carries an exception.
         */
        MEASSAGE_PARSED,
        /** A message is ready and more bytes than this message needs have already been read. */
        MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE,
    }


    public boolean isExpectingMoreData() {
        boolean result = state == State.EXPECTING_MORE_DATA_BYTES ||
                state == State.EXPECTING_MORE_HEADER_BYTES;

        return result;

    }


    public boolean hasMoreBytesToParse() {
        return state == State.HAS_MORE_BYTES_TO_PARSE;
    }

    public MessageParser getNextMessage() {

        switch (state) {
            case MEASSAGE_PARSED:
                resetState();
                break;
            case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
                state = State.HAS_MORE_BYTES_TO_PARSE;

        }
        MessageParser tmp = messageParser;
        messageParser.unattach();
        messageParser = null;
        return tmp;
    }


    public boolean hasNextMessage() {
        if (!startBufferExecuted) {
            startBuffer(((WorkerThread) Thread.currentThread()).getByteBuffer());
        }
        parseMessage();
        switch (state) {
            case MEASSAGE_PARSED:
            case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
                return true;
            default:
                return false;
        }
    }


    public void parseMessage() {
        switch (state) {
            case MEASSAGE_PARSED:
            case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
                return;
        }


        switch (state) {

            case HAS_MORE_BYTES_TO_PARSE:

            case START:
                messageParser = newInstance();

            case EXPECTING_MORE_HEADER_BYTES:
                if ((trackBytesRead - messageStart) < messageParser.getHeaderLength()) {
                    state = State.EXPECTING_MORE_HEADER_BYTES;
                    grizzlyBuffer.position(messageStart);
                     // doesn't happen often so we can compact
                    grizzlyBuffer.compact();
                    trackBytesRead=trackBytesRead-messageStart;
                    messageStart=0;
                    grizzlyBuffer.position(trackBytesRead);
                    return;
                }
                grizzlyBuffer.position(messageStart);
                try {
                    // expects a position messageStart
                    boolean isKMProtocol = messageParser.parseHeader(grizzlyBuffer);
                    if (!isKMProtocol) {
                        messageParser.setException(new Exception("Bad Startmark"));
                        state = State.MEASSAGE_PARSED;
                        return;

                    }
                } catch (Exception e) {
                    // System.out.println(e.getMessage());
                    messageParser.setException(e);
                    state = State.MEASSAGE_PARSED;
                    return;
                }

                dataLength = messageParser.getDataSize();
                messageLength = dataLength + MyIncomingMessage.HEADER_LENGTH;

                // position should by at data start
                if ((grizzlyBuffer.capacity() - grizzlyBuffer.position()) < dataLength) {
                    // Not enough room to store message so we need a bigger buffer and also some extra room for
                    // any new message chunks

                    // maybe I should do a compact to use a smaller buffer here but I am not sure about performance

                    int newCapacity = messageLength + grizzlyBuffer.position() + DEFAULT_BUFFER_SIZE;
                    grizzlyBuffer.position(0);
                    ByteBuffer newBuffer = ByteBufferFactory.allocateView(newCapacity, grizzlyBuffer.isDirect());
                    newBuffer.put(grizzlyBuffer);
                    WorkerThread workerThread = (WorkerThread) Thread.currentThread();
                    workerThread.setByteBuffer(grizzlyBuffer = newBuffer);

                }

                // Now create a sliced copy of GrizzlyBuffer so that
                // we have start and end pointers on our wanted message
                grizzlyBuffer.position(MyIncomingMessage.HEADER_LENGTH + messageStart);


                grizzlyBuffer.limit(messageLength + messageStart);

                ByteBuffer slicedHandle = grizzlyBuffer.slice();
                grizzlyBuffer.limit(grizzlyBuffer.capacity());
                // Now Buffer has enough space for message

                messageParser.setByteBuffer(slicedHandle);


            case EXPECTING_MORE_DATA_BYTES:
                int remaining = trackBytesRead - messageLength - messageStart;
                if (remaining == 0) {
                    // Ok we found a message
                    state = State.MEASSAGE_PARSED;

                } else if (remaining > 0) {

                    state = State.MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE;
                    messageStart = messageStart + messageLength;


                } else {
                    // we have to keep on ready bytes from the net
                    state = State.EXPECTING_MORE_DATA_BYTES;
                    grizzlyBuffer.position(trackBytesRead);



                }
        }

    }

    public boolean releaseBuffer() {
        boolean saveCurrentParserState = false;
        if (isExpectingMoreData()) {
            saveCurrentParserState = true;
        } else {
            if (restoreHandle == null) {

                Exception e = new Exception();
                e.printStackTrace();
                return false;
            }
            WorkerThread workerThread = (WorkerThread) Thread.currentThread();
            restoreHandle.clear();

            workerThread.setByteBuffer(restoreHandle);
            grizzlyBuffer = null;
            restoreHandle = null;
        }
        return saveCurrentParserState;

    }


    public void startBuffer(ByteBuffer bb) {
        restoreHandle = bb;
        // reflects the actual physically read byte by ReadFilter
        trackBytesRead = bb.position();


        grizzlyBuffer = bb;
        if (!isExpectingMoreData()) {
            grizzlyBuffer.position(0);
            resetState();
        }
        startBufferExecuted = true;
    }


    private void resetState() {
        state = State.START;
        dataLength = 0;
        messageLength = 0;
        messageStart = 0;
    }

     /**
     * Used with ProtocolParser to parse Messages and
     * fill an byteBuffer with the message data.
     */
    public abstract class MessageParser {

        private int dataSize;
        private ByteBuffer byteBuffer;

        public abstract boolean parseHeader(ByteBuffer bb) throws Exception;

        private byte[] bytes;



        final public ByteBuffer getByteBuffer() {
            return byteBuffer;
        }

        final public byte[] toByteArray() {
            if(hasException()) return new byte[0];
            return bytes;
        }


        final public void setByteBuffer(ByteBuffer byteBuffer) {
           // System.out.println("in s"+this.hashCode()+" "+(byteBuffer!=null));
            this.byteBuffer = byteBuffer;
        }

        public abstract int getHeaderLength();

        final public void setDataSize(int dataSize) {
            this.dataSize = dataSize;
        }

        final public int getDataSize() {
            return dataSize;
        }

        public byte[] getBytes() {
            return bytes;
        }

        public void setBytes(byte[] bytes) {
            this.bytes = bytes;
        }



        private Exception exception;

        final public Exception getException() {
            return exception;
        }

        final public void setException(Exception exception) {
            this.exception = exception;

        }

        final public boolean hasException() {
            return exception != null;
        }

        final public void unattach() {

            //System.out.println("in u"+this.hashCode()+" "+(byteBuffer!=null));
            if(hasException()) return;

            bytes = new byte[dataSize];
            byteBuffer.get(bytes);
            byteBuffer.clear();
            byteBuffer=null;

        }



    }
    public class MyIncomingMessage extends MessageParser {
    final static int HEADER_LENGTH=12;
    int START_MARK = 0x77335434;
    private boolean statusOk;



    public boolean parseHeader(ByteBuffer bb) throws Exception {


        int startmark = bb.getInt();
        int status = bb.getInt();

        int size = bb.getInt();
        if (startmark !=START_MARK) {
            return false;

        }
        statusOk =status== 1;

        setDataSize(size);

        if (getDataSize() <= 0) throw new Exception("Bad Message Length");
        return true;

    }



    public int getHeaderLength() {
        return HEADER_LENGTH;
    }


    public boolean isStatusOk() {
        return statusOk;
    }
}

}
-- 
Ist Ihr Browser Vista-kompatibel? Jetzt die neuesten 
Browser-Versionen downloaden: http://www.gmx.net/de/go/browser