Hi,
just found some bugs in my parser.
here is a better version:
package util;
import com.sun.grizzly.ProtocolParser;
import com.sun.grizzly.util.ByteBufferFactory;
import com.sun.grizzly.util.WorkerThread;
import java.nio.ByteBuffer;
/**
* ProtocolParser Filter which parses a Custom Protocol
* into a MessageParser. These IncomingMessages can then be used
* by other Protocolchain Filters
*
* So basicly implement abstract method
* "public boolean parseHeader(ByteBuffer bb)" of
* IncommingMessage and retrieve these message up by another above Filter in chain.
*
* The Messages can be arbitrary long.
* The Protocol has the following format
* (n params + Data Size + Data):
* n bytes : n number of header params
* ----------------------------------------------
* 4 bytes Data length - The length of Data
* n bytes Data - Message
*
* The Protocol asumes that on average Messages are not too large (Gigs) otherwise
* this implementation might not be effective.
* Tries to avoid bytearray copying and therfore maps the IncomingMessageeee onto
* the orginal Grizzly Bytebuffer. If Messages do not fit into an default sized Grizzly
* ByteBuffer a larger one with enough capacity is temporarly given to the framework.
*
* This class is still under construction and for example Error Handling,
* Timeouts, Recovery has to be greatly improved!!!
* John Vieten
*/
public abstract class MyProtocolParser implements ProtocolParser<MyProtocolParser.MessageParser> {
private boolean startBufferExecuted = false;
private boolean modeWithoutReadFilter;
protected MyProtocolParser(boolean modeWithoutReadFilter) {
this.state = modeWithoutReadFilter ? State.HAS_MORE_BYTES_TO_PARSE : State.START;
this.modeWithoutReadFilter = modeWithoutReadFilter;
}
public static final int DEFAULT_BUFFER_SIZE = 8192;
/**
* Handle to byteBuffer which gets filled in by grizzly
*/
private ByteBuffer grizzlyBuffer;
/**
* If Message are larger than DEFAULT_BUFFER_SIZE a new larger Grizzly ByteBuffer is created
* This handle is used to give Grizzly back its original default ByteBuffer .
*/
private ByteBuffer restoreHandle;
/**
* The length of the data depicted by Protocol Header
*/
private int dataLength = 0;
/**
* Just dataLength + Header Length
*/
private int messageLength = 0;
/*
Keeps track of the bytes read by the Grizzly ReadFilter
*/
private int trackBytesRead = 0;
/**
* Keeps track of the start position of the current message in the Grizzly ByteBuffer.
* Needed because an Grizzly ByteBuffer can contain several Messages at once
* The messageStart always reflects the start Boundary of the current Message
* And is either zero or set when a message was fully parsed and following one is still
* in the buffer
*/
int messageStart = 0;
/**
* Holds the state of this Statemachine
*/
private State state = null;
/**
* The Message given to other Filters up the chain
*/
private MessageParser messageParser;
public abstract MessageParser newInstance();
public static enum State {
START,
EXPECTING_MORE_HEADER_BYTES,
EXPECTING_MORE_DATA_BYTES,
HAS_MORE_BYTES_TO_PARSE,
MEASSAGE_PARSED,
MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE,
}
public boolean isExpectingMoreData() {
boolean result = state == State.EXPECTING_MORE_DATA_BYTES ||
state == State.EXPECTING_MORE_HEADER_BYTES;
return result;
}
public boolean hasMoreBytesToParse() {
return state == State.HAS_MORE_BYTES_TO_PARSE;
}
public MessageParser getNextMessage() {
switch (state) {
case MEASSAGE_PARSED:
resetState();
break;
case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
state = State.HAS_MORE_BYTES_TO_PARSE;
}
MessageParser tmp = messageParser;
messageParser.unattach();
messageParser = null;
return tmp;
}
public boolean hasNextMessage() {
if (!startBufferExecuted) {
startBuffer(((WorkerThread) Thread.currentThread()).getByteBuffer());
}
parseMessage();
switch (state) {
case MEASSAGE_PARSED:
case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
return true;
default:
return false;
}
}
public void parseMessage() {
switch (state) {
case MEASSAGE_PARSED:
case MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE:
return;
}
switch (state) {
case HAS_MORE_BYTES_TO_PARSE:
case START:
messageParser = newInstance();
case EXPECTING_MORE_HEADER_BYTES:
if ((trackBytesRead - messageStart) < messageParser.getHeaderLength()) {
state = State.EXPECTING_MORE_HEADER_BYTES;
grizzlyBuffer.position(messageStart);
// doesn't happen often so we can compact
grizzlyBuffer.compact();
trackBytesRead=trackBytesRead-messageStart;
messageStart=0;
grizzlyBuffer.position(trackBytesRead);
return;
}
grizzlyBuffer.position(messageStart);
try {
// expects a position messageStart
boolean isKMProtocol = messageParser.parseHeader(grizzlyBuffer);
if (!isKMProtocol) {
messageParser.setException(new Exception("Bad Startmark"));
state = State.MEASSAGE_PARSED;
return;
}
} catch (Exception e) {
// System.out.println(e.getMessage());
messageParser.setException(e);
state = State.MEASSAGE_PARSED;
return;
}
dataLength = messageParser.getDataSize();
messageLength = dataLength + MyIncomingMessage.HEADER_LENGTH;
// position should by at data start
if ((grizzlyBuffer.capacity() - grizzlyBuffer.position()) < dataLength) {
// Not enough room to store message so we need a bigger buffer and also some extra room for
// any new message chunks
// maybe I should do a compact to use a smaller buffer here but I am not sure about performance
int newCapacity = messageLength + grizzlyBuffer.position() + DEFAULT_BUFFER_SIZE;
grizzlyBuffer.position(0);
ByteBuffer newBuffer = ByteBufferFactory.allocateView(newCapacity, grizzlyBuffer.isDirect());
newBuffer.put(grizzlyBuffer);
WorkerThread workerThread = (WorkerThread) Thread.currentThread();
workerThread.setByteBuffer(grizzlyBuffer = newBuffer);
}
// Now create a sliced copy of GrizzlyBuffer so that
// we have start and end pointers on our wanted message
grizzlyBuffer.position(MyIncomingMessage.HEADER_LENGTH + messageStart);
grizzlyBuffer.limit(messageLength + messageStart);
ByteBuffer slicedHandle = grizzlyBuffer.slice();
grizzlyBuffer.limit(grizzlyBuffer.capacity());
// Now Buffer has enough space for message
messageParser.setByteBuffer(slicedHandle);
case EXPECTING_MORE_DATA_BYTES:
int remaining = trackBytesRead - messageLength - messageStart;
if (remaining == 0) {
// Ok we found a message
state = State.MEASSAGE_PARSED;
} else if (remaining > 0) {
state = State.MEASSAGE_PARSED_AND_HAS_MORE_BYTES_TO_PARSE;
messageStart = messageStart + messageLength;
} else {
// we have to keep on ready bytes from the net
state = State.EXPECTING_MORE_DATA_BYTES;
grizzlyBuffer.position(trackBytesRead);
}
}
}
public boolean releaseBuffer() {
boolean saveCurrentParserState = false;
if (isExpectingMoreData()) {
saveCurrentParserState = true;
} else {
if (restoreHandle == null) {
Exception e = new Exception();
e.printStackTrace();
return false;
}
WorkerThread workerThread = (WorkerThread) Thread.currentThread();
restoreHandle.clear();
workerThread.setByteBuffer(restoreHandle);
grizzlyBuffer = null;
restoreHandle = null;
}
return saveCurrentParserState;
}
public void startBuffer(ByteBuffer bb) {
restoreHandle = bb;
// reflects the actual physically read byte by ReadFilter
trackBytesRead = bb.position();
grizzlyBuffer = bb;
if (!isExpectingMoreData()) {
grizzlyBuffer.position(0);
resetState();
}
startBufferExecuted = true;
}
private void resetState() {
state = State.START;
dataLength = 0;
messageLength = 0;
messageStart = 0;
}
/**
* Used with ProtocolParser to parse Messages and
* fill an byteBuffer with the message data.
*/
public abstract class MessageParser {
private int dataSize;
private ByteBuffer byteBuffer;
public abstract boolean parseHeader(ByteBuffer bb) throws Exception;
private byte[] bytes;
final public ByteBuffer getByteBuffer() {
return byteBuffer;
}
final public byte[] toByteArray() {
if(hasException()) return new byte[0];
return bytes;
}
final public void setByteBuffer(ByteBuffer byteBuffer) {
// System.out.println("in s"+this.hashCode()+" "+(byteBuffer!=null));
this.byteBuffer = byteBuffer;
}
public abstract int getHeaderLength();
final public void setDataSize(int dataSize) {
this.dataSize = dataSize;
}
final public int getDataSize() {
return dataSize;
}
public byte[] getBytes() {
return bytes;
}
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
private Exception exception;
final public Exception getException() {
return exception;
}
final public void setException(Exception exception) {
this.exception = exception;
}
final public boolean hasException() {
return exception != null;
}
final public void unattach() {
//System.out.println("in u"+this.hashCode()+" "+(byteBuffer!=null));
if(hasException()) return;
bytes = new byte[dataSize];
byteBuffer.get(bytes);
byteBuffer.clear();
byteBuffer=null;
}
}
public class MyIncomingMessage extends MessageParser {
final static int HEADER_LENGTH=12;
int START_MARK = 0x77335434;
private boolean statusOk;
public boolean parseHeader(ByteBuffer bb) throws Exception {
int startmark = bb.getInt();
int status = bb.getInt();
int size = bb.getInt();
if (startmark !=START_MARK) {
return false;
}
statusOk =status== 1;
setDataSize(size);
if (getDataSize() <= 0) throw new Exception("Bad Message Length");
return true;
}
public int getHeaderLength() {
return HEADER_LENGTH;
}
public boolean isStatusOk() {
return statusOk;
}
}
}
--
Ist Ihr Browser Vista-kompatibel? Jetzt die neuesten
Browser-Versionen downloaden: http://www.gmx.net/de/go/browser