users@grizzly.java.net

Re: Read doesn't work

From: Oleksiy Stashok <oleksiy.stashok_at_oracle.com>
Date: Mon, 24 Nov 2014 14:23:00 -0800

Hi Eugene,

> So, the questions:
> 1) Is it possible to get events delivered to reader's
> CompletionHandler (I remember you advised to add a filter, but still)?
> And if the answer to question 1 is no, then
No, unfortunately it won't work like that. As I said, connection.read()
works for only a very limited number of use cases.

> 2) How should HttpHandler be notified that DbProtocolFilter filter
> finished reading and response is ready to be produced. It also needs
> data from filter's FilterChainContext.message.
Here is a sample that should give you the idea.
public class Test {
     private static final Attribute<Request> ASSOCIATED_HTTP_REQ_ATTR =
             Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("DbProtocolFilter");

     public static void main(String[] args) throws IOException {
         new Test().start();
         System.in.read(); // keep the JVM alive until Enter is pressed
     }

     private final TCPNIOTransport tcpTransport;
     private final SingleEndpointPool singleEndpointPool;

     public Test() {
         final FilterChain clientFilterChain = FilterChainBuilder.stateless()
                 .add(new TransportFilter())
                 .add(new DbProtocolFilter())
                 .build();

         tcpTransport = TCPNIOTransportBuilder.newInstance().build();

         TCPNIOConnectorHandler connectorHandler =
                 TCPNIOConnectorHandler.builder(tcpTransport)
                 .processor(clientFilterChain)
                 .build();

         singleEndpointPool = SingleEndpointPool //database connections
                 .builder(SocketAddress.class)
                 .connectorHandler(connectorHandler)
                 .endpointAddress(new InetSocketAddress("localhost", 12345))
                 .maxPoolSize(10)
                 .build();

     }

     public void start() throws IOException {
         tcpTransport.start();

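         HttpServer server = HttpServer.createSimpleServer(null,
                 "localhost", 9090);
         // JsonHttpHandler is an inner class and uses the enclosing
         // instance's singleEndpointPool directly
         server.getServerConfiguration().addHttpHandler(
                 new JsonHttpHandler(),
                 "/users");
         server.start();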
     }

     class JsonHttpHandler extends HttpHandler {

         public void service(final Request request, final Response response)
                 throws Exception {

             response.suspend();

             final NIOInputStream in = request.getNIOInputStream();

             in.notifyAvailable(new ReadHandler() {

                 public void onDataAvailable() throws Exception {
                     throw new UnsupportedOperationException("Not supported yet.");
                 }

                 public void onError(Throwable t) {
                     throw new UnsupportedOperationException("Not supported yet.");
                 }

                 public void onAllDataRead() throws Exception {
                     singleEndpointPool.take(
                             new EmptyCompletionHandler<Connection>() {

                                 @Override
                                 public void completed(final Connection dbConnection) {
                                     // associate the dbConnection with the HTTP request
                                     ASSOCIATED_HTTP_REQ_ATTR.set(dbConnection, request);
                                     // dbRq: the database request message, built elsewhere (not shown)
                                     dbConnection.write(dbRq);
                                 }
                             });
                 }
             });
         }
     }

     class DbProtocolFilter extends BaseFilter {
         @Override
         public NextAction handleRead(FilterChainContext ctx)
                 throws IOException {
             final Connection c = ctx.getConnection();
             // the message; it could be just a part of the response
             final Buffer dbResponseBuffer = ctx.getMessage();

             if (!isComplete(dbResponseBuffer)) {
                 // wait for more data to come
                 return ctx.getStopAction(dbResponseBuffer);
             }

             final Request associatedHttpRequest =
                     ASSOCIATED_HTTP_REQ_ATTR.remove(c);
             assert associatedHttpRequest != null;
             final Response response = associatedHttpRequest.getResponse();

             try {
                 processResponse(associatedHttpRequest, response,
                         dbResponseBuffer);
             } finally {
                 singleEndpointPool.release(c);
                 response.resume();
             }

             return ctx.getStopAction();
         }
     }
}
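
The sample calls isComplete() without defining it, since the check depends on the database protocol. As a rough sketch only: assuming a hypothetical framing where each response is a 4-byte big-endian length followed by that many payload bytes, the check could look like the code below. FrameCheck and HEADER_SIZE are made-up names, and the sketch works on a plain java.nio.ByteBuffer rather than Grizzly's Buffer so that it stays self-contained.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: assumes each DB response is framed as a 4-byte
// big-endian length followed by that many payload bytes. The real protocol
// is not specified in this thread, so adapt it to the actual wire format.
final class FrameCheck {
    static final int HEADER_SIZE = 4;

    // Returns true once the buffer holds the header plus the full payload.
    // Uses an absolute read, so the buffer's position is left untouched.
    static boolean isComplete(ByteBuffer buf) {
        if (buf.remaining() < HEADER_SIZE) {
            return false; // the length prefix itself has not fully arrived
        }
        int payloadLength = buf.getInt(buf.position());
        if (payloadLength < 0) {
            throw new IllegalStateException("negative frame length: " + payloadLength);
        }
        return buf.remaining() - HEADER_SIZE >= payloadLength;
    }

    public static void main(String[] args) {
        // Header announces 8 payload bytes, but only 3 have arrived.
        ByteBuffer partial = ByteBuffer.allocate(16);
        partial.putInt(8).put(new byte[3]).flip();
        System.out.println(isComplete(partial)); // prints false

        // Header announces 8 payload bytes and all 8 are present.
        ByteBuffer full = ByteBuffer.allocate(16);
        full.putInt(8).put(new byte[8]).flip();
        System.out.println(isComplete(full)); // prints true
    }
}
```

Because the check does not consume anything, the filter can run it again on the accumulated data each time handleRead is invoked after returning the stop action with a remainder.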


Please let me know if you have further questions.

WBR,
Alexey.