Hey,
you can use either a DelayQueue or a standard ScheduledExecutorService to
track the DB request timeout. Once the timeout is hit, just close the
connection.
In the JsonHttpHandler you'll need to register a CloseListener to be
notified when the connection is closed, and resume HTTP processing there.
Like:
    public void onAllDataRead() throws Exception {
        singleEndpointPool.take(
                new EmptyCompletionHandler<Connection>() {
                    @Override
                    public void completed(final Connection dbConnection) {
                        // associate the dbConnection with the HTTP request
                        ASSOCIATED_HTTP_REQ_ATTR.set(dbConnection, request);
                        startTrackDbConnectionTimeout(dbConnection);
                        // in the CloseListener we need to resume the
                        // HTTP request/response processing
                        dbConnection.addCloseListener(...);
                        dbConnection.write(dbRq);
                    }
                });
    }
Please don't forget to remove the CloseListener and any other handlers
and listeners before returning the Connection to the pool.
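To make the ScheduledExecutorService option a bit more concrete, here is a minimal sketch of what startTrackDbConnectionTimeout could delegate to. The class DbTimeoutTracker and its start/stop/shutdown methods are hypothetical names, not part of Grizzly, and the timeout action is a plain Runnable so the snippet has no Grizzly dependency; in your case that Runnable would close the dbConnection.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not a Grizzly class): runs an action if a tracked
 * request is not answered within the configured timeout.
 */
final class DbTimeoutTracker<K> {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // One pending timeout task per key (e.g. per DB connection).
    private final Map<K, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
    private final long timeout;
    private final TimeUnit unit;

    DbTimeoutTracker(long timeout, TimeUnit unit) {
        this.timeout = timeout;
        this.unit = unit;
    }

    /** Starts tracking; onTimeout would typically close the DB connection. */
    void start(K key, Runnable onTimeout) {
        ScheduledFuture<?> task = scheduler.schedule(() -> {
            pending.remove(key);
            onTimeout.run();
        }, timeout, unit);
        // Replace (and cancel) any earlier timeout registered for this key.
        ScheduledFuture<?> previous = pending.put(key, task);
        if (previous != null) {
            previous.cancel(false);
        }
    }

    /** Stops tracking. Returns true if a pending timeout was cancelled. */
    boolean stop(K key) {
        ScheduledFuture<?> task = pending.remove(key);
        return task != null && task.cancel(false);
    }

    /** Stops the scheduler thread, e.g. on application shutdown. */
    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

With something like this, completed(...) would call start(dbConnection, ...) right before dbConnection.write(dbRq), and DbProtocolFilter.handleRead would call stop(c) once the response is complete, before releasing the connection to the pool.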
WBR,
Alexey.
On 6/7/16 11:50 PM, Евгений Бушуев wrote:
> Hi Alexey,
>
> Could you please advise on the best way to check for the read
> timeout in DbFilter (please see the reply history for the context)? That is,
> when dbConnection.write(dbRq) was executed in JsonHttpHandler but
> DbProtocolFilter.handleRead doesn't get a complete dbResponseBuffer
> within the given time. I'm thinking about replacing
> ASSOCIATED_HTTP_REQ_ATTR with a DelayQueue, similar to how it's done for
> keepAliveCleaner in SingleEndpointPool. Would it be a good idea?
>
> Best regards, Eugene.
>
>
>
> 2014-11-25 21:20 GMT+01:00 Евгений Бушуев <yevgen.bushuyev_at_gmail.com>:
>
> Hi Alexey,
>
> Thank you very much for your help! Now everything works perfectly!
>
> Best regards, Eugene.
>
> 2014-11-25 0:23 GMT+02:00 Oleksiy Stashok <oleksiy.stashok_at_oracle.com>:
>
> Hi Eugene,
>
>> So, the questions:
>> 1) Is it possible to get events delivered to the reader's
>> CompletionHandler (I remember you advised adding a filter,
>> but still)?
>> And if the answer to question 1 is no, then
> No, unfortunately it won't work like that. As I said,
> connection.read() will work for only a very limited number of
> use cases.
>
>> 2) How should HttpHandler be notified that
>> DbProtocolFilter filter finished reading and response is
>> ready to be produced. It also needs data from
>> filter's FilterChainContext.message.
> Here is the sample, which should give you an idea.
> public class Test {
>     private static final Attribute<Request> ASSOCIATED_HTTP_REQ_ATTR =
>             Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("DbProtocolFilter");
>
>     public static void main(String[] args) throws IOException {
>
>     }
>
>     private final TCPNIOTransport tcpTransport;
>     private final SingleEndpointPool singleEndpointPool;
>
>     public Test() {
>         final FilterChain clientFilterChain = FilterChainBuilder.stateless()
>                 .add(new TransportFilter())
>                 .add(new DbProtocolFilter())
>                 .build();
>
>         tcpTransport = TCPNIOTransportBuilder.newInstance().build();
>
>         TCPNIOConnectorHandler connectorHandler =
>                 TCPNIOConnectorHandler.builder(tcpTransport)
>                         .processor(clientFilterChain)
>                         .build();
>
>         // database connections
>         singleEndpointPool = SingleEndpointPool
>                 .builder(SocketAddress.class)
>                 .connectorHandler(connectorHandler)
>                 .endpointAddress(new InetSocketAddress("localhost", 12345))
>                 .maxPoolSize(10)
>                 .build();
>
>     }
>
>     public void start() throws IOException {
>         tcpTransport.start();
>
>         HttpServer server =
>                 HttpServer.createSimpleServer(null, "localhost", 9090);
>         server.getServerConfiguration().addHttpHandler(
>                 new JsonHttpHandler(singleEndpointPool),
>                 "/users");
>     }
>
>     class JsonHttpHandler extends HttpHandler {
>
>         public void service(final Request request, final Response response)
>                 throws Exception {
>
>             response.suspend();
>
>             final NIOInputStream in = request.getNIOInputStream();
>
>             in.notifyAvailable(new ReadHandler() {
>
>                 public void onDataAvailable() throws Exception {
>                     // To change body of generated methods, choose Tools | Templates.
>                     throw new UnsupportedOperationException("Not supported yet.");
>                 }
>
>                 public void onError(Throwable t) {
>                     // To change body of generated methods, choose Tools | Templates.
>                     throw new UnsupportedOperationException("Not supported yet.");
>                 }
>
>                 public void onAllDataRead() throws Exception {
>                     singleEndpointPool.take(
>                             new EmptyCompletionHandler<Connection>() {
>
>                                 @Override
>                                 public void completed(final Connection dbConnection) {
>                                     // associate the dbConnection with the HTTP request
>                                     ASSOCIATED_HTTP_REQ_ATTR.set(dbConnection, request);
>                                     dbConnection.write(dbRq);
>                                 }
>                             });
>                 }
>             });
>         }
>     }
>
>     class DbProtocolFilter extends BaseFilter {
>         @Override
>         public NextAction handleRead(FilterChainContext ctx)
>                 throws IOException {
>             final Connection c = ctx.getConnection();
>             // the message could be just a part of the response
>             final Buffer dbResponseBuffer = ctx.getMessage();
>
>             if (!isComplete(dbResponseBuffer)) {
>                 // wait for more data to come
>                 return ctx.getStopAction(dbResponseBuffer);
>             }
>
>             final Request associatedHttpRequest =
>                     ASSOCIATED_HTTP_REQ_ATTR.remove(c);
>             assert associatedHttpRequest != null;
>             final Response response = associatedHttpRequest.getResponse();
>
>             try {
>                 processResponse(associatedHttpRequest, response,
>                         dbResponseBuffer);
>             } finally {
>                 singleEndpointPool.release(c);
>                 response.resume();
>             }
>
>             return ctx.getStopAction();
>         }
>     }
> }
>
>
> Please let me know if you have further questions.
>
> WBR,
> Alexey.
>
>
>