Hi Alexey,
Could you please advise on the best way to check for a read timeout
in DbFilter (please see the reply history for context)? That is, the case when
dbConnection.write(dbRq)
was executed in JsonHttpHandler but DbProtocolFilter.handleRead doesn't
receive a complete dbResponseBuffer within a given time. I'm thinking about
replacing ASSOCIATED_HTTP_REQ_ATTR with a DelayQueue, similar to how it's done
for keepAliveCleaner in SingleEndpointPool. Would that be a good idea?
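
To make the DelayQueue idea concrete, here is a rough sketch of what I have in mind. It uses only java.util.concurrent and is not Grizzly API: ReadTimeoutTracker, Pending, register, complete and pollExpired are names I made up for illustration. The assumption is that the key would be the db Connection (or the suspended HTTP Request), that register() is called right before dbConnection.write(dbRq), that complete() is called from handleRead once the response is complete, and that some background thread calls pollExpired() periodically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: tracks pending DB reads and reports the ones that timed out. */
final class ReadTimeoutTracker<K> {

    /** One pending read, ordered in the queue by its deadline. */
    static final class Pending<K> implements Delayed {
        final K key;
        private final long deadlineNanos;
        private final AtomicBoolean done = new AtomicBoolean();

        Pending(K key, long timeout, TimeUnit unit) {
            this.key = key;
            this.deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        }

        /** True only for the first caller, so response and timeout never both fire. */
        boolean claim() {
            return done.compareAndSet(false, true);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<Pending<K>> queue = new DelayQueue<>();

    /** Call right before the DB request is written. */
    Pending<K> register(K key, long timeout, TimeUnit unit) {
        Pending<K> pending = new Pending<>(key, timeout, unit);
        queue.add(pending);
        return pending;
    }

    /** Call when the full response arrived; false means the timeout fired first. */
    boolean complete(Pending<K> pending) {
        boolean won = pending.claim();
        queue.remove(pending);
        return won;
    }

    /** Non-blocking: returns the keys whose deadline passed before a response arrived. */
    List<K> pollExpired() {
        List<K> expired = new ArrayList<>();
        Pending<K> pending;
        // DelayQueue.poll() only hands out entries whose delay has elapsed.
        while ((pending = queue.poll()) != null) {
            if (pending.claim()) {
                expired.add(pending.key);
            }
        }
        return expired;
    }
}
```

For every expired key the cleaner thread would presumably have to close the db connection rather than return it to the pool, and resume the suspended HTTP response with an error. The claim() flag is there so that a late response and the timeout cannot both resume the same response.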
Best regards, Eugene.
2014-11-25 21:20 GMT+01:00 Евгений Бушуев <yevgen.bushuyev_at_gmail.com>:
> Hi Alexey,
>
> Thank you very much for your help! Now everything works perfectly!
>
> Best regards, Eugene.
>
> 2014-11-25 0:23 GMT+02:00 Oleksiy Stashok <oleksiy.stashok_at_oracle.com>:
>
>> Hi Eugene,
>>
>> So, the questions:
>> 1) Is it possible to get events delivered to the reader's CompletionHandler
>> (I remember you advised adding a filter, but still)?
>> And if the answer to question 1 is no, then
>>
>> No, unfortunately it won't work like that. As I said, connection.read()
>> will work for only a very limited number of use cases.
>>
>> 2) How should the HttpHandler be notified that DbProtocolFilter has
>> finished reading and the response is ready to be produced? It also needs
>> data from the filter's FilterChainContext.message.
>>
>> Here is the sample, which should give you an idea.
>> public class Test {
>> private static final Attribute<Request> ASSOCIATED_HTTP_REQ_ATTR =
>>
>> Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("DbProtocolFilter");
>>
>> public static void main(String[] args) throws IOException {
>>
>> }
>>
>> private final TCPNIOTransport tcpTransport;
>> private final SingleEndpointPool singleEndpointPool;
>>
>> public Test() {
>> final FilterChain clientFilterChain =
>> FilterChainBuilder.stateless()
>> .add(new TransportFilter())
>> .add(new DbProtocolFilter())
>> .build();
>>
>> tcpTransport = TCPNIOTransportBuilder.newInstance().build();
>>
>> TCPNIOConnectorHandler connectorHandler =
>> TCPNIOConnectorHandler.builder(tcpTransport)
>> .processor(clientFilterChain)
>> .build();
>>
>> singleEndpointPool = SingleEndpointPool //database connections
>> .builder(SocketAddress.class)
>> .connectorHandler(connectorHandler)
>> .endpointAddress(new InetSocketAddress("localhost",
>> 12345))
>> .maxPoolSize(10)
>> .build();
>>
>> }
>>
>> public void start() throws IOException {
>> tcpTransport.start();
>>
>> HttpServer server = HttpServer.createSimpleServer(null,
>> "localhost", 9090);
>> // JsonHttpHandler is an inner class, so it can use singleEndpointPool directly
>> server.getServerConfiguration().addHttpHandler(
>> new JsonHttpHandler(),
>> "/users");
>> server.start();
>> }
>>
>> class JsonHttpHandler extends HttpHandler {
>>
>> public void service(final Request request, final Response
>> response)
>> throws Exception {
>>
>> response.suspend();
>>
>> final NIOInputStream in = request.getNIOInputStream();
>>
>> in.notifyAvailable(new ReadHandler() {
>>
>> public void onDataAvailable() throws Exception {
>> throw new UnsupportedOperationException("Not supported yet.");
>> }
>>
>> public void onError(Throwable t) {
>> throw new UnsupportedOperationException("Not supported yet.");
>> }
>>
>> public void onAllDataRead() throws Exception {
>> singleEndpointPool.take(
>> new EmptyCompletionHandler<Connection>() {
>>
>> @Override
>> public void completed(final Connection dbConnection) {
>> // associate the dbConnection with the HTTP Request
>> ASSOCIATED_HTTP_REQ_ATTR.set(dbConnection, request);
>> // dbRq is the DB request built from the HTTP request body (not shown here)
>> dbConnection.write(dbRq);
>> }
>> });
>> }
>> });
>> }
>> }
>>
>> class DbProtocolFilter extends BaseFilter {
>> @Override
>> public NextAction handleRead(FilterChainContext ctx) throws
>> IOException {
>> final Connection c = ctx.getConnection();
>> // the message could be just a part of the response
>> final Buffer dbResponseBuffer = ctx.getMessage();
>>
>> if (!isComplete(dbResponseBuffer)) {
>> // wait for more data to come
>> return ctx.getStopAction(dbResponseBuffer);
>> }
>>
>> final Request associatedHttpRequest =
>> ASSOCIATED_HTTP_REQ_ATTR.remove(c);
>> assert associatedHttpRequest != null;
>> final Response response = associatedHttpRequest.getResponse();
>>
>> try {
>>
>> processResponse(associatedHttpRequest, response,
>> dbResponseBuffer);
>> } finally {
>> singleEndpointPool.release(c);
>> response.resume();
>> }
>>
>> return ctx.getStopAction();
>> }
>> }
>> }
>>
>>
>> Please let me know if you have further questions.
>>
>> WBR,
>> Alexey.
>>
>
>