Hi Eugene,
> So, the questions:
> 1) Is it possible to get events delivered to reader's
> CompletionHandler (I remember you advised to add a filter, but still)?
> And if the answer to question 1 is no, then
No, unfortunately it won't work like that. As I said, connection.read()
works for only a very limited number of use cases.
> 2) How should HttpHandler be notified that DbProtocolFilter has
> finished reading and the response is ready to be produced? It also needs
> data from the filter's FilterChainContext.message.
Here is a sample that should give you the idea.
public class Test {
    // Links a pooled DB connection to the HTTP request it is currently serving.
    private static final Attribute<Request> ASSOCIATED_HTTP_REQ_ATTR =
            Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("DbProtocolFilter");

    public static void main(String[] args) throws IOException {
        new Test().start();
        System.in.read(); // keep the JVM alive until Enter is pressed
    }

    private final TCPNIOTransport tcpTransport;
    private final SingleEndpointPool<SocketAddress> singleEndpointPool;

    public Test() {
        // Client-side filter chain used by every pooled DB connection.
        final FilterChain clientFilterChain =
                FilterChainBuilder.stateless()
                        .add(new TransportFilter())
                        .add(new DbProtocolFilter())
                        .build();

        tcpTransport = TCPNIOTransportBuilder.newInstance().build();

        TCPNIOConnectorHandler connectorHandler =
                TCPNIOConnectorHandler.builder(tcpTransport)
                        .processor(clientFilterChain)
                        .build();

        // Pool of database connections.
        singleEndpointPool = SingleEndpointPool
                .builder(SocketAddress.class)
                .connectorHandler(connectorHandler)
                .endpointAddress(new InetSocketAddress("localhost", 12345))
                .maxPoolSize(10)
                .build();
    }

    public void start() throws IOException {
        tcpTransport.start();

        HttpServer server = HttpServer.createSimpleServer(null,
                "localhost", 9090);
        // JsonHttpHandler is an inner class, so it reaches the pool
        // through the enclosing Test instance.
        server.getServerConfiguration().addHttpHandler(
                new JsonHttpHandler(),
                "/users");
        server.start();
    }

    class JsonHttpHandler extends HttpHandler {
        @Override
        public void service(final Request request, final Response response)
                throws Exception {
            // Suspend the response; DbProtocolFilter resumes it once the
            // DB reply has been processed.
            response.suspend();

            final NIOInputStream in = request.getNIOInputStream();
            in.notifyAvailable(new ReadHandler() {
                @Override
                public void onDataAvailable() throws Exception {
                    // Placeholder: handle a partially received request body.
                    throw new UnsupportedOperationException("Not supported yet.");
                }

                @Override
                public void onError(Throwable t) {
                    // Placeholder: report the error and resume the response.
                    throw new UnsupportedOperationException("Not supported yet.");
                }

                @Override
                public void onAllDataRead() throws Exception {
                    singleEndpointPool.take(
                            new EmptyCompletionHandler<Connection>() {
                                @Override
                                public void completed(final Connection dbConnection) {
                                    // Associate the dbConnection with the HTTP request.
                                    ASSOCIATED_HTTP_REQ_ATTR.set(dbConnection, request);
                                    // dbRq is the DB request built from the
                                    // HTTP request (not shown here).
                                    dbConnection.write(dbRq);
                                }
                            });
                }
            });
        }
    }

    class DbProtocolFilter extends BaseFilter {
        @Override
        public NextAction handleRead(FilterChainContext ctx) throws IOException {
            final Connection c = ctx.getConnection();
            // The message could be just a part of the response.
            final Buffer dbResponseBuffer = ctx.getMessage();

            // isComplete(...) is protocol-specific (not shown here).
            if (!isComplete(dbResponseBuffer)) {
                // Keep the partial data and wait for more to come.
                return ctx.getStopAction(dbResponseBuffer);
            }

            final Request associatedHttpRequest =
                    ASSOCIATED_HTTP_REQ_ATTR.remove(c);
            assert associatedHttpRequest != null;

            final Response response = associatedHttpRequest.getResponse();
            try {
                // processResponse(...) writes the HTTP response from the
                // DB reply (not shown here).
                processResponse(associatedHttpRequest, response,
                        dbResponseBuffer);
            } finally {
                // Return the connection to the pool and complete the
                // suspended HTTP response.
                singleEndpointPool.release(c);
                response.resume();
            }

            return ctx.getStopAction();
        }
    }
}
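
One note on isComplete(...) in the sample: it depends entirely on your DB protocol, so I left it out. As a rough sketch only, assuming a hypothetical framing where every DB response starts with a 4-byte big-endian payload length, the check could look like the code below. FrameCheck and the framing itself are my assumptions, not part of your protocol, and I use plain java.nio.ByteBuffer to keep the example self-contained; as far as I know the Grizzly Buffer offers comparable remaining()/position()/getInt(index) accessors.

```java
import java.nio.ByteBuffer;

// Hypothetical framing: [4-byte big-endian payload length][payload bytes].
final class FrameCheck {
    private static final int HEADER_SIZE = 4;

    /** Returns true once the buffer holds the header plus the whole payload. */
    static boolean isComplete(ByteBuffer buf) {
        if (buf.remaining() < HEADER_SIZE) {
            return false; // the length prefix itself has not fully arrived
        }
        // Absolute get: peeks at the length without moving the position,
        // so the buffer can be handed back unchanged to wait for more data.
        int payloadLength = buf.getInt(buf.position());
        if (payloadLength < 0) {
            throw new IllegalArgumentException(
                    "negative payload length: " + payloadLength);
        }
        return buf.remaining() - HEADER_SIZE >= payloadLength;
    }

    public static void main(String[] args) {
        // Header announces 5 payload bytes, but only 2 have arrived.
        ByteBuffer partial = ByteBuffer.allocate(16);
        partial.putInt(5).put(new byte[] {1, 2});
        partial.flip();
        System.out.println(isComplete(partial)); // prints false

        // Header announces 2 payload bytes and both are present.
        ByteBuffer full = ByteBuffer.allocate(16);
        full.putInt(2).put(new byte[] {1, 2});
        full.flip();
        System.out.println(isComplete(full)); // prints true
    }
}
```

The important part is that the check does not consume anything: when it returns false, the filter can pass the same buffer to ctx.getStopAction(...) and see it again, with the new data appended, on the next handleRead.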
Please let me know if you have further questions.
WBR,
Alexey.