Hi Pavel,
thanks, please see comments inline
I'll try to provide more feedback during the next few days too
thanks, Sergey
On 08/03/17 15:58, Pavel Bucek wrote:
Hi Sergey,
thanks for your feedback!
please see inline.
On 08/03/2017 13:56, Sergey Beryozkin wrote:
Hi Pavel
Q2:
1. Possible helper method on AsyncResponse
I'd say yes lets make it as implicit as possible, referring here to your comment that AsyncResponse helper method may be added.
How would the code look like instead?
We discussed this during a syncup meeting with Santiago and .. if we'll do "jax-rs" Publisher, we could add some convenience methods there. For the simples example, which is "consume everyting, send ok response when done and propagate exception", it could look like:
public void ex2(JaxRsPublisher<POJO> entity,
@Suspended AsyncResponse response) {
entity.subscribe(response, item -> { });
// ...}
this would most likely require AsyncResponse to implement some interface, but that can be easily done. It is on my todo list for next few days, I'll share the update once we have something meaningful.
Sounds good
2. @Suspended should not be enforced - would that conflict with the idea of having an AsyncResponse helper method ?
Most likely yes. @Suspended should be always there, since JAX-RS somewhere mentions that only resource method parameter without an annotation is an entity. How would the implementation recognize that AsyncResponse is not an entity? Also, it seem to make more sense to pass the response to some process, which is producing the (request) entity - compare that to passing reuqest entity publisher to async response.
I'm not sure if we are disagreeing here :-). As far as I understand, that yes, @Suspended will stay if we have a style where AsyncResponse is explicitly provided.
But you also said, "Note that using @Suspended shouldn't be enforced here; it should be possible to return a Response directly and still be able to consume the request entity. ". May be you can clarify this statement with the code, did you mean:
@POST
@Path("/ex2")
@Consumes(MediaType.APPLICATION_JSON)
public Response ex2(Flow.Publisher<POJO> entity) {
?
3. NioBodyReader.isReadable type parameter - I'm not sure yet either; I also wonder about the mapping processors, for example, given
// ex1 processor
public static class Ex1Processor implements Flow.Processor<ByteBuffer, ByteBuffer> {
// ...
}
and
// mapping Publisher<ByteBuffer> to Publisher<POJO>
// ByteBuffers are expected to contain JSON (indicated by @Consumes on the resource method and NioBodyReader).
public static class Ex2MappingProcessor implements Flow.Subscriber<ByteBuffer>, Flow.Publisher<POJO> {
// ...
}
why can't we have
// mapping Publisher<ByteBuffer> to Publisher<POJO>
// ByteBuffers are expected to contain JSON (indicated by @Consumes on the resource method and NioBodyReader).
public static class Ex2MappingProcessor implements Flow.Processor<ByteBuffer, POJO> {
// ...
}
There's obviously a good reason for it but would like to understand the diiferences
I'm sorry to disappoint, but there is no good reason :) I guess I wanted to not introduce dependency on the Processor when not needed, but currently it is used in the Reader/WriterInterceptorContext and Container(Request|Response)Context, so I could write it as you did. Thanks, Pavel
OK, I guess in a couple of weeks may be you can do m06, after the initial round is more or less complete, so that we can start playing with the API, would be easier to spot what works really well, what might be improved
Thanks, Sergey
Thanks, Sergey On 07/03/17 14:25, Pavel Bucek wrote:
Dear EG members,
please allow me to share the direction of what we are thinking about JAX-RS NIO support.
As stated before, SSE wasn't the only place were Flow APIs should be utilized - NIO is another area where it can be utilized quite heavily. And that was mostly the main idea - to see where it does make sense to use that.
Similarly to SSE, there is a plan to minimize / extract the Flow to different classes, but there should always be a clear path how to convert an instance into Flow.* (or org.reactivestreams.*) interfaces. It is not done yet, to keep things as clear as possible.
One of the bigger concerns is backwards compatibility. There are interfaces, which do work directly with Input/Output stream, which is always a problem for reactive processing, since it is blocking by design. The specification will need to say something like "The runtime is non-blocking as long as the application code doesn't read from a provided InputStream or doesn't set an OutputStream."
The motivation for doing this is to allow the JAX-RS apps to be non-blocking and reactive. JAX-RS 2.0 entity handling is designed around reading / producing Input / Output Stream, which is blocking by design. Non-blocking approach should result in higher throughput and better resource utilization of the server. Also, integration and coexistence with modern reactive frameworks should be possible to do without losing the advantage of having that framework (which was almost completely lost when dealing with blocking inputs/outputs).
Let's jump into code snippets.
Server - EX1 (byte handling):
Snippet below shows how to process request entity body return response entity body, using Publisher<ByteBuffer> - no MessageBodyReader/Writer is involved. This can be used as a low-level integration point for other frameworks, which are also reactive and will do the processing, like serializing/deserializing (mapping) to some java type, filtering, etc. Returning a Publisher<ByteBuffer> is a reactive/nio alternative to javax.ws.rs.core.StreamingOutput, consuming an entity using Publisher<ByteBuffer> is a reactive/nio alternative to consuming entity as an InputStream.
@POST
@Path("/ex1")
public Flow.Publisher<ByteBuffer> ex1(Flow.Publisher<ByteBuffer> entity) {
Ex1Processor processor = new Ex1Processor();
entity.subscribe(processor);
return processor;
}
// ex1 processor
public static class Ex1Processor implements Flow.Processor<ByteBuffer, ByteBuffer> {
// ...
}
And there is already an issue, which is not clearly solved.
Returning a Publisher instance from the resource method does put some constraints on the Publisher itself - it needs to cache all events which will be emitted prior subscription of the jax-rs implementation Subscriber instance (which is the only way how to get the data from a Publisher).
This can be solved by stating that request entity publisher won't produce any events until the resource method is invoked and the implementation Subscriber subscribed to the returned response entity publisher. Or the resource method can return Consumer<Flow.Subscriber<ByteBuffer>>, which would effectively grant control of the implementation subscription process. Any comments or suggestions welcomed [ref Q1].
Server - EX2 (consuming stream of pojos):
The next example should be slightly more straightforward. It shows how to process a Publisher of custom type, in this case called "POJO".
The only limitation or rule here would be that the subscriber for the request entity must be subscribed before the resource method "ex2" invocation ends.
New interface is introduced here - NioBodyReader. It has exactly the same responsibility as good old MessageBodyReader, but without using a blocking OutputStream to write the entity. Note that the "core" type is the Publisher<ByteBuffer>, which is in this case mapped (or converted) into Publisher of POJOs.
@POST
@Path("/ex2")
@Consumes(MediaType.APPLICATION_JSON)
public void ex2(Flow.Publisher<POJO> entity,
@Suspended AsyncResponse response) {
// TODO: introduce a helper or modify AsyncResponse to support this pattern directly?
entity.subscribe(
// POJO subscriber - consumer
new Flow.Subscriber<POJO>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
// ...
}
@Override
public void onNext(POJO item) {
// ...
}
@Override
public void onError(Throwable throwable) {
response.resume(throwable);
}
@Override
public void onComplete() {
response.resume(Response.ok().build());
}
}
);
}
@Provider
@Consumes(MediaType.APPLICATION_JSON)
public static class Ex2NioBodyReader implements NioBodyReader<POJO> {
@Override
public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
return true;
}
@Override
public Flow.Publisher<POJO> readFrom(Flow.Publisher<ByteBuffer> entity,
Class<POJO> type,
Type genericType,
Annotation[] annotations,
MediaType mediaType,
MultivaluedMap<String, String> httpHeaders) {
Ex2MappingProcessor mappingProcessor = new Ex2MappingProcessor();
entity.subscribe(mappingProcessor);
return mappingProcessor;
}
}
// mapping Publisher<ByteBuffer> to Publisher<POJO>
// ByteBuffers are expected to contain JSON (indicated by @Consumes on the resource method and NioBodyReader).
public static class Ex2MappingProcessor implements Flow.Subscriber<ByteBuffer>, Flow.Publisher<POJO> {
// ...
}
Same issue as [Q1] is valid for this as well - same solution will need to be applied for "readFrom" method.
Another issue is about what should be passed to "isReadable" method as "type" parameter. I'm not exactly sure whether we can safely obtain generic type of a parameter from the resource method (public void ex2(Flow.Publisher< POJO> entity, ..)). Any comments/suggestions welcomed [ref Q2].
Note that using @Suspended shouldn't be enforced here; it should be possible to return a Response directly and still be able to consume the request entity.
Server - EX3 (producing list of POJOs):
The last (for now) example shows how we can produce and write POJOs. Resource method doesn't take any parameters and provides a Publisher of POJO objects, which will be converted to JSON in NioBodyWriter. NioBodyReader is a reactive alternative to MessageBodyReader from older version of the specification.
@GET
@Path("/ex3")
@Produces(MediaType.APPLICATION_JSON)
public Flow.Publisher<POJO> ex3() {
Flow.Publisher<POJO> pojoPublisher = null;
// source of the POJO "stream" can be anything - database call, client call to
// another service, ...
//
// DB
// .getEmployees(department) // StreamPublisher<EmployeeDbModel> -- reactive stream
// .map((Function<EmployeeDbModel, EmployeeToReturn>) employeeDbModel -> {
// // ...
// });
return pojoPublisher;
}
@Provider
@Produces(MediaType.APPLICATION_JSON)
public static class Ex3NioBodyWriter implements NioBodyWriter<POJO> {
@Override
public boolean isWriteable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
return true;
}
@Override
public void writeTo(Flow.Publisher<POJO> entityObjectPublisher,
Flow.Subscriber<ByteBuffer> subscriber,
Class<?> type,
Type genericType,
Annotation[] annotations,
MediaType mediaType,
MultivaluedMap<String, Object> httpHeaders) {
// map Publisher<POJO> to Publisher<ByteBuffer> and subscribe Flow.Subscriber<ByteBuffer> to it.
}
}
Resource method is minimalistic, [Q1] applies here as well.
The example introduces NioBodyWriter and its isWriteable method does have [Q2], similarly to NioBodyReader. #writeTo doesn't have any issues - [Q1] is mitigated there because the implementation passes a supplier to the implementation - there doesn't need to be anything returned. Something similar might be able to do for NioBodyWriter as well.
Comment to writing multiple POJO instances:
https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/jaxrs-api/src/main/java/javax/ws/rs/ext/NioBodyWriter.java#L82<https://github.com/pavelbucek/jax-rs/blob/8304c658c923b520ed34201b1f1e6741660daa0b/jaxrs-api/src/main/java/javax/ws/rs/ext/NioBodyWriter.java#L82>
Important point to mention is that even when producing multiple instances, the intention here is still to return the single HTTP response.
===
We have more, but this email is already too long - I will post more after there is some feedback on the presented concepts and issues. Please let us know if this format is OK or if you'd prefer something else - I guess I could do a screencast, hangout or something similar.
Source links:
- complete server example:
https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioResource.java<https://github.com/pavelbucek/jax-rs/blob/8304c658c923b520ed34201b1f1e6741660daa0b/examples/src/main/java/jaxrs/examples/nio/NioResource.java> - client (to be discussed):
https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioClient.java <
https://github.com/pavelbucek/jax-rs/blob/8304c658c923b520ed34201b1f1e6741660daa0b/examples/src/main/java/jaxrs/examples/nio/NioClient.java> - server side processing (including interceptors):
https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/ServerSideProcessing.java
(Not including direct link to individual examples, since we will continue working on them...)
Looking forward to your feedback!
Thanks and regards, Pavel & Santiago