Hi
The way we implemented the initial NIO draft was to provide
MessageBodyWriter/Reader which would use
to link InputStream/OutputStream to the corresponding
ServletInputStream/OutputStream callback handlers notifying our code
when the write or read was possible.
I.e. the fact that MBW or MBR accepts an Input/Output stream did not stop us
from implementing the NIO draft.
Maybe the introduction of the new NIO-specific MBW/MBR interfaces is
justified, but these are internal providers that users who link Flow
with other high-level reactive APIs won't use directly, so I'm
wondering whether they are really needed?
Thanks, Sergey
On 07/03/17 14:25, Pavel Bucek wrote:
>
> Dear EG members,
>
> please allow me to share the direction of what we are thinking about
> JAX-RS NIO support.
>
> As stated before, SSE wasn't the only place where Flow APIs should be
> utilized - NIO is another area where they can be utilized quite heavily.
> And that was the main idea - to see where it makes sense to use them.
>
> Similarly to SSE, there is a plan to minimize / extract the Flow to
> different classes, but there should always be a clear path how to
> convert an instance into Flow.* (or org.reactivestreams.*) interfaces.
> It is not done yet, to keep things as clear as possible.
>
> One of the bigger concerns is backwards compatibility. There are
> interfaces, which do work directly with Input/Output stream, which is
> always a problem for reactive processing, since it is blocking by
> design. The specification will need to say something like "The runtime
> is non-blocking as long as the application code doesn't read from a
> provided InputStream or doesn't set an OutputStream."
>
> The motivation for doing this is to allow the JAX-RS apps to be
> non-blocking and reactive. JAX-RS 2.0 entity handling is designed
> around reading / producing Input / Output Stream, which is blocking by
> design. Non-blocking approach should result in higher throughput and
> better resource utilization of the server. Also, integration and
> coexistence with modern reactive frameworks should be possible to do
> without losing the advantage of having that framework (which was
> almost completely lost when dealing with blocking inputs/outputs).
>
> Let's jump into code snippets.
>
>
> *Server - EX1 (byte handling):*
>
> The snippet below shows how to process the request entity body and return
> the response entity body using Publisher<ByteBuffer> - no MessageBodyReader/Writer
> is involved. This can be used as a low-level integration point for
> other frameworks, which are also reactive and will do the processing,
> like serializing/deserializing (mapping) to some java type, filtering,
> etc. Returning a Publisher<ByteBuffer> is a reactive/nio alternative
> to javax.ws.rs.core.StreamingOutput, consuming an entity using
> Publisher<ByteBuffer> is a reactive/nio alternative to consuming
> entity as an InputStream.
>
> @POST @Path("/ex1")
> public Flow.Publisher<ByteBuffer> ex1(Flow.Publisher<ByteBuffer> entity) {
>     Ex1Processor processor = new Ex1Processor();
>
>     entity.subscribe(processor);
>     return processor;
> }
>
> // ex1 processor
> public static class Ex1Processor implements Flow.Processor<ByteBuffer, ByteBuffer> {
>
>     // ...
> }
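
To make the elided Ex1Processor a bit more concrete, here is a minimal sketch of a pass-through processor. It is not taken from the proposal: the names PassThroughProcessor and PassThroughDemo are hypothetical, and it leans on the JDK's SubmissionPublisher purely for convenience. Note that the demo subscribes the downstream side first, otherwise SubmissionPublisher would drop the buffers, which is the caching concern raised as [Q1].

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Ex1Processor: republishes every buffer unchanged.
class PassThroughProcessor extends SubmissionPublisher<ByteBuffer>
        implements Flow.Processor<ByteBuffer, ByteBuffer> {

    private Flow.Subscription upstream;

    @Override public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
        upstream.request(1);              // pull one buffer at a time
    }

    @Override public void onNext(ByteBuffer item) {
        submit(item);                     // may block if downstream buffers are full
        upstream.request(1);
    }

    @Override public void onError(Throwable t) { closeExceptionally(t); }

    @Override public void onComplete() { close(); }
}

class PassThroughDemo {
    // Pushes two buffers through the processor and returns what came out.
    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        PassThroughProcessor processor = new PassThroughProcessor();

        // Subscribe downstream first so that no buffer is dropped.
        processor.subscribe(new Flow.Subscriber<ByteBuffer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(ByteBuffer b) {
                received.add(StandardCharsets.UTF_8.decode(b).toString());
            }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });

        // The "request entity" publisher; closing it signals onComplete.
        try (SubmissionPublisher<ByteBuffer> entity = new SubmissionPublisher<>()) {
            entity.subscribe(processor);
            entity.submit(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
            entity.submit(ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8)));
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

The blocking submit(...) call is a shortcut of this sketch; a real non-blocking processor would have to propagate downstream demand instead.
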
>
> And there is already an issue, which is not clearly solved.
>
> Returning a Publisher instance from the resource method does put some
> constraints on the Publisher itself - it needs to cache all events
> which are emitted prior to the subscription of the JAX-RS implementation's
> Subscriber instance (which is the only way to get the data from a
> Publisher).
>
> This can be solved by stating that *request* entity publisher won't
> produce any events until the resource method is invoked and the
> implementation Subscriber subscribed to the returned *response* entity
> publisher. Or the resource method can return
> Consumer<Flow.Subscriber<ByteBuffer>>, which would effectively grant
> control of the implementation subscription process. Any comments or
> suggestions welcomed *[ref Q1]*.
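
For what it's worth, the Consumer<Flow.Subscriber<ByteBuffer>> variant could look roughly like the sketch below. This is only an illustration under my own assumptions: the class and method names are hypothetical, the resource simply echoes the request entity, and the "runtime" is simulated by calling accept(...) by hand. The point it shows is that nothing is subscribed until the runtime invokes the returned hook.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class DeferredSubscriptionSketch {

    // Hypothetical resource method shape: returns a hook instead of a Publisher.
    // The runtime calls accept(...) with its own subscriber when it is ready.
    static Consumer<Flow.Subscriber<ByteBuffer>> ex1(Flow.Publisher<ByteBuffer> entity) {
        return runtimeSubscriber -> entity.subscribe(runtimeSubscriber);
    }

    // Returns the number of subscribe() calls seen before and after the
    // simulated runtime invokes the hook.
    static int[] subscribeCountsBeforeAndAfter() {
        AtomicInteger subscribeCalls = new AtomicInteger();

        // Stub request entity that only records subscriptions.
        Flow.Publisher<ByteBuffer> entity = subscriber -> {
            subscribeCalls.incrementAndGet();
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
        };

        Consumer<Flow.Subscriber<ByteBuffer>> hook = ex1(entity);
        int before = subscribeCalls.get();   // resource method returned, nothing subscribed yet

        hook.accept(new Flow.Subscriber<ByteBuffer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(ByteBuffer item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return new int[] { before, subscribeCalls.get() };
    }
}
```
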
>
>
> *Server - EX2 (consuming stream of pojos):*
>
> The next example should be slightly more straightforward. It shows how
> to process a Publisher of custom type, in this case called "POJO".
>
> The only limitation or rule here would be that the subscriber for the
> request entity must be subscribed before the resource method "ex2"
> invocation ends.
>
> A new interface is introduced here - NioBodyReader. It has exactly the
> same responsibility as the good old MessageBodyReader, but without using a
> blocking InputStream to read the entity. Note that the "core" type
> is the Publisher<ByteBuffer>, which is in this case mapped (or
> converted) into a Publisher of POJOs.
>
> @POST @Path("/ex2")
> @Consumes(MediaType.APPLICATION_JSON)
> public void ex2(Flow.Publisher<POJO> entity,
>                 @Suspended AsyncResponse response) {
>
>     // TODO: introduce a helper or modify AsyncResponse to support this
>     // pattern directly?
>     entity.subscribe(
>         // POJO subscriber - consumer
>         new Flow.Subscriber<POJO>() {
>             @Override
>             public void onSubscribe(Flow.Subscription subscription) {
>                 // ...
>             }
>
>             @Override
>             public void onNext(POJO item) {
>                 // ...
>             }
>
>             @Override
>             public void onError(Throwable throwable) {
>                 response.resume(throwable);
>             }
>
>             @Override
>             public void onComplete() {
>                 response.resume(Response.ok().build());
>             }
>         }
>     );
> }
>
> @Provider
> @Consumes(MediaType.APPLICATION_JSON)
> public static class Ex2NioBodyReader implements NioBodyReader<POJO> {
>     @Override
>     public boolean isReadable(Class<?> type,
>                               Type genericType,
>                               Annotation[] annotations,
>                               MediaType mediaType) {
>         return true;
>     }
>
>     @Override
>     public Flow.Publisher<POJO> readFrom(Flow.Publisher<ByteBuffer> entity,
>                                          Class<POJO> type,
>                                          Type genericType,
>                                          Annotation[] annotations,
>                                          MediaType mediaType,
>                                          MultivaluedMap<String, String> httpHeaders) {
>         Ex2MappingProcessor mappingProcessor = new Ex2MappingProcessor();
>         entity.subscribe(mappingProcessor);
>         return mappingProcessor;
>     }
> }
>
> // mapping Publisher<ByteBuffer> to Publisher<POJO>
> // ByteBuffers are expected to contain JSON (indicated by @Consumes on the
> // resource method and NioBodyReader).
> public static class Ex2MappingProcessor implements Flow.Subscriber<ByteBuffer>, Flow.Publisher<POJO> {
>     // ...
> }
>
> Same issue as *[Q1]* is valid for this as well - same solution will
> need to be applied for "readFrom" method.
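
One way to sidestep [Q1] inside "readFrom" might be to return a lazy mapping publisher that subscribes to the request entity only when somebody subscribes to the result. The sketch below is just that idea, not the proposed API: LazyMappingSketch, Pojo and map(...) are hypothetical names, and it assumes (unrealistically) that every ByteBuffer carries exactly one complete entity, decoded here as plain UTF-8 text rather than JSON.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class LazyMappingSketch {

    // Hypothetical entity type standing in for "POJO".
    static final class Pojo {
        final String name;
        Pojo(String name) { this.name = name; }
    }

    // Maps Publisher<ByteBuffer> to Publisher<T>. The upstream subscription
    // is created only when a downstream subscriber arrives, and demand is
    // forwarded unchanged because the mapping is one-to-one.
    static <T> Flow.Publisher<T> map(Flow.Publisher<ByteBuffer> entity,
                                     Function<ByteBuffer, T> decoder) {
        return downstream -> entity.subscribe(new Flow.Subscriber<ByteBuffer>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(ByteBuffer chunk) { downstream.onNext(decoder.apply(chunk)); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    // Feeds two chunks through the mapping and returns the decoded names.
    static List<String> demo() {
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<ByteBuffer> entity = new SubmissionPublisher<>()) {
            Flow.Publisher<Pojo> pojos = map(entity,
                    chunk -> new Pojo(StandardCharsets.UTF_8.decode(chunk).toString()));
            pojos.subscribe(new Flow.Subscriber<Pojo>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Pojo p) { names.add(p.name); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            entity.submit(ByteBuffer.wrap("alice".getBytes(StandardCharsets.UTF_8)));
            entity.submit(ByteBuffer.wrap("bob".getBytes(StandardCharsets.UTF_8)));
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }
}
```

A real JSON reader would have to buffer and split chunks, so the one-to-one demand forwarding would no longer hold as-is.
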
>
> Another issue is about what should be passed to "isReadable" method as
> "type" parameter. I'm not exactly sure whether we can safely obtain
> generic type of a parameter from the resource method (public void
> ex2(Flow.Publisher< *POJO*> entity, ..)). Any comments/suggestions
> welcomed *[ref Q2]*.
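
On [Q2]: as far as plain reflection goes, the type argument of a method parameter is available through Method#getGenericParameterTypes(), as long as the parameter is declared with a concrete type argument. A minimal sketch, with a hypothetical class standing in for the resource and a nested POJO type:

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.Flow;

class GenericParameterSketch {

    // Hypothetical entity type.
    static class POJO { }

    // Stand-in for the resource method from EX2 (annotations omitted).
    public void ex2(Flow.Publisher<POJO> entity) { }

    // Returns the first type argument of the given parameter,
    // or Object.class if the parameter is declared as a raw type.
    static Type elementType(Method method, int parameterIndex) {
        Type declared = method.getGenericParameterTypes()[parameterIndex];
        if (declared instanceof ParameterizedType) {
            return ((ParameterizedType) declared).getActualTypeArguments()[0];
        }
        return Object.class;
    }

    static Type demo() {
        try {
            Method ex2 = GenericParameterSketch.class
                    .getDeclaredMethod("ex2", Flow.Publisher.class);
            return elementType(ex2, 0);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If the parameter is declared with a wildcard or a type variable, the returned Type is a WildcardType or TypeVariable rather than a Class, so the runtime would still need a rule for those cases.
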
>
> Note that using @Suspended shouldn't be enforced here; it should be
> possible to return a Response directly and still be able to consume
> the *request* entity.
>
>
> *Server - EX3 (producing list of POJOs):*
>
> The last (for now) example shows how we can produce and write POJOs.
> The resource method doesn't take any parameters and provides a Publisher
> of POJO objects, which will be converted to JSON in NioBodyWriter.
> NioBodyWriter is a reactive alternative to MessageBodyWriter from
> the older version of the specification.
>
> @GET @Path("/ex3")
> @Produces(MediaType.APPLICATION_JSON)
> public Flow.Publisher<POJO> ex3() {
>
>     Flow.Publisher<POJO> pojoPublisher = null;
>
>     // source of the POJO "stream" can be anything - database call, client call to
>     // another service, ...
>     //
>     // DB
>     //     .getEmployees(department) // StreamPublisher<EmployeeDbModel> -- reactive stream
>     //     .map((Function<EmployeeDbModel, EmployeeToReturn>) employeeDbModel -> {
>     //         // ...
>     //     });
>
>     return pojoPublisher;
> }
>
> @Provider
> @Produces(MediaType.APPLICATION_JSON)
> public static class Ex3NioBodyWriter implements NioBodyWriter<POJO> {
>     @Override
>     public boolean isWriteable(Class<?> type,
>                                Type genericType,
>                                Annotation[] annotations,
>                                MediaType mediaType) {
>         return true;
>     }
>
>     @Override
>     public void writeTo(Flow.Publisher<POJO> entityObjectPublisher,
>                         Flow.Subscriber<ByteBuffer> subscriber,
>                         Class<?> type,
>                         Type genericType,
>                         Annotation[] annotations,
>                         MediaType mediaType,
>                         MultivaluedMap<String, Object> httpHeaders) {
>         // map Publisher<POJO> to Publisher<ByteBuffer> and subscribe
>         // Flow.Subscriber<ByteBuffer> to it.
>     }
> }
>
> Resource method is minimalistic, *[Q1]* applies here as well.
>
> The example introduces NioBodyWriter, and its isWriteable method does
> have *[Q2]*, similarly to NioBodyReader. #writeTo doesn't have any
> issues - *[Q1]* is mitigated there because the runtime passes a
> subscriber to the implementation - nothing needs to be
> returned. Something similar might be possible for NioBodyReader as well.
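
The comment inside writeTo ("map Publisher<POJO> to Publisher<ByteBuffer> and subscribe") could be read as the adapter sketched below. Again this is only my guess at one possible body: WriteToSketch, Pojo and encode(...) are hypothetical, the JSON encoding is hand-rolled with no escaping, the remaining writeTo parameters are left out, and the framing needed to turn several objects into a single JSON document is ignored.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class WriteToSketch {

    // Hypothetical entity type standing in for "POJO".
    static final class Pojo {
        final String name;
        Pojo(String name) { this.name = name; }
    }

    // Naive, hypothetical JSON encoding; no escaping is performed.
    static ByteBuffer encode(Pojo pojo) {
        String json = "{\"name\":\"" + pojo.name + "\"}";
        return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
    }

    // Reduced writeTo: adapts the runtime's ByteBuffer subscriber into a
    // Pojo subscriber. Demand passes through unchanged (one buffer per Pojo).
    static void writeTo(Flow.Publisher<Pojo> entities,
                        Flow.Subscriber<ByteBuffer> subscriber) {
        entities.subscribe(new Flow.Subscriber<Pojo>() {
            @Override public void onSubscribe(Flow.Subscription s) { subscriber.onSubscribe(s); }
            @Override public void onNext(Pojo pojo) { subscriber.onNext(encode(pojo)); }
            @Override public void onError(Throwable t) { subscriber.onError(t); }
            @Override public void onComplete() { subscriber.onComplete(); }
        });
    }

    // Simulates the runtime: collects every written buffer as a string.
    static List<String> demo() {
        List<String> written = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Pojo> pojos = new SubmissionPublisher<>()) {
            writeTo(pojos, new Flow.Subscriber<ByteBuffer>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(ByteBuffer b) {
                    written.add(StandardCharsets.UTF_8.decode(b).toString());
                }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            pojos.submit(new Pojo("alice"));
            pojos.submit(new Pojo("bob"));
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written;
    }
}
```
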
>
>
> Comment on writing multiple POJO instances:
> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/jaxrs-api/src/main/java/javax/ws/rs/ext/NioBodyWriter.java#L82
>
> An important point to mention is that even when producing multiple
> instances, the intention here is still to return a single HTTP response.
>
> ===
>
> We have more, but this email is already too long - I will post more
> after there is some feedback on the presented concepts and issues.
> Please let us know if this format is OK or if you'd prefer something
> else - I guess I could do a screencast, hangout or something similar.
>
> Source links:
>
> - complete server example:
> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioResource.java
> - client (to be discussed):
> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioClient.java
> - server side processing (including interceptors):
> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/ServerSideProcessing.java
>
> (Not including direct link to individual examples, since we will
> continue working on them...)
>
> Looking forward to your feedback!
>
> Thanks and regards,
> Pavel & Santiago