I put a rewind in there. Was that not enough?
Sent from my iPhone
On Jun 25, 2011, at 2:54 AM, Oleksiy Stashok <oleksiy.stashok_at_oracle.com> wrote:
> Hi Jon,
>
> Have you tried to fix the RpbFilter decoding (see my prev. email)?
>
> WBR,
> Alexey.
>
> On 06/25/2011 01:27 AM, Jon Brisbin wrote:
>>
>> I updated the project to make the tests hang. To successfully execute them, you'd need a Riak installation locally on the default port.
>>
>> It will hang on the future.get() call on line 122 because the responses are getting out-of-sync with the requests.
>>
>> https://github.com/jbrisbin/riak-async-java-client/blob/master/src/test/groovy/com/jbrisbin/riak/async/RiakAsyncClientSpec.groovy
>>
>> Is there anything I should be doing on my end to do some request/response accounting? I can't tell whether the problem is in the client or in the server-side implementation of protobuf...
>>
>> Thanks for the help! :)
>>
>>
>> Jon Brisbin
>> http//jbrisbin.com
>>
>>
>> From: "Oleksiy Stashok" <oleksiy.stashok_at_oracle.com>
>> To: users_at_grizzly.java.net
>> Sent: Friday, June 24, 2011 4:38:54 AM
>> Subject: Re: Do I need connection pooling? Or am I just doing it wrong?
>>
>> Hi Jon,
>>
>> it would help if you can commit the test, which doesn't work :)
>>
>> Otherwise code looks fine, except here in RpbFilter
>>
>> @Override public NextAction handleRead(FilterChainContext ctx) throws IOException {
>> Buffer buffer = ctx.getMessage();
>> int size = buffer.getInt();
>> int code = buffer.get();
>> if (buffer.remaining() < (size - 1)) {
>> return ctx.getStopAction(buffer);
>> }
>>
>> You'd probably need to restore original buffer position, before returning ctx.getStopAction(buffer), otherwise you save a Buffer with a shifted position, so next time when you read size and code it will be corrupted.
>>
>> WBR,
>> Alexey.
>>
>> On 06/23/2011 09:55 PM, Jon Brisbin wrote:
>> I'm trying to talk protocol buffers in a custom client I'm writing and I'm running into some kind of deadlock issue, I think. I could also be exhausting the thread pool unawares (I really want to queue this stuff somewhere...it's just not obvious to me how to do that). I'm able to write and read from the server if I block until the response is fully processed (request written and response processed). But if I try and write multiple requests on the same connection, something gets deadlocked somewhere and I have no idea where.
>>
>> I'm setting up my transport like this (workerPool is a fixed thread pool at availableProcessors() * 4):
>>
>> transport.setKeepAlive(true);
>> transport.setTcpNoDelay(true);
>> transport.setWorkerThreadPool(workerPool);
>> transport.setIOStrategy(WorkerThreadIOStrategy.getInstance());
>>
>> My filters are:
>>
>> clientChainBuilder.add(new TransportFilter());
>> clientChainBuilder.add(new RpbFilter(heap));
>> clientChainBuilder.add(new PendingRequestFilter());
>>
>> RpbFilter is a protocol buffers encoder/decoder. The PendingRequestFilter is part of my client and is supposed to call the "saved" Future that was created for this request to pass back the value. Here's an excerpt:
>>
>> @Override public NextAction handleWrite(FilterChainContext ctx) throws IOException {
>> RpbRequest pending = ctx.getMessage();
>> pendingRequests.add(pending);
>> ctx.setMessage(pending.getMessage());
>> return ctx.getInvokeAction();
>> }
>>
>> When a response comes back (which is, I think, where I'm getting messed up), I handle it like this (again, excerpted):
>>
>> @Override public NextAction handleRead(final FilterChainContext ctx) throws IOException {
>> final RpbMessage<?> msg = ctx.getMessage();
>> final RpbRequest pending = pendingRequests.peek();
>> if (null == pendingRequests.peek()) {
>> log.warn("Got a response when no pending requests available!");
>> return ctx.getStopAction();
>> } else {
>> pendingRequests.remove(pending);
>> }
>> ...process the protobuf object and create an application-specific one to pass to the client requestor...
>> if (null != response && null != pending.getCallback())
>> pending.getCallback().completed(response);
>>
>> The project is here (tests pass now because I'm blocking):
>>
>> https://github.com/jbrisbin/riak-async-java-client
>>
>> Any help you could give me here would be greatly appreciated! :)
>>
>> Thanks!
>>
>> Jon Brisbin
>> http//jbrisbin.com
>>
>>
>>
>>
>