Hi Jon,
it would help if you could commit the test that doesn't work :)
Otherwise the code looks fine, except here in RpbFilter:
    @Override
    public NextAction handleRead(FilterChainContext ctx) throws IOException {
        Buffer buffer = ctx.getMessage();
        int size = buffer.getInt();
        int code = buffer.get();
        if (buffer.remaining() < (size - 1)) {
            return ctx.getStopAction(buffer);
        }
You'd probably need to restore the original buffer position before
returning ctx.getStopAction(buffer). Otherwise you save a Buffer with a
shifted position, so the next time you read size and code, the values
will be corrupted.
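To illustrate the idea, here is a minimal standalone sketch. It uses java.nio.ByteBuffer as a stand-in for Grizzly's Buffer, on the assumption that Buffer offers equivalent position accessors. The class and method names (FrameReader, tryReadFrame) are made up for the example and are not part of your code or of Grizzly. It also checks that the 5-byte header is complete before reading it, which the excerpt above does not do.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Grizzly or the client under discussion.
class FrameReader {
    // 4-byte length followed by a 1-byte message code.
    static final int HEADER_SIZE = 5;

    /**
     * Returns the payload of one complete frame, or null if more bytes
     * are needed. When null is returned, the buffer position is unchanged.
     */
    static byte[] tryReadFrame(ByteBuffer buffer) {
        if (buffer.remaining() < HEADER_SIZE) {
            return null;                 // header incomplete, nothing consumed
        }
        int start = buffer.position();   // remember where this frame begins
        int size = buffer.getInt();      // length covers the code byte plus the payload
        byte code = buffer.get();        // message code, unused in this sketch
        if (size < 1) {
            throw new IllegalArgumentException("invalid frame size: " + size);
        }
        if (buffer.remaining() < size - 1) {
            buffer.position(start);      // rewind so the next read sees the header again
            return null;
        }
        byte[] payload = new byte[size - 1];
        buffer.get(payload);
        return payload;
    }
}
```

In the filter, the rewind would go just before returning ctx.getStopAction(buffer), so the saved remainder still starts at the size field.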
WBR,
Alexey.
On 06/23/2011 09:55 PM, Jon Brisbin wrote:
> I'm trying to talk protocol buffers in a custom client I'm writing and
> I'm running into some kind of deadlock issue, I think. I could also be
> exhausting the thread pool unawares (I really want to queue this stuff
> somewhere...it's just not obvious to me how to do that). I'm able to
> write and read from the server if I block until the response is fully
> processed (request written and response processed). But if I try and
> write multiple requests on the same connection, something gets
> deadlocked somewhere and I have no idea where.
>
> I'm setting up my transport like this (workerPool is a fixed thread
> pool at availableProcessors() * 4):
>
> transport.setKeepAlive(true);
> transport.setTcpNoDelay(true);
> transport.setWorkerThreadPool(workerPool);
> transport.setIOStrategy(WorkerThreadIOStrategy.getInstance());
>
> My filters are:
>
> clientChainBuilder.add(new TransportFilter());
> clientChainBuilder.add(new RpbFilter(heap));
> clientChainBuilder.add(new PendingRequestFilter());
>
> RpbFilter is a protocol buffers encoder/decoder. The
> PendingRequestFilter is part of my client and is supposed to call the
> "saved" Future that was created for this request to pass back the
> value. Here's an excerpt:
>
>     @Override
>     public NextAction handleWrite(FilterChainContext ctx) throws IOException {
>         RpbRequest pending = ctx.getMessage();
>         pendingRequests.add(pending);
>         ctx.setMessage(pending.getMessage());
>         return ctx.getInvokeAction();
>     }
>
> When a response comes back (which is, I think, where I'm getting
> messed up), I handle it like this (again, excerpted):
>
>     @Override
>     public NextAction handleRead(final FilterChainContext ctx) throws IOException {
>         final RpbMessage<?> msg = ctx.getMessage();
>         final RpbRequest pending = pendingRequests.peek();
>         if (null == pendingRequests.peek()) {
>             log.warn("Got a response when no pending requests available!");
>             return ctx.getStopAction();
>         } else {
>             pendingRequests.remove(pending);
>         }
>         ...process the protobuf object and create an application-specific one
>         to pass to the client requestor...
>         if (null != response && null != pending.getCallback())
>             pending.getCallback().completed(response);
>
> The project is here (tests pass now because I'm blocking):
>
> https://github.com/jbrisbin/riak-async-java-client
>
> Any help you could give me here would be greatly appreciated! :)
>
> Thanks!
>
> Jon Brisbin
> http://jbrisbin.com
>
>