users@jax-rs-spec.java.net

[jax-rs-spec users] Re: Non blocking IO: Publishers and Subscribers

From: Pavel Bucek <pavel.bucek_at_oracle.com>
Date: Thu, 23 Mar 2017 14:25:22 +0100

Hi Sergey,

I'm not saying "subscriber is not enough", the message was "it is not so
common to provide a subscriber".

The main reason is most likely contract enforcement. If an
application gets a handle on a subscriber, it could just call onNext
without subscribing it anywhere and without respecting backpressure and
other contracts (calling onException in defined states, ...). That is
what we have today with SseEventSink, which can be used without
subscribing it to the SseBroadcaster; that needs to be fixed.

The example POST method would look something like:

@POST @Path("/ex2")
@Consumes(MediaType.APPLICATION_JSON)
public Flow.Source<AnotherPOJO> ex(Flow.Source<POJO> entity) {

     Flow.Processor<POJO, AnotherPOJO> processor = null; // ...
     entity.subscribe(processor);
     return processor;
}

(Source = Publisher, Sink = Subscriber, Processor<X, Y> implements
Sink<X>, Source<Y>).
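
To make the Source/Sink/Processor relationship a bit more concrete, here is a rough sketch. It assumes the JDK 9 java.util.concurrent.Flow interfaces (Publisher/Subscriber/Processor) rather than the Flow.Source/Flow.Sink types on the nio branch, and MapProcessor, sourceOf and demo are made-up names used only for illustration. It is not thread-safe and assumes the processor is subscribed upstream before anything subscribes to it, as in the resource method above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

public class ProcessorSketch {

    /** Hypothetical mapping processor: consumes T (Sink side), publishes R (Source side). */
    static final class MapProcessor<T, R> implements Flow.Processor<T, R> {
        private final Function<T, R> mapper;
        private Flow.Subscription upstream;
        private Flow.Subscriber<? super R> downstream;

        MapProcessor(Function<T, R> mapper) { this.mapper = mapper; }

        // Source side: downstream demand is forwarded 1:1 to the upstream subscription.
        @Override public void subscribe(Flow.Subscriber<? super R> subscriber) {
            downstream = subscriber;
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { upstream.request(n); }
                @Override public void cancel() { upstream.cancel(); }
            });
        }

        // Sink side: no demand is signalled here, only the subscription is stored.
        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; }
        @Override public void onNext(T item) { downstream.onNext(mapper.apply(item)); }
        @Override public void onError(Throwable t) { downstream.onError(t); }
        @Override public void onComplete() { downstream.onComplete(); }
    }

    /** Toy synchronous source standing in for the framework-provided request entity. */
    static <T> Flow.Publisher<T> sourceOf(List<T> items) {
        return subscriber -> {
            Iterator<T> it = items.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;
                @Override public void request(long n) {
                    for (long i = 0; i < n && !done && it.hasNext(); i++) {
                        subscriber.onNext(it.next());
                    }
                    if (!done && !it.hasNext()) { done = true; subscriber.onComplete(); }
                }
                @Override public void cancel() { done = true; }
            });
        };
    }

    /** Wires source -> processor -> collecting subscriber and returns what was received. */
    static List<String> demo() {
        MapProcessor<Integer, String> processor = new MapProcessor<>(i -> "item-" + i);
        sourceOf(List.of(1, 2, 3)).subscribe(processor); // like entity.subscribe(processor)
        List<String> out = new ArrayList<>();
        processor.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { out.add(item); }
            @Override public void onError(Throwable t) { out.add("error"); }
            @Override public void onComplete() { out.add("done"); }
        });
        return out;
    }
}
```

Note that the application code never calls onNext itself; items only flow once the final subscriber (the framework, in the resource method case) signals demand through the chain.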

Source<ByteBuffer> will always be supported as the low-level access; the
rest will be provided via Nio Body Provider.

Does that help?

Thanks,
Pavel


On 23/03/2017 12:42, Sergey Beryozkin wrote:
> Hi Pavel
>
> Thanks for the update. My initial question is: what does it mean that
> having a subscriber is not enough, as far as a JAX-RS resource method
> processing e.g. a POST request the NIO way is concerned? Can you
> provide a code example? I think I might understand (the resource will
> start pulling somehow), but I would like to clarify.
> Thanks, Sergey
>
>
> On 23/03/17 08:47, Pavel Bucek wrote:
>>
>> Dear EG members,
>>
>> we are still making progress on the nio proposal (please see the "nio"
>> branch on github [1]). Feel free to send any feedback you might have,
>> or propose any changes.
>>
>> (nio branch has Publisher renamed to Source, but I'll use term
>> Publisher in this email to avoid confusion).
>>
>> When we evaluated how other frameworks are using similar APIs, we
>> noticed that "public" APIs are usually Publishers, not Subscribers.
>> Thus we are trying to remove all exposed subscribers from the Nio
>> proposal and replace them by providing Publishers or returning
>> Publishers from resource methods / nio body workers.
>>
>> This has several consequences, derived from the stream nature of the
>> data we are usually processing (request/response entities). For
>> example, we have to be sure that the subscriber for any entity will
>> get the whole entity (not a single byte can be skipped). That brings
>> us to a couple of rules for framework-provided entity publishers:
>>
>>
>> a) Publisher won't start publishing unless there is a demand. Demand
>> means subscription.request(x) where x > 0. That means the sole
>> presence of a subscriber/subscription is not enough.
>>
>> Reasoning: we want to avoid filling Publisher buffers or require ones.
>>
>>
>> b) there will be a subscriber limit - only a single subscriber is allowed.
>>
>> Reasoning: the subscriber has to be certain that all bytes are
>> consumed (if you skip the initial part of the message, you might as
>> well throw the whole message out). Subscribing 2 subscribers with a
>> guarantee that each will read the complete entity is "hard" (it puts
>> caching requirements on the publisher, which might not be simple to
>> fulfill). Also, when a user wants to have another subscriber for
>> logging, it would need to be a processor and form a "chain" instead
>> of a "tree". Another reason is that sharing one ByteBuffer instance
>> among multiple readers is not possible - we'd need to do some kind of
>> view and manage references.
>>
>>
>> Another rule might need to apply to application-returned publishers:
>>
>> When the publisher is created by the resource method, not based on
>> the request entity (i.e. handling HTTP GET method), it needs to be
>> "caching" or "lazy", something like [2] or [3]. I'm still not sure
>> whether 3rd party frameworks do commonly support Publishers similar
>> to what was described as a) and b) - if there is anyone who is more
>> familiar with flows, please let us know.
>>
>> Any comments or suggestions are welcome.
>>
>> Thanks and regards,
>> Pavel
>>
>> [1] https://github.com/jax-rs/api/tree/nio
>> [2]
>> https://projectreactor.io/docs/core/release/api/reactor/core/publisher/ReplayProcessor.html
>> [3]
>> http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#cacheWithInitialCapacity(int)
>>
>
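
Rules a) and b) from the quoted message can be illustrated with a small sketch. It assumes the JDK 9 java.util.concurrent.Flow interfaces rather than the types on the nio branch; SingleSubscriberPublisher, recorder and demo are hypothetical names, and the code is deliberately single-threaded with no reentrancy guard, so it is only meant to show the intended behaviour, not how the proposal implements it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

public class EntityPublisherSketch {

    /** Hypothetical entity publisher: emits only on demand (rule a), single subscriber (rule b). */
    static final class SingleSubscriberPublisher<T> implements Flow.Publisher<T> {
        private final Iterator<T> chunks;
        private final AtomicBoolean subscribed = new AtomicBoolean();

        SingleSubscriberPublisher(List<T> chunks) { this.chunks = chunks.iterator(); }

        @Override public void subscribe(Flow.Subscriber<? super T> subscriber) {
            if (!subscribed.compareAndSet(false, true)) {
                // Rule b): a second subscriber gets a no-op subscription and an error.
                subscriber.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) { }
                    @Override public void cancel() { }
                });
                subscriber.onError(new IllegalStateException("only a single subscriber is allowed"));
                return;
            }
            // Rule a): subscribing alone emits nothing; items flow only from request(n), n > 0.
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;
                @Override public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("demand must be > 0"));
                        return;
                    }
                    for (long i = 0; i < n && !done && chunks.hasNext(); i++) {
                        subscriber.onNext(chunks.next());
                    }
                    if (!done && !chunks.hasNext()) { done = true; subscriber.onComplete(); }
                }
                @Override public void cancel() { done = true; }
            });
        }
    }

    /** Subscriber that records signals and exposes its subscription so demand is driven externally. */
    static Flow.Subscriber<String> recorder(String name, List<String> log, Flow.Subscription[] handle) {
        return new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { handle[0] = s; }
            @Override public void onNext(String item) { log.add(name + ":" + item); }
            @Override public void onError(Throwable t) { log.add(name + ":error"); }
            @Override public void onComplete() { log.add(name + ":complete"); }
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SingleSubscriberPublisher<String> entity =
                new SingleSubscriberPublisher<>(List.of("a", "b", "c"));
        Flow.Subscription[] handle = new Flow.Subscription[1];

        entity.subscribe(recorder("first", log, handle));
        log.add("subscribed");      // no items have been emitted at this point
        handle[0].request(2);       // demand for two chunks
        log.add("requested 2");
        handle[0].request(2);       // remaining chunk, then completion
        entity.subscribe(recorder("second", log, new Flow.Subscription[1]));
        return log;
    }
}
```

Calling demo() should record "subscribed" before any item, then the chunks in request-sized batches, and finally an error for the second subscriber.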