users@grizzly.java.net

Re: Grizzly TCP Server: how to async call inside a filter

From: Raffaele Marcello <marcelloraffaele_at_gmail.com>
Date: Fri, 25 Oct 2013 18:07:29 +0200

Thank you,
you helped me a lot, I developed what you said and it works!!!
I defined an ExecutorService in this way:

       * ...
        TCPNIOTransportBuilder builder =
TCPNIOTransportBuilder.newInstance();
        builder.setTcpNoDelay(noDelay);
        ThreadPoolConfig config = builder.getWorkerThreadPoolConfig();

config.setPoolName(poolName).setCorePoolSize(corePoolSize).setMaxPoolSize(maxPoolSize).setQueueLimit(queueLimit);

        transport = builder.build();
        FilterChainBuilder filterChainBuilder =
FilterChainBuilder.stateless();
        filterChainBuilder.add(new TransportFilter());
        filterChainBuilder.add(new MyProtocolFilter();

        ExecutorService executorService =
GrizzlyExecutorService.createInstance( config );
        filterChainBuilder.add( new MyAsyncCallFilter( executorService );
        ...*

and i used it in MyAsyncCallFilter:
     * ...
      AsyncCaller atc = new AsyncCaller( message, ctx.getConnection(),
configuration);
      executorService.execute(atc);
*
* ...*

In that way, i hope that Grizzly will use the same Pool for manage of
connections(running filter chains) and for async call. Do you think that
this implementation work as I want?
If it is true is there a way to runtime monitor the number of thread that
Grizzly is using?
Maybe i know this answer, i could use JMX services... Is there some example
that do this? I'm going to stress the server and i want to monitor the
resources that it need to discover how many concurrent connection i can
manage in stress condition.

Thank you
R



2013/10/24 Oleksiy Stashok <oleksiy.stashok_at_oracle.com>

>
> On 23.10.13 15:45, Raffaele Marcello wrote:
>
> Very well!
> I was working to something similar, i created a Runnable (that do the
> same thing that you suggested) and i executed it using an ExecutorService:
>
> *AsyncThreadCaller atc = new AsyncThreadCaller(body,
> ctx.getConnection() );
> ExecutorService executor = Executors.newFixedThreadPool(1);
> *
> * executor.execute(atc);*
>
> It was only a test! I would like to maintain good performance and few
> resources consumption.
> I'm using this server in a J2EE context so isn't suggested to create
> Thread manually.
> Which is the best way? Maybe i can create an ExecutorService(pool of one
> thread) for each connection.
>
> It's up to you for sure and depends on your specific usecase, but I'd
> rather recommend to create a shared thread-pool, so all connections will
> use it.
>
>
> Is there in Grizzly something that can help to create a Thread?
>
> Yes, we have ThreadPoolConfig and GrizzlyExecutorService, which you can
> use like:
>
> ThreadPoolConfig config = ThreadPoolConfig.newConfig().setCorePoolSize(2).setMaxPoolSize(16);
> ExecutorService threadPool = GrizzlyExecutorService.createInstance(config);
>
>
> Though in JavaEE 7 it's possible to inject JavaEE container's thread-pool
> [1], I've never tried that yet, but you might be interested :)
>
>
> Another question (maybe out of this topic): is possible that during the
> Connection.write(...) the trasmission is dalayed?
>
> It's possible, but you will not notice it, this operation is non-blocking
> and if data can't be flushed immediately - it will be added to the internal
> async write queue and written asap.
>
>
> Can i flush on output stream in order to be sure that the message is
> written in the socket?
>
> you can pass a CompletionHandler, which will be notified once the message
> is fully written.
>
> *connection.write(message, completionHandler);*
>
> Thanks.
>
> WBR,
> Alexey.
>
> [1]
> http://www.adam-bien.com/roller/abien/entry/injecting_an_executorservice_with_java
>
>
>
>
> Thanks
> R
>
>
> 2013/10/23 Oleksiy Stashok <oleksiy.stashok_at_oracle.com>
>
>> Hi,
>>
>> I reworked the MyCallerFilter this way to simulate the async JobExecution:
>>
>> * logger.info( "handling a MyMessage..." );
>>
>> Connection connection = ctx.getConnection();
>> MyMessage m = (MyMessage) message;
>> byte[] body = m.getMsg();
>>
>> logger.info("m="+ m );
>>
>> new Thread() {
>> public void run() {
>> byte[] res = JobExecution.exec( body );
>>
>> MyMessage answer = new MyMessage( res.length, res );
>>
>> connection.write(answer);
>> logger.info( "i wrote the MyMessage: "+answer );
>> }
>> }.start();
>>
>> return ctx.getStopAction();*
>>
>> Please note that in the reworked example we use connection.write(...)
>> instead of ctx.write(...) because FilterChainContext might be disposed by
>> that time.
>> It is also possible to optimize MyProtocolFilter to avoid byte[] copying,
>> but I'm not sure you use the same code in your real server, so will not
>> touch that code for now.
>>
>> Hope that helps.
>>
>> WBR,
>> Alexey.
>>
>>
>> On 23.10.13 04:13, Raffaele Marcello wrote:
>>
>> Thank you, i appreciate a lot your help!
>> Unfortunately i cannot share my code but i can show you an example (you
>> can see attached files). In reality i was inspired by BIOPFilter from
>> official examples.
>>
>> In my protocol can happen that some ritrasmitted messages arrive while
>> the server is waiting for a response.
>> I would like to listen arriving messages (read) even if i'm waiting for
>> responses. As you can see, i used two filter, when MyCallerFilter is
>> executing the request to remote services, MyProtocolFilter is unable to
>> read from input stream.
>> Maybe i can do it using async call on service (maybe using a thread) or
>> using async call from MyProtocolFilter to MyCallerFilter.
>> Unfortunately i didn't understand how to use "*asyncMessageProcessor*",
>> which class it is? Are there some examples that show how to use it?
>> Is there the chance to create a task and ask Grizzly exec it async?
>> Any other suggestion?
>>
>> Thanks
>> R
>>
>>
>>
>>
>> 2013/10/22 Oleksiy Stashok <oleksiy.stashok_at_oracle.com>
>>
>>> Hi,
>>>
>>>
>>>
>>> On 22.10.13 07:10, Raffaele Marcello wrote:
>>>
>>>> Hi,
>>>> i developed a TCP Server using Grizzly using two Filters:
>>>> - MyProtocolFilter to manage the protocol( messages recognition )
>>>> - MyCallerFilter to invoke a remote server (using RMI) and manage the
>>>> answer.
>>>>
>>>> Unfortunately now it works using only one thread per connection. I'm
>>>> having problems because sometimes some message can arrive while i'm waiting
>>>> for the remote method invocation. In that case the new message is queued in
>>>> input buffer and it can cause retransmissions.
>>>> I want to ask if is possible to asynchronous invoke remote methods
>>>> managing the results(Maybe using another thread). In this way i could
>>>> manage new arriving messages while i'm waiting responses.
>>>>
>>> Sure, it's possible. As I understand your protocol doesn't require
>>> responses to come in the same order as requests?
>>> The easiest thing you can do in MyCallerFilter is:
>>>
>>> public NextAction handleRead(FilterChainContext ctx) throws Exception {
>>> asyncMessageProcessor.doAsync(ctx.getMessage(),
>>> ctx.getConnection()); // pass the message and the connection, so async
>>> processor knows to whom it has to send the response
>>> return ctx.getStopAction();
>>> }
>>>
>>> If you can share your code - I'll be able to provide more details.
>>>
>>>
>>> Does anyone have a solution in order to maintain good performance? Can
>>>> i use some Grizzly configuration or specific class?
>>>>
>>> Sure, you can tune I/O strategies, buffer sizes, etc, but first it will
>>> be good to have something working :)))
>>>
>>> Thanks.
>>>
>>> WBR,
>>> Alexey.
>>>
>>>
>>>>
>>>> Thanks
>>>> R
>>>>
>>>
>>>
>>
>>
>> --
>> RM
>>
>>
>>
>
>
> --
> RM
>
>
>


-- 
RM