Salut,
Parker Lord wrote:
> The full set of files for my test case is included in the post.
>
> The last file is the client to invloke the server. The first set of files
> make up the server.
> Let me know if you need me to sip them up and attach them.
My bad I've missed the client. But if you can ZIp that will be perfect.
Thanks!
-- Jeanfrancois
>
>
>
> Jeanfrancois Arcand-2 wrote:
>> Salut,
>>
>> Parker Lord wrote:
>>> I have been struggling with an issue for a few days now and am at a loss
>>> as to what is wrong. I am trying to build a ProtocolParser to parse XML
>>> messages which could be larger than 8K. I have read all the relevant
>>> postings on the forum here and have tried two different approaches, both
>>> of which exhibit the same behavior. I upgrade to 1.9.15 today as well,
>>> and still no resolution. My issue is that everything works when I run in
>>> the debugger and have a breakpoint set in the code. If I disable
>>> breakpoints, I get this error:
>>>
>>> May 6, 2009 2:39:29 PM atlantes.connector.test.SampleProtocolParser
>>> hasNextMessage
>>> INFO: GrizzlyWorker-3 before expand: buffer java.nio.HeapByteBuffer[pos=0
>>> lim=8192 cap=8192] isExpectingMoreData false hasMoreBytesToParse false
>>> May 6, 2009 2:39:29 PM atlantes.connector.test.SampleProtocolParser
>>> hasNextMessage
>>> INFO: GrizzlyWorker-3 after expand: buffer
>>> java.nio.HeapByteBuffer[pos=8192 lim=16384 cap=16384] isExpectingMoreData
>>> true hasMoreBytesToParse true
>>> May 6, 2009 2:39:29 PM com.sun.grizzly.DefaultProtocolChain
>>> executeProtocolFilter
>>> SEVERE: ProtocolChain exception
>>> java.lang.IllegalStateException: ByteBuffer is full:
>>> java.nio.HeapByteBuffer[pos=0 lim=0 cap=16384]
>>> at com.sun.grizzly.filter.ReadFilter.execute(ReadFilter.java:143)
>>> at com.sun.grizzly.filter.ReadFilter.execute(ReadFilter.java:103)
>>> at
>>> com.sun.grizzly.filter.ParserProtocolFilter.superExecute(ParserProtocolFilter.java:207)
>>> at
>>> com.sun.grizzly.filter.ParserProtocolFilter.execute(ParserProtocolFilter.java:131)
>>> at
>>> com.sun.grizzly.DefaultProtocolChain.executeProtocolFilter(DefaultProtocolChain.java:136)
>>> at
>>> com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:103)
>>> at
>>> com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:89)
>>> at
>>> com.sun.grizzly.ProtocolChainContextTask.doCall(ProtocolChainContextTask.java:53)
>>> at
>>> com.sun.grizzly.SelectionKeyContextTask.call(SelectionKeyContextTask.java:57)
>>> at com.sun.grizzly.ContextTask.run(ContextTask.java:69)
>>> at
>>> com.sun.grizzly.util.FixedThreadPool$BasicWorker.dowork(FixedThreadPool.java:335)
>>> at
>>> com.sun.grizzly.util.FixedThreadPool$BasicWorker.run(FixedThreadPool.java:320)
>>> at java.lang.Thread.run(Thread.java:595)
>>>
>>>
>>> When I have a breakpoint set (or in a few cases when it works), the
>>> output looks like this:
>>>
>>> May 6, 2009 2:38:41 PM atlantes.connector.test.SampleProtocolParser
>>> hasNextMessage
>>> INFO: GrizzlyWorker-2 before expand: buffer java.nio.HeapByteBuffer[pos=0
>>> lim=8192 cap=8192] isExpectingMoreData false hasMoreBytesToParse false
>>> May 6, 2009 2:38:41 PM atlantes.connector.test.SampleProtocolParser
>>> hasNextMessage
>>> INFO: GrizzlyWorker-2 after expand: buffer
>>> java.nio.HeapByteBuffer[pos=8192 lim=16384 cap=16384] isExpectingMoreData
>>> true hasMoreBytesToParse true
>>> May 6, 2009 2:38:42 PM atlantes.connector.test.SampleProtocolParser
>>> extractMessage
>>> INFO: GrizzlyWorker-2 before extract: buffer
>>> java.nio.HeapByteBuffer[pos=0 lim=10265 cap=16384] isExpectingMoreData
>>> true hasMoreBytesToParse true
>>> May 6, 2009 2:38:42 PM atlantes.connector.test.SampleProtocolParser
>>> extractMessage
>>> INFO: GrizzlyWorker-2 after extract: buffer
>>> java.nio.HeapByteBuffer[pos=10265 lim=10265 cap=16384]
>>> isExpectingMoreData true hasMoreBytesToParse true
>>>
>>> It seems that when the call to hasNextMessage() returns with a larger
>>> buffer having been set in the currentThread, when it next retrieves it
>>> from the thread and checks it, it seems to now have become full!!!!!
>>> Some sort of race condition or thread switching issue is going on, but I
>>> cant test since by debugging the code, it works.
>> Can you share a test case?
>>
>>
>> Another problem I
>>> noticed and I am not sure if it is related. In the examples I have seen
>>> here, when the buffer is reallocated, hasMoreBytesToParse is set to
>>> false. If I set this to false, hasNextMessage does not get called back.
>>> You can see this clearly in ParserProtocolFilter postExecute() method:
>>>
>>> public boolean postExecute(Context context) throws IOException {
>>> ProtocolParser parser = (ProtocolParser)
>>> context.getAttribute(ProtocolParser.PARSER);
>>>
>>> if (parser == null) {
>>> return true;
>>> }
>>>
>>> if (parser.hasMoreBytesToParse()) {
>>> // Need to say that we read successfully since bytes are left
>>>
>>> context.setAttribute(ProtocolChain.PROTOCOL_CHAIN_POST_INSTRUCTION,
>>> ProtocolChainInstruction.REINVOKE);
>>> return true;
>>> }
>>>
>>> and then in DefaultProtocolChain postExecuteProtocolFilter() :
>>>
>>> ProtocolChainInstruction postInstruction =
>>> (ProtocolChainInstruction) ctx.removeAttribute(
>>> PROTOCOL_CHAIN_POST_INSTRUCTION);
>>>
>>> if (postInstruction != null &&
>>> postInstruction == ProtocolChainInstruction.REINVOKE) {
>>> reinvokeChain = true;
>>> } else if (continousExecution
>>> && currentPosition == protocolFilters.size() -1
>>> &&
>>> (Boolean)ctx.removeAttribute(ProtocolFilter.SUCCESSFUL_READ)
>>> == Boolean.TRUE) {
>>> reinvokeChain = true;
>>> }
>>>
>>> return reinvokeChain;
>>>
>>>
>>> The only chance to get reinvoked is to have hasMoreBytesToParse be set
>>> to true, not false. Any help would be appreciated. Here is all the
>>> source code:
>> Great I will take a look. How do you stress the server?
>>
>> Thanks!
>>
>> -- Jeanfrancois
>>
>>
>>> /* Main Class */
>>> package atlantes.connector.test;
>>>
>>> public class SampleMain
>>> {
>>> public enum ConnectorProtocol { TCP, UDP };
>>> public static int defaultPort = 50100;
>>> public static ConnectorProtocol defaultProtocol =
>>> ConnectorProtocol.TCP;
>>>
>>> public static void main (String[] args) throws Exception
>>> {
>>> ConnectorProtocol protocol;
>>> int port;
>>>
>>> protocol = defaultProtocol;
>>> port = defaultPort;
>>>
>>> new SampleListener ().service(protocol, port);
>>> }
>>> }
>>>
>>> /* Sample Listener */
>>> package atlantes.connector.test;
>>>
>>> import java.io.IOException;
>>> import java.util.concurrent.ExecutorService;
>>>
>>> import atlantes.connector.test.SampleMain.ConnectorProtocol;
>>>
>>> import com.sun.grizzly.*;
>>> import com.sun.grizzly.util.PipelineThreadPool;
>>>
>>> public class SampleListener
>>> {
>>> public void service(ConnectorProtocol protocol, int port)
>>> {
>>> final Controller controller = new Controller();
>>>
>>> final SampleProtocolFilter parser = new SampleProtocolFilter();
>>> final SampleMessageFilter filter = new SampleMessageFilter();
>>>
>>> final ProtocolChain protocolChain = new DefaultProtocolChain();
>>> protocolChain.addFilter(parser);
>>> protocolChain.addFilter(filter);
>>> ((DefaultProtocolChain)protocolChain).setContinuousExecution(true);
>>>
>>> ProtocolChainInstanceHandler pciHandler = new
>>> DefaultProtocolChainInstanceHandler()
>>> {
>>> public ProtocolChain poll()
>>> {
>>> return protocolChain;
>>> }
>>>
>>> public boolean offer(ProtocolChain protocolChain)
>>> {
>>> return false;
>>> }
>>> };
>>>
>>> controller.setProtocolChainInstanceHandler(pciHandler);
>>>
>>> if (protocol.equals(ConnectorProtocol.TCP))
>>> {
>>> TCPSelectorHandler tcpHandler = new TCPSelectorHandler();
>>> tcpHandler.setPort(port);
>>> controller.addSelectorHandler(tcpHandler);
>>> }
>>> else if (protocol.equals(ConnectorProtocol.UDP))
>>> {
>>> UDPSelectorHandler udpHandler = new UDPSelectorHandler();
>>> udpHandler.setPort(port);
>>> controller.addSelectorHandler(udpHandler);
>>> }
>>>
>>> ExecutorService threadPool = new PipelineThreadPool();
>>> controller.setThreadPool(threadPool);
>>>
>>> try
>>> {
>>> controller.start();
>>> }
>>> catch (IOException e)
>>> {
>>> e.printStackTrace();
>>> }
>>> }
>>> }
>>>
>>> /* SampleProtocolFilter */
>>>
>>> package atlantes.connector.test;
>>>
>>>
>>> import com.sun.grizzly.ProtocolParser;
>>> import com.sun.grizzly.filter.ParserProtocolFilter;
>>>
>>> public class SampleProtocolFilter extends ParserProtocolFilter {
>>>
>>> @SuppressWarnings("unchecked")
>>> @Override
>>> public ProtocolParser newProtocolParser() {
>>> //return new SolidicaProtocolParser();
>>> return new SampleProtocolParser();
>>>
>>> }
>>> }
>>>
>>> /* SampleProtocolParser */
>>> package atlantes.connector.test;
>>>
>>> import com.sun.grizzly.ProtocolParser;
>>> import com.sun.grizzly.util.ByteBufferFactory;
>>> import com.sun.grizzly.util.WorkerThread;
>>> import java.nio.ByteBuffer;
>>> import java.nio.charset.CharacterCodingException;
>>> import java.nio.charset.Charset;
>>> import java.nio.charset.CharsetDecoder;
>>> import java.util.logging.Logger;
>>>
>>> public class SampleProtocolParser implements ProtocolParser
>>> {
>>> protected CharsetDecoder f_asciiDecoder =
>>> Charset.forName("ISO-8859-1").newDecoder();
>>>
>>> Logger syslog = Logger.getLogger(SampleProtocolParser.class.getName());
>>>
>>> private ByteBuffer byteBuffer;
>>> private boolean isExpectingMoreData, hasMoreBytesToParse, doCompact =
>>> true;
>>> private String completeMessage;
>>> private boolean logIt = true;
>>>
>>> public SampleProtocolParser()
>>> {};
>>>
>>> public boolean isExpectingMoreData()
>>> {
>>> return isExpectingMoreData;
>>> }
>>> public boolean hasMoreBytesToParse()
>>> {
>>> return hasMoreBytesToParse;
>>> }
>>>
>>> public String getNextMessage()
>>> {
>>> String ip = completeMessage;
>>> completeMessage = null;
>>> return ip;
>>> }
>>>
>>> public boolean hasNextMessage()
>>> {
>>> byteBuffer.mark();
>>>
>>> completeMessage = extractMessage(byteBuffer);
>>>
>>> if (completeMessage != null)
>>> {
>>> isExpectingMoreData = false;
>>> // releaseBuffer() is only called with hasMoreBytesToParse == false
>>> // which means compact() would never be called!
>>> hasMoreBytesToParse = byteBuffer.hasRemaining();
>>> if (hasMoreBytesToParse)
>>> {
>>> doCompact = true;
>>> }
>>> else
>>> {
>>> byteBuffer.clear();
>>> doCompact = false;
>>> }
>>>
>>> return true;
>>> }
>>> else
>>> {
>>> if (logIt)
>>> syslog.info(((WorkerThread)Thread.currentThread()).getName() + "
>>> before expand: buffer " + byteBuffer.toString() + " isExpectingMoreData "
>>> + isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
>>>
>>> ByteBuffer newBuffer =
>>> ByteBufferFactory.allocateView(byteBuffer.capacity() * 2, false);
>>> byteBuffer.reset();
>>> newBuffer.put(byteBuffer);
>>> ((WorkerThread)Thread.currentThread()).setByteBuffer(byteBuffer =
>>> newBuffer);
>>>
>>> // If hasMoreBytesToParse is false, we do not get re-invoked!!!
>>> //hasMoreBytesToParse = false;
>>> hasMoreBytesToParse = true;
>>> isExpectingMoreData = true;
>>> doCompact = false;
>>>
>>> syslog.info(((WorkerThread)Thread.currentThread()).getName() + "
>>> after expand: buffer " + byteBuffer.toString() + " isExpectingMoreData "
>>> + isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
>>>
>>> }
>>>
>>> return !isExpectingMoreData;
>>> } // hasNextMessage
>>>
>>> public void startBuffer(ByteBuffer bb)
>>> {
>>> byteBuffer = bb;
>>> byteBuffer.flip();
>>> } // startBuffer
>>>
>>> public boolean releaseBuffer()
>>> {
>>> if (doCompact)
>>> {
>>> byteBuffer.compact();
>>> }
>>> byteBuffer = null;
>>>
>>> return false;
>>> } // releaseBuffer
>>>
>>> private String extractMessage(ByteBuffer buffer)
>>> {
>>> final String startOfMessage = "Once upon a time";
>>> final String endOfMessage = "The End.";
>>> // decode the buffer
>>> // Make a copy and convert it to a string to perform our check to see
>>> if there is a message present
>>> String msg = null;
>>> String returnMsg = null;
>>> try
>>> {
>>> ByteBuffer tmp = buffer.duplicate();
>>> msg = f_asciiDecoder.decode(tmp).toString();
>>> }
>>> catch (CharacterCodingException e)
>>> {
>>> e.printStackTrace();
>>> }
>>>
>>> // Check to see if we have a message terminator
>>> int startIndex = msg.indexOf(startOfMessage);
>>> int endIndex = msg.indexOf(endOfMessage);
>>> if (startIndex > -1 && endIndex > -1) // msg found
>>> {
>>> if (logIt)
>>> syslog.info(((WorkerThread)Thread.currentThread()).getName() + "
>>> before extract: buffer " + buffer.toString() + " isExpectingMoreData " +
>>> isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
>>>
>>> // Extract the bytes from the buffer
>>> endIndex += endOfMessage.length();
>>> byte[] bytes = new byte[endIndex - startIndex];
>>> buffer.get(bytes, startIndex, endIndex);
>>>
>>> returnMsg = new String(bytes);
>>>
>>> //buffer.compact();
>>>
>>> if (logIt)
>>> syslog.info(((WorkerThread)Thread.currentThread()).getName() + "
>>> after extract: buffer " + buffer.toString() + " isExpectingMoreData " +
>>> isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
>>> }
>>>
>>> return returnMsg;
>>> }
>>> }
>>>
>>> /* SampleMessageFilter */
>>> package atlantes.connector.test;
>>>
>>> import java.io.IOException;
>>>
>>> import com.sun.grizzly.Context;
>>> import com.sun.grizzly.ProtocolFilter;
>>> import com.sun.grizzly.ProtocolParser;
>>>
>>> public class SampleMessageFilter implements ProtocolFilter
>>> {
>>> public boolean execute(Context context)
>>> {
>>> String message = (String)
>>> context.removeAttribute(ProtocolParser.MESSAGE);
>>>
>>> System.out.println(message);
>>>
>>> return false;
>>> }
>>>
>>> public boolean postExecute(Context context) throws IOException
>>> {
>>> return true;
>>> }
>>> }
>>>
>>> /* ClientSampleMain */
>>> package atlantes.connector.test;
>>>
>>> import java.io.*;
>>> import java.net.*;
>>>
>>> public class ClientSampleMain
>>> {
>>>
>>> public static void main(String[] args) throws IOException
>>> {
>>>
>>> int listenPort = atlantes.connector.test.SampleMain.defaultPort;
>>>
>>> StringBuffer sb = new StringBuffer();
>>>
>>> // Create the message which is larger than 8K
>>>
>>> sb.append("Once upon a time,");
>>>
>>> for (int i = 0; i < 1024; ++i)
>>> {
>>> sb.append("0123456789");
>>> }
>>>
>>> sb.append("The End.");
>>>
>>> // Open the file and parse each line
>>> // Skip line starting with a # or empty lines
>>> try
>>> {
>>> Socket socket = new Socket("localhost", listenPort);
>>> if (socket == null)
>>> {
>>> System.out.println("Unable to open socket");
>>> return;
>>> }
>>>
>>> PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
>>> out.print(sb.toString());
>>>
>>> out.close();
>>> socket.close();
>>> socket = null;
>>> }
>>> catch (IllegalArgumentException ex)
>>> {
>>> ex.printStackTrace();
>>> }
>>>
>>> return;
>>> }
>>> }
>>>
>>>
>>>
>>> ------------------------------------------------------------------------
>>> View this message in context: ProtocolParser Issue with Reallocation of
>>> ByteBuffer
>>> <http://www.nabble.com/ProtocolParser-Issue-with-Reallocation-of-ByteBuffer-tp23416360p23416360.html>
>>> Sent from the Grizzly - Development mailing list archive
>>> <http://www.nabble.com/Grizzly---Development-f23248.html> at Nabble.com.
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>
>>
>>
>