users@jax-rs-spec.java.net

[jax-rs-spec users] Re: Non blocking IO: Publishers and Subscribers

From: Pavel Bucek <pavel.bucek_at_oracle.com>
Date: Thu, 23 Mar 2017 15:14:45 +0100

Ah, this subscriber...

I was trying to explain, that a Publisher won't start consuming (and
consequently publishing) events unless there is a subscriber which invokes

subscription.request(x); // x > 1

I'm still not exactly sure how we'll make the whole "processor flow"
work, but one of the ideas was about postponing "Sink#onSubscribe" until
we know that the chain is complete, so resource method was invoked and
we have a result.

Then, there is still an issue with a resource method, which would
consume a Source and block until something is received, so the
"onSubscribe" for a Sink registered in a resource method most likely
cannot be delayed, which seems to be doable.

If there would need to be a rule for NioBodyReader (non-blocking version
of MessageBodyReader), which will say that the "readFrom" method cannot
block. (It doesn't make sense, since it consumes a Publisher and
provides another one, but the processing won't be able to continue if
there is a blocking call).

I will try to create some diagram or a scheme..

Regards,
Pavel



On 23/03/2017 14:30, Sergey Beryozkin wrote:
> HI Pavel
>
> The code below looks good, but may be we are talking about slightly
> different things,
> in the list of rules, in a), you do say:
> "That means the sole presence of a subscriber/subscription *is not
> enough.*"
>
> Thanks, Sergey
> On 23/03/17 13:25, Pavel Bucek wrote:
>>
>> Hi Sergey,
>>
>> I'm not saying "subscriber is not enough", the message was "it is not
>> so common to provide a subscriber".
>>
>> The main reason is most likely the contract enforcement. if an
>> application gets a handle on a subscriber, it could just call onNext,
>> without subscribing it anywhere and respecting backpressure and other
>> contracts (calling onException in defined states, ...) (that's what
>> we do have with SseEventSink without subscribing it to the
>> SseBroadcaster - which needs to be fixed).
>>
>> The example POST method would look something like:
>>
>> @POST @Path("/ex2")
>> @Consumes(MediaType.APPLICATION_JSON)
>> public Flow.Source<AnotherPOJO> ex(Flow.Source<POJO> entity) {
>>
>> Flow.Processor<POJO, AnotherPOJO> processor =null;// ... entity.subscribe(processor);
>> return processor;
>> }
>>
>> (Source = Publisher, Sink = Subscriber, Processor<X, Y> implements
>> Sink<X>, Souce<Y>).
>>
>> Source<ByteBuffer> will be always supported a the low level access,
>> the rest will be provided via Nio Body Provider.
>>
>> Does that help?
>>
>> Thanks,
>> Pavel
>>
>>
>> On 23/03/2017 12:42, Sergey Beryozkin wrote:
>>> Hi Pavel
>>>
>>> Thanks for the update, initial question is, what does it mean having
>>> a subscriber is not enough as far as the JAX-RS resource method
>>> processing for ex a POST request the NIO way, can you provide a code
>>> example ? I think I might understand, the resource will start
>>> pulling somehow, but would like to clarify
>>> Thanks, Sergey
>>>
>>>
>>> On 23/03/17 08:47, Pavel Bucek wrote:
>>>>
>>>> Dear EG members,
>>>>
>>>> we are still making progress on nio proposal (please see "nio"
>>>> branch on github [1]). Feel free to send any feedback you might
>>>> have, or propose any changes.
>>>>
>>>> (nio branch has Publisher renamed to Source, but I'll use term
>>>> Publisher in this email to avoid confusion).
>>>>
>>>> When we evaluated how other frameworks are using similar API, we
>>>> noticed that "public" APIs are usually Publishers, not subscribers.
>>>> Thus we are trying to remove all exposed subscribers from Nio
>>>> proposal and replace it by providing Publishers or returning
>>>> Publishers from resource methods / nio body workers.
>>>>
>>>> This has several consequences, derived from the stream nature of
>>>> data we are usually processing (request/response entities). For
>>>> example, we have to be sure that subscriber for any entity will get
>>>> whole entity (not a single byte can be skipped). That brings us to
>>>> couple of rules for framework provided entity publishers:
>>>>
>>>>
>>>> a) Publisher won't start publishing unless there is a demand.
>>>> Demand means subsciption.request(x) where x > 0. That means the
>>>> sole presence of a subscriber/subscription is not enough.
>>>>
>>>> Reasoning: we want to avoid filling Publisher buffers or require ones.
>>>>
>>>>
>>>> b) there will be a subscriber limit - only single subscriber is
>>>> allowed.
>>>>
>>>> Reasoning: subscriber has to be certain that all bytes are consumed
>>>> (when you'd skip initial part of the message, you can throw that
>>>> out. Subscribing 2 subscribers with a guarantee that it will read
>>>> complete entity is "hard" (puts caching requirements on the
>>>> publisher, which might not be simple to fulfill). Also, when a user
>>>> want's to have another subscriber for logging, it would need to be
>>>> a processor and do a "chain" instead of a "tree". Another reason is
>>>> that using one ByteBuffer instance among multiple readers is not
>>>> possible - we'd need to do some kind of view and manage references.
>>>>
>>>>
>>>> Another rule might need to apply from application returned publishers:
>>>>
>>>> When the publisher is created by the resource method, not based on
>>>> the request entity (i.e. handling HTTP GET method), it needs to be
>>>> "caching" or "lazy", something like [2] or [3]. I'm still not sure
>>>> whether 3rd party frameworks do commonly support Publishers similar
>>>> to what was described as a) and b) - if there is anyone who is more
>>>> familiar with flows, please let us know.
>>>>
>>>> Any comments or suggestions are welcomed.
>>>>
>>>> Thanks and regards,
>>>> Pavel
>>>>
>>>> [1] https://github.com/jax-rs/api/tree/nio
>>>> [2]
>>>> https://projectreactor.io/docs/core/release/api/reactor/core/publisher/ReplayProcessor.html
>>>> [3]
>>>> http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#cacheWithInitialCapacity(int)
>>>>
>>>
>>
>