users@grizzly.java.net

Re: Do I need connection pooling? Or am I just doing it wrong?

From: Jon Brisbin <jon_at_jbrisbin.com>
Date: Fri, 24 Jun 2011 18:27:07 -0500 (CDT)

I updated the project to make the tests hang. To successfully execute them, you'd need a Riak installation locally on the default port.

It will hang on the future.get() call on line 122 because the responses are getting out-of-sync with the requests.

https://github.com/jbrisbin/riak-async-java-client/blob/master/src/test/groovy/com/jbrisbin/riak/async/RiakAsyncClientSpec.groovy

Is there anything I should be doing on my end to do some request/response accounting? I can't tell whether the problem is in the client or in the server-side implementation of protobuf...

Thanks for the help! :)

Jon Brisbin
http//jbrisbin.com

----- Original Message -----

> From: "Oleksiy Stashok" <oleksiy.stashok_at_oracle.com>
> To: users_at_grizzly.java.net
> Sent: Friday, June 24, 2011 4:38:54 AM
> Subject: Re: Do I need connection pooling? Or am I just doing it
> wrong?

> Hi Jon,

> it would help if you can commit the test, which doesn't work :)

> Otherwise code looks fine, except here in RpbFilter

> @Override public NextAction handleRead ( FilterChainContext ctx )
> throws IOException {
> Buffer buffer = ctx . getMessage ();
> int size = buffer . getInt ();
> int code = buffer . get ();
> if ( buffer . remaining () < ( size - 1 )) {
> return ctx . getStopAction ( buffer );
> }
> You'd probably need to restore original buffer position, before
> returning ctx.getStopAction(buffer), otherwise you save a Buffer
> with a shifted position, so next time when you read size and code it
> will be corrupted.

> WBR,
> Alexey.

> On 06/23/2011 09:55 PM, Jon Brisbin wrote:
> > I'm trying to talk protocol buffers in a custom client I'm writing
> > and I'm running into some kind of deadlock issue, I think. I could
> > also be exhausting the thread pool unawares (I really want to queue
> > this stuff somewhere...it's just not obvious to me how to do that).
> > I'm able to write and read from the server if I block until the
> > response is fully processed (request written and response
> > processed). But if I try and write multiple requests on the same
> > connection, something gets deadlocked somewhere and I have no idea
> > where.
>

> > I'm setting up my transport like this (workerPool is a fixed thread
> > pool at availableProcessors() * 4):
>

> > transport.setKeepAlive(true);
>
> > transport.setTcpNoDelay(true);
>
> > transport.setWorkerThreadPool(workerPool);
>
> > transport.setIOStrategy(WorkerThreadIOStrategy.getInstance());
>

> > My filters are:
>

> > clientChainBuilder.add(new TransportFilter());
>
> > clientChainBuilder.add(new RpbFilter(heap));
>
> > clientChainBuilder.add(new PendingRequestFilter());
>

> > RpbFilter is a protocol buffers encoder/decoder. The
> > PendingRequestFilter is part of my client and is supposed to call
> > the "saved" Future that was created for this request to pass back
> > the value. Here's an excerpt:
>

> > @Override public NextAction handleWrite(FilterChainContext ctx)
> > throws IOException {
>
> > RpbRequest pending = ctx.getMessage();
>
> > pendingRequests.add(pending);
>
> > ctx.setMessage(pending.getMessage());
>
> > return ctx.getInvokeAction();
>
> > }
>

> > When a response comes back (which is, I think, where I'm getting
> > messed up), I handle it like this (again, excerpted):
>

> > @Override public NextAction handleRead(final FilterChainContext
> > ctx)
> > throws IOException {
>
> > final RpbMessage<?> msg = ctx.getMessage();
>
> > final RpbRequest pending = pendingRequests.peek();
>
> > if (null == pendingRequests.peek()) {
>
> > log.warn("Got a response when no pending requests available!");
>
> > return ctx.getStopAction();
>
> > } else {
>
> > pendingRequests.remove(pending);
>
> > }
>
> > ...process the protobuf object and create an application-specific
> > one
> > to pass to the client requestor...
>
> > if (null != response && null != pending.getCallback())
>
> > pending.getCallback().completed(response);
>

> > The project is here (tests pass now because I'm blocking):
>

> > https://github.com/jbrisbin/riak-async-java-client
>

> > Any help you could give me here would be greatly appreciated! :)
>

> > Thanks!
>

> > Jon Brisbin
>
> > http//jbrisbin.com
>