users@grizzly.java.net

Re: Do I need connection pooling? Or am I just doing it wrong?

From: Jon Brisbin <jon_at_jbrisbin.com>
Date: Mon, 27 Jun 2011 12:39:29 -0500 (CDT)

Ahh...yes. I'm actually using the snapshot version of the Riak Java driver. You'll have to check that out separately and make sure it's installed in your local Maven repo:

https://github.com/basho/riak-java-client/

The tests should fail on the last one (the throughput test).

I was troubleshooting it a little yesterday, and it seems I'm not getting all the responses I expect. I tried counting requests and responses, and the totals don't match.
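
One way to do that kind of request/response accounting, independent of Grizzly, is a pair of counters plus a FIFO queue of pending futures. This is only a sketch under assumptions: RequestAccounting and its methods are hypothetical names, it assumes the server answers requests on a connection in the order they were written, and CompletableFuture requires Java 8 or later:

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper for illustration; not part of Grizzly or the Riak client. */
final class RequestAccounting {
    private final AtomicLong sent = new AtomicLong();
    private final AtomicLong received = new AtomicLong();
    private final AtomicLong unmatched = new AtomicLong();
    // Futures in the order their requests were written (FIFO assumption).
    private final Queue<CompletableFuture<Object>> pending = new ConcurrentLinkedQueue<>();

    /** Call just before a request is written to the connection. */
    CompletableFuture<Object> onRequest() {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.add(future);
        sent.incrementAndGet();
        return future;
    }

    /** Call once for every fully decoded response. */
    void onResponse(Object response) {
        received.incrementAndGet();
        CompletableFuture<Object> future = pending.poll();
        if (future == null) {
            // A response arrived with no request waiting for it.
            unmatched.incrementAndGet();
            return;
        }
        future.complete(response);
    }

    /** Requests written minus responses decoded; negative means extra responses. */
    long outstanding() {
        return sent.get() - received.get();
    }

    /** Number of responses that had no pending request. */
    long unmatched() {
        return unmatched.get();
    }
}
```

Calling onRequest() from the write path and onResponse() from the read path, then logging outstanding() and unmatched() at the end of the test, should show whether responses are being lost or merely mismatched. If several threads write to the same connection, the enqueue and the write need to happen atomically together, or the queue order can drift from the wire order.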

You'll also want to do a fresh git pull of my source to get the latest changes.

Thanks!

Jon Brisbin
http://jbrisbin.com

----- Original Message -----

> From: "Oleksiy Stashok" <oleksiy.stashok_at_oracle.com>
> To: users_at_grizzly.java.net
> Sent: Monday, June 27, 2011 11:48:15 AM
> Subject: Re: Do I need connection pooling? Or am I just doing it wrong?

> On 06/25/2011 02:41 PM, Jon Brisbin wrote:
> > I put a rewind in there. Was that not enough?
>
> That should be fine.
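
For reference, the buffer-position issue raised earlier in this thread can be sketched outside Grizzly. The following is a minimal sketch, assuming the framing implied by the quoted RpbFilter code (a 4-byte length that covers the message code and payload, followed by a 1-byte code). It uses java.nio.ByteBuffer rather than Grizzly's Buffer, and FrameDecoder, Frame, and tryDecode are hypothetical names for illustration, not part of either library:

```java
import java.nio.ByteBuffer;

/** Hypothetical length-prefixed frame decoder; illustration only. */
final class FrameDecoder {

    /** One decoded frame: message code plus payload bytes. */
    static final class Frame {
        final int code;
        final byte[] payload;

        Frame(int code, byte[] payload) {
            this.code = code;
            this.payload = payload;
        }
    }

    /**
     * Tries to decode one frame. Returns null and leaves the buffer
     * position untouched when the frame is not yet complete, so the
     * same bytes can be re-read once more data has arrived.
     */
    static Frame tryDecode(ByteBuffer buf) {
        int start = buf.position();
        if (buf.remaining() < 5) {
            return null; // header incomplete; nothing consumed yet
        }
        int size = buf.getInt();      // length of code + payload
        if (size < 1) {
            throw new IllegalStateException("invalid frame size: " + size);
        }
        int code = buf.get() & 0xFF;  // 1-byte message code
        if (buf.remaining() < size - 1) {
            buf.position(start);      // undo the header reads before waiting
            return null;
        }
        byte[] payload = new byte[size - 1];
        buf.get(payload);
        return new Frame(code, payload);
    }
}
```

The point of the sketch is the buf.position(start) call: without it, the next decode attempt would start in the middle of the payload and read garbage as the size and code.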

> Jon, can I ask you to provide some build instructions for the async
> client?

> I'm trying to build the project using
> ./gradlew build

> but I see a lot of missing-package errors (see the example below). Probably
> some of them are protobuf classes, but I'm not sure whether all of them are.

> Thanks.

> WBR,
> Alexey.

> /home/oleksiys/Projects/riak-async-java-client/src/main/java/com/jbrisbin/riak/async/RiakAsyncClient.java:17: com.basho.riak.pbc.RiakMessageCodes is not public in com.basho.riak.pbc; cannot be accessed from outside package
> import static com.basho.riak.pbc.RiakMessageCodes.*;
> ^
> /home/oleksiys/Projects/riak-async-java-client/src/main/java/com/jbrisbin/riak/async/RiakAsyncClient.java:37: cannot find symbol
> symbol : class IRiakObject
> location: package com.basho.riak.client
> import com.basho.riak.client.IRiakObject;
> ^
> /home/oleksiys/Projects/riak-async-java-client/src/main/java/com/jbrisbin/riak/async/RiakAsyncClient.java:38: package com.basho.riak.client.bucket does not exist
> import com.basho.riak.client.bucket.BucketProperties;

> > Sent from my iPhone
> >
> > On Jun 25, 2011, at 2:54 AM, Oleksiy Stashok <oleksiy.stashok_at_oracle.com> wrote:
> >
> > > Hi Jon,
> > >
> > > Have you tried to fix the RpbFilter decoding (see my prev. email)?
> > >
> > > WBR,
> > > Alexey.

> > > On 06/25/2011 01:27 AM, Jon Brisbin wrote:
> > >
> > > > I updated the project to make the tests hang. To successfully execute
> > > > them, you'd need a Riak installation locally on the default port.
> > > >
> > > > It will hang on the future.get() call on line 122 because the
> > > > responses are getting out-of-sync with the requests.
> > > >
> > > > https://github.com/jbrisbin/riak-async-java-client/blob/master/src/test/groovy/com/jbrisbin/riak/async/RiakAsyncClientSpec.groovy
> > > >
> > > > Is there anything I should be doing on my end to do some
> > > > request/response accounting? I can't tell whether the problem is in
> > > > the client or in the server-side implementation of protobuf...
> > > >
> > > > Thanks for the help! :)
> > > >
> > > > Jon Brisbin
> > > > http://jbrisbin.com

> > > > ----- Original Message -----
> > > >
> > > > > From: "Oleksiy Stashok" <oleksiy.stashok_at_oracle.com>
> > > > > To: users_at_grizzly.java.net
> > > > > Sent: Friday, June 24, 2011 4:38:54 AM
> > > > > Subject: Re: Do I need connection pooling? Or am I just doing it wrong?

> > > > > Hi Jon,
> > > > >
> > > > > It would help if you could commit the test that doesn't work :)
> > > > >
> > > > > Otherwise the code looks fine, except here in RpbFilter:
> > > > >
> > > > > @Override public NextAction handleRead(FilterChainContext ctx) throws IOException {
> > > > >     Buffer buffer = ctx.getMessage();
> > > > >     int size = buffer.getInt();
> > > > >     int code = buffer.get();
> > > > >     if (buffer.remaining() < (size - 1)) {
> > > > >         return ctx.getStopAction(buffer);
> > > > >     }
> > > > >
> > > > > You'd probably need to restore the original buffer position before
> > > > > returning ctx.getStopAction(buffer); otherwise you save a Buffer
> > > > > with a shifted position, so the next time you read size and code
> > > > > they will be corrupted.
> > > > >
> > > > > WBR,
> > > > > Alexey.

> > > > > On 06/23/2011 09:55 PM, Jon Brisbin wrote:
> > > > >
> > > > > > I'm trying to talk protocol buffers in a custom client I'm writing
> > > > > > and I'm running into some kind of deadlock issue, I think. I could
> > > > > > also be exhausting the thread pool unawares (I really want to queue
> > > > > > this stuff somewhere...it's just not obvious to me how to do that).
> > > > > > I'm able to write and read from the server if I block until the
> > > > > > response is fully processed (request written and response
> > > > > > processed). But if I try and write multiple requests on the same
> > > > > > connection, something gets deadlocked somewhere and I have no idea
> > > > > > where.
> > > > > >
> > > > > > I'm setting up my transport like this (workerPool is a fixed thread
> > > > > > pool at availableProcessors() * 4):
> > > > > >
> > > > > > transport.setKeepAlive(true);
> > > > > > transport.setTcpNoDelay(true);
> > > > > > transport.setWorkerThreadPool(workerPool);
> > > > > > transport.setIOStrategy(WorkerThreadIOStrategy.getInstance());
> > > > > >
> > > > > > My filters are:
> > > > > >
> > > > > > clientChainBuilder.add(new TransportFilter());
> > > > > > clientChainBuilder.add(new RpbFilter(heap));
> > > > > > clientChainBuilder.add(new PendingRequestFilter());

> > > > > > RpbFilter is a protocol buffers encoder/decoder. The
> > > > > > PendingRequestFilter is part of my client and is supposed to call
> > > > > > the "saved" Future that was created for this request to pass back
> > > > > > the value. Here's an excerpt:
> > > > > >
> > > > > > @Override public NextAction handleWrite(FilterChainContext ctx) throws IOException {
> > > > > >     RpbRequest pending = ctx.getMessage();
> > > > > >     pendingRequests.add(pending);
> > > > > >     ctx.setMessage(pending.getMessage());
> > > > > >     return ctx.getInvokeAction();
> > > > > > }

> > > > > > When a response comes back (which is, I think, where I'm getting
> > > > > > messed up), I handle it like this (again, excerpted):
> > > > > >
> > > > > > @Override public NextAction handleRead(final FilterChainContext ctx) throws IOException {
> > > > > >     final RpbMessage<?> msg = ctx.getMessage();
> > > > > >     final RpbRequest pending = pendingRequests.peek();
> > > > > >     if (null == pendingRequests.peek()) {
> > > > > >         log.warn("Got a response when no pending requests available!");
> > > > > >         return ctx.getStopAction();
> > > > > >     } else {
> > > > > >         pendingRequests.remove(pending);
> > > > > >     }
> > > > > >     ...process the protobuf object and create an application-specific one
> > > > > >     to pass to the client requestor...
> > > > > >     if (null != response && null != pending.getCallback())
> > > > > >         pending.getCallback().completed(response);

> > > > > > The project is here (tests pass now because I'm blocking):
> > > > > >
> > > > > > https://github.com/jbrisbin/riak-async-java-client
> > > > > >
> > > > > > Any help you could give me here would be greatly appreciated! :)
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > Jon Brisbin
> > > > > > http://jbrisbin.com