Dear EG members,
please allow me to share the direction of our thinking about JAX-RS NIO
support.
As stated before, SSE wasn't the only place where Flow APIs should be
utilized - NIO is another area where they can be used quite heavily.
That was the main idea - to see where it makes sense to use them.
Similarly to SSE, there is a plan to minimize / extract the Flow to
different classes, but there should always be a clear path how to
convert an instance into Flow.* (or org.reactivestreams.*) interfaces.
It is not done yet, to keep things as clear as possible.
One of the bigger concerns is backwards compatibility. There are
interfaces, which do work directly with Input/Output stream, which is
always a problem for reactive processing, since it is blocking by
design. The specification will need to say something like "The runtime
is non-blocking as long as the application code doesn't read from a
provided InputStream or doesn't set an OutputStream."
The motivation for doing this is to allow the JAX-RS apps to be
non-blocking and reactive. JAX-RS 2.0 entity handling is designed around
reading / producing Input / Output Stream, which is blocking by design.
A non-blocking approach should result in higher throughput and better
resource utilization of the server. Also, integration and coexistence
with modern reactive frameworks should be possible to do without losing
the advantage of having that framework (which was almost completely lost
when dealing with blocking inputs/outputs).
Let's jump into code snippets.
*Server - EX1 (byte handling):*
The snippet below shows how to process a request entity body and return
a response entity body using Publisher<ByteBuffer> - no
MessageBodyReader/Writer is involved. This can be used as a low-level
integration point for other
frameworks, which are also reactive and will do the processing, like
serializing/deserializing (mapping) to some java type, filtering, etc.
Returning a Publisher<ByteBuffer> is a reactive/nio alternative to
javax.ws.rs.core.StreamingOutput; consuming an entity as a
Publisher<ByteBuffer> is a reactive/nio alternative to consuming the
entity as an InputStream.
    @POST
    @Path("/ex1")
    public Flow.Publisher<ByteBuffer> ex1(Flow.Publisher<ByteBuffer> entity) {
        Ex1Processor processor = new Ex1Processor();
        entity.subscribe(processor);
        return processor;
    }

    // ex1 processor
    public static class Ex1Processor implements Flow.Processor<ByteBuffer, ByteBuffer> {
        // ...
    }
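To make the shape of such a processor more concrete, here is a minimal sketch using only JDK classes. The body of Ex1Processor is not shown above, so everything here is an assumption for illustration: the names Ex1ProcessorSketch, UppercasingProcessor and run are hypothetical, the ASCII uppercasing stands in for real processing, and SubmissionPublisher stands in for both the request entity publisher and the runtime.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class Ex1ProcessorSketch {

    /** Hypothetical processor: republishes every chunk with ASCII letters uppercased. */
    static final class UppercasingProcessor extends SubmissionPublisher<ByteBuffer>
            implements Flow.Processor<ByteBuffer, ByteBuffer> {

        private Flow.Subscription upstream;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            upstream.request(1); // pull one chunk at a time
        }

        @Override
        public void onNext(ByteBuffer chunk) {
            String text = StandardCharsets.US_ASCII.decode(chunk).toString();
            byte[] upper = text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.US_ASCII);
            submit(ByteBuffer.wrap(upper)); // hand the transformed chunk downstream
            upstream.request(1);
        }

        @Override
        public void onError(Throwable throwable) { closeExceptionally(throwable); }

        @Override
        public void onComplete() { close(); }
    }

    /** Pushes the chunks through the processor and returns the concatenated result. */
    static String run(String... chunks) {
        StringBuilder out = new StringBuilder();
        CompletableFuture<String> done = new CompletableFuture<>();
        UppercasingProcessor processor = new UppercasingProcessor();

        // Stand-in for the subscriber the JAX-RS runtime would attach to the response.
        processor.subscribe(new Flow.Subscriber<ByteBuffer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(ByteBuffer b) { out.append(StandardCharsets.US_ASCII.decode(b)); }
            @Override public void onError(Throwable t) { done.completeExceptionally(t); }
            @Override public void onComplete() { done.complete(out.toString()); }
        });

        // Stand-in for the request entity publisher; closing it signals onComplete.
        try (SubmissionPublisher<ByteBuffer> request = new SubmissionPublisher<>()) {
            request.subscribe(processor);
            for (String chunk : chunks) {
                request.submit(ByteBuffer.wrap(chunk.getBytes(StandardCharsets.US_ASCII)));
            }
        }
        return done.join();
    }
}
```

Note that the sketch attaches the downstream subscriber before the request publisher starts emitting; a SubmissionPublisher does not replay items to subscribers that arrive later.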
There is already an issue here that has no clear solution yet.
Returning a Publisher instance from the resource method puts some
constraints on the Publisher itself - it needs to cache all events that
are emitted prior to the subscription of the JAX-RS implementation's
Subscriber instance (subscribing is the only way to get data from a
Publisher).
This can be solved by stating that the *request* entity publisher won't
produce any events until the resource method has been invoked and the
implementation Subscriber has subscribed to the returned *response*
entity publisher. Alternatively, the resource method can return
Consumer<Flow.Subscriber<ByteBuffer>>, which would effectively give the
implementation control over the subscription process. Any comments or
suggestions are welcome *[ref Q1]*.
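The issue and the Consumer-based alternative can be illustrated with a small JDK-only sketch. All names here (LateSubscriptionSketch, Collector, eager, deferredResource, deferred) are hypothetical, String items are used instead of ByteBuffer for brevity, and SubmissionPublisher is only a stand-in for a non-caching publisher; this is not the proposed API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

final class LateSubscriptionSketch {

    /** Stand-in for the runtime's subscriber: collects every item it receives. */
    static final class Collector implements Flow.Subscriber<String> {
        final List<String> items = new CopyOnWriteArrayList<>();
        final CompletableFuture<Void> done = new CompletableFuture<>();

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { items.add(item); }
        @Override public void onError(Throwable t) { done.completeExceptionally(t); }
        @Override public void onComplete() { done.complete(null); }
    }

    /** Eager variant: the publisher emits before the runtime has subscribed. */
    static List<String> eager() {
        Collector runtime = new Collector();
        SubmissionPublisher<String> response = new SubmissionPublisher<>();
        response.submit("early");    // no subscriber yet, so this item is dropped
        response.subscribe(runtime); // the runtime subscribes only after "returning"
        response.submit("late");
        response.close();
        runtime.done.join();
        return runtime.items;
    }

    /** Deferred variant: the resource returns a Consumer and emits only once subscribed. */
    static Consumer<Flow.Subscriber<String>> deferredResource() {
        return subscriber -> {
            SubmissionPublisher<String> response = new SubmissionPublisher<>();
            response.subscribe(subscriber); // subscribe first ...
            response.submit("early");       // ... then start emitting
            response.submit("late");
            response.close();
        };
    }

    /** Plays the role of the runtime invoking the returned Consumer. */
    static List<String> deferred() {
        Collector runtime = new Collector();
        deferredResource().accept(runtime);
        runtime.done.join();
        return runtime.items;
    }
}
```

In this sketch eager() loses the first item, while deferred() delivers both, because the Consumer runs only when the caller hands over its subscriber.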
*Server - EX2 (consuming stream of pojos):*
The next example should be slightly more straightforward. It shows how
to process a Publisher of custom type, in this case called "POJO".
The only limitation or rule here would be that the subscriber for the
request entity must be subscribed before the resource method "ex2"
invocation ends.
A new interface is introduced here - NioBodyReader. It has exactly the
same responsibility as good old MessageBodyReader, but without using a
blocking InputStream to read the entity. Note that the "core" type is
the Publisher<ByteBuffer>, which is in this case mapped (or converted)
into a Publisher of POJOs.
    @POST
    @Path("/ex2")
    @Consumes(MediaType.APPLICATION_JSON)
    public void ex2(Flow.Publisher<POJO> entity,
                    @Suspended AsyncResponse response) {

        // TODO: introduce a helper or modify AsyncResponse to support this
        // pattern directly?
        entity.subscribe(
                // POJO subscriber - consumer
                new Flow.Subscriber<POJO>() {
                    @Override
                    public void onSubscribe(Flow.Subscription subscription) {
                        // ...
                    }

                    @Override
                    public void onNext(POJO item) {
                        // ...
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        response.resume(throwable);
                    }

                    @Override
                    public void onComplete() {
                        response.resume(Response.ok().build());
                    }
                }
        );
    }

    @Provider
    @Consumes(MediaType.APPLICATION_JSON)
    public static class Ex2NioBodyReader implements NioBodyReader<POJO> {

        @Override
        public boolean isReadable(Class<?> type, Type genericType,
                                  Annotation[] annotations, MediaType mediaType) {
            return true;
        }

        @Override
        public Flow.Publisher<POJO> readFrom(Flow.Publisher<ByteBuffer> entity,
                                             Class<POJO> type,
                                             Type genericType,
                                             Annotation[] annotations,
                                             MediaType mediaType,
                                             MultivaluedMap<String, String> httpHeaders) {
            Ex2MappingProcessor mappingProcessor = new Ex2MappingProcessor();
            entity.subscribe(mappingProcessor);
            return mappingProcessor;
        }
    }

    // mapping Publisher<ByteBuffer> to Publisher<POJO>
    // ByteBuffers are expected to contain JSON (indicated by @Consumes on the
    // resource method and NioBodyReader).
    public static class Ex2MappingProcessor
            implements Flow.Subscriber<ByteBuffer>, Flow.Publisher<POJO> {
        // ...
    }
The same issue as *[Q1]* applies here as well - the same solution will
need to be applied to the "readFrom" method.
Another issue is what should be passed to the "isReadable" method as the
"type" parameter. I'm not exactly sure whether we can safely obtain the
generic type of a parameter from the resource method (public void
ex2(Flow.Publisher< *POJO*> entity, ..)). Any comments/suggestions are
welcome *[ref Q2]*.
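As one data point for this question, plain Java reflection does retain the type argument of a method parameter when it is a concrete class. The sketch below uses only java.lang.reflect; the class GenericParameterSketch, the nested POJO stand-in and the helper names are hypothetical, and the ex2 method is reduced to a single parameter for brevity.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.Flow;

final class GenericParameterSketch {

    /** Stand-in for the application's entity type. */
    static final class POJO { }

    /** Stand-in for the resource method; only the signature matters here. */
    public void ex2(Flow.Publisher<POJO> entity) { }

    /**
     * Returns the single type argument of the parameter at the given index,
     * or null if that parameter is not parameterized with exactly one argument.
     */
    static Type elementType(Method method, int index) {
        Type parameterType = method.getGenericParameterTypes()[index];
        if (parameterType instanceof ParameterizedType) {
            Type[] args = ((ParameterizedType) parameterType).getActualTypeArguments();
            if (args.length == 1) {
                return args[0];
            }
        }
        return null;
    }

    /** Looks up ex2 reflectively and extracts the element type of its first parameter. */
    static Type ex2ElementType() {
        try {
            Method method = GenericParameterSketch.class
                    .getDeclaredMethod("ex2", Flow.Publisher.class);
            return elementType(method, 0);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If the parameter were declared with a type variable or a wildcard (for example Flow.Publisher<T> or Flow.Publisher<? extends POJO>), the returned Type would be a TypeVariable or WildcardType rather than a Class, so that case would still need a rule in the specification.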
Note that using @Suspended shouldn't be enforced here; it should be
possible to return a Response directly and still be able to consume the
*request* entity.
*Server - EX3 (producing list of POJOs):*
The last (for now) example shows how we can produce and write POJOs.
The resource method doesn't take any parameters and provides a Publisher
of POJO objects, which will be converted to JSON in NioBodyWriter.
NioBodyWriter is a reactive alternative to MessageBodyWriter from the
older version of the specification.
    @GET
    @Path("/ex3")
    @Produces(MediaType.APPLICATION_JSON)
    public Flow.Publisher<POJO> ex3() {
        Flow.Publisher<POJO> pojoPublisher = null;

        // source of the POJO "stream" can be anything - database call, client call to
        // another service, ...
        //
        // DB
        //     .getEmployees(department) // StreamPublisher<EmployeeDbModel> -- reactive stream
        //     .map((Function<EmployeeDbModel, EmployeeToReturn>) employeeDbModel -> {
        //         // ...
        //     });

        return pojoPublisher;
    }

    @Provider
    @Produces(MediaType.APPLICATION_JSON)
    public static class Ex3NioBodyWriter implements NioBodyWriter<POJO> {

        @Override
        public boolean isWriteable(Class<?> type, Type genericType,
                                   Annotation[] annotations, MediaType mediaType) {
            return true;
        }

        @Override
        public void writeTo(Flow.Publisher<POJO> entityObjectPublisher,
                            Flow.Subscriber<ByteBuffer> subscriber,
                            Class<?> type,
                            Type genericType,
                            Annotation[] annotations,
                            MediaType mediaType,
                            MultivaluedMap<String, Object> httpHeaders) {
            // map Publisher<POJO> to Publisher<ByteBuffer> and subscribe
            // Flow.Subscriber<ByteBuffer> to it.
        }
    }
The resource method is minimalistic; *[Q1]* applies here as well.
The example introduces NioBodyWriter, whose isWriteable method raises
*[Q2]*, similarly to NioBodyReader. #writeTo doesn't have any issues -
*[Q1]* is mitigated there because the implementation passes a subscriber
to the method, so nothing needs to be returned. Something similar might
be doable for NioBodyReader as well.
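A rough sketch of what a writeTo body could look like when the subscriber is passed in, again with JDK classes only. The names WriteToSketch, JsonProcessor and run are hypothetical, the POJO with a single "name" field is an assumption, the writeTo signature is reduced to its two reactive parameters, and the hand-rolled JSON does no escaping and no array framing.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class WriteToSketch {

    /** Hypothetical entity type with a single field. */
    static final class POJO {
        final String name;
        POJO(String name) { this.name = name; }
    }

    /** Maps each POJO to a ByteBuffer holding a naive JSON object (no escaping). */
    static final class JsonProcessor extends SubmissionPublisher<ByteBuffer>
            implements Flow.Processor<POJO, ByteBuffer> {
        private Flow.Subscription upstream;

        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(1); }
        @Override public void onNext(POJO pojo) {
            String json = "{\"name\":\"" + pojo.name + "\"}";
            submit(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)));
            upstream.request(1);
        }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    /** Reduced writeTo: the subscriber is attached before any POJO can flow. */
    static void writeTo(Flow.Publisher<POJO> entity, Flow.Subscriber<ByteBuffer> subscriber) {
        JsonProcessor processor = new JsonProcessor();
        processor.subscribe(subscriber); // downstream first ...
        entity.subscribe(processor);     // ... then connect to the POJO source
    }

    /** Publishes one POJO per name through writeTo and returns the bytes as text. */
    static String run(String... names) {
        StringBuilder out = new StringBuilder();
        CompletableFuture<String> done = new CompletableFuture<>();
        Flow.Subscriber<ByteBuffer> runtime = new Flow.Subscriber<ByteBuffer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(ByteBuffer b) { out.append(StandardCharsets.UTF_8.decode(b)); }
            @Override public void onError(Throwable t) { done.completeExceptionally(t); }
            @Override public void onComplete() { done.complete(out.toString()); }
        };
        try (SubmissionPublisher<POJO> source = new SubmissionPublisher<>()) {
            writeTo(source, runtime);
            for (String name : names) {
                source.submit(new POJO(name));
            }
        }
        return done.join();
    }
}
```

Because writeTo receives the subscriber as an argument, it can wire the downstream side before connecting to the source, so no caching is needed in this sketch.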
A comment on writing multiple POJO instances:
https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/jaxrs-api/src/main/java/javax/ws/rs/ext/NioBodyWriter.java#L82
An important point to mention is that even when producing multiple
instances, the intention here is still to return a single HTTP response.
===
We have more, but this email is already too long - I will post more
after there is some feedback on the presented concepts and issues.
Please let us know if this format is OK or if you'd prefer something
else - I guess I could do a screencast, hangout or something similar.
Source links:
- complete server example:
https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioResource.java
- client (to be discussed):
https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioClient.java
- server side processing (including interceptors):
https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/ServerSideProcessing.java
(Not including direct link to individual examples, since we will
continue working on them...)
Looking forward to your feedback!
Thanks and regards,
Pavel & Santiago