Hi Alexey,
Thank you very much for your help! Now everything works perfectly!
Best regards, Eugene.
2014-11-25 0:23 GMT+02:00 Oleksiy Stashok <oleksiy.stashok_at_oracle.com>:
> Hi Eugene,
>
> So, the qustions:
> 1) Is it possible to get events delivered to reader's CompletionHandler (I
> remember you advised to add a filter, but still)?
> And if anwer to question 1 is not, then
>
> No, unfortunately it won't work like that. As I said connection.read()
> will work for only very limited number of usecases.
>
> 2) How should HttpHandler be notified that DbProtocolFilter filter
> finished reading and response is ready to be produced. It also needs data
> from filter's FilterChainContext.message.
>
> Here is the sample, which should give you an idea.
> public class Test {
> private static final Attribute<Request> ASSOCIATED_HTTP_REQ_ATTR =
>
> Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("DbProtocolFilter");
>
> public static void main(String[] args) throws IOException {
>
> }
>
> private final TCPNIOTransport tcpTransport;
> private final SingleEndpointPool singleEndpointPool;
>
> public Test() {
> final FilterChain clientFilterChain =
> FilterChainBuilder.stateless()
> .add(new TransportFilter())
> .add(new DbProtocolFilter())
> .build();
>
> tcpTransport = TCPNIOTransportBuilder.newInstance().build();
>
> TCPNIOConnectorHandler connectorHandler =
> TCPNIOConnectorHandler.builder(tcpTransport)
> .processor(clientFilterChain)
> .build();
>
> singleEndpointPool = SingleEndpointPool //database connections
> .builder(SocketAddress.class)
> .connectorHandler(connectorHandler)
> .endpointAddress(new InetSocketAddress("localhost", 12345))
> .maxPoolSize(10)
> .build();
>
> }
>
> public void start() throws IOException {
> tcpTransport.start();
>
> HttpServer server = HttpServer.createSimpleServer(null,
> "localhost", 9090);
> server.getServerConfiguration().addHttpHandler(
> new JsonHttpHandler(singleEndpointPool),
> "/users");
> }
>
> class JsonHttpHandler extends HttpHandler {
>
> public void service(final Request request, final Response response)
> throws Exception {
>
> response.suspend();
>
> final NIOInputStream in = request.getNIOInputStream();
>
> in.notifyAvailable(new ReadHandler() {
>
> public void onDataAvailable() throws Exception {
> throw new UnsupportedOperationException("Not supported
> yet."); //To change body of generated methods, choose Tools | Templates.
> }
>
> public void onError(Throwable t) {
> throw new UnsupportedOperationException("Not supported
> yet."); //To change body of generated methods, choose Tools | Templates.
> }
>
> public void onAllDataRead() throws Exception {
> singleEndpointPool.take(
> new EmptyCompletionHandler<Connection>() {
>
> @Override
> public void completed(final Connection
> dbConnection) {
>
> ASSOCIATED_HTTP_REQ_ATTR.set(dbConnection, request); // associated the
> dbConnection with the HTTP Request
> dbConnection.write(dbRq);
> }
> });
> }
> });
> }
> }
>
> class DbProtocolFilter extends BaseFilter {
> @Override
> public NextAction handleRead(FilterChainContext ctx) throws
> IOException {
> final Connection c = ctx.getConnection();
> final Buffer dbResponseBuffer = ctx.getMessage(); // the
> message, could be just a part of response
>
> if (!isComplete(dbResponseBuffer)) {
> return ctx.getStopAction(dbResponseBuffer); // wait for
> more data to come
> }
>
> final Request associatedHttpRequest =
> ASSOCIATED_HTTP_REQ_ATTR.remove(c);
> assert associatedHttpRequest != null;
> final Response response = associatedHttpRequest.getResponse();
>
> try {
>
> processResponse(associatedHttpRequest, response,
> dbResponseBuffer);
> } finally {
> singleEndpointPool.release(c);
> response.resume();
> }
>
> return ctx.getStopAction();
> }
> }
> }
>
>
> Pls. let me know if you have further questions.
>
> WBR,
> Alexey.
>