users@grizzly.java.net

Re: Grizzly TCP Server: how to async call inside a filter

From: Oleksiy Stashok <oleksiy.stashok_at_oracle.com>
Date: Mon, 28 Oct 2013 18:03:30 -0700

Hi,


On 28.10.13 04:53, Raffaele Marcello wrote:
> Yes thanks, i'm updating my libraries.
>
> I know it's strange but i have a null pointer exception when i sun
> this with with the grizzly versione 2.3.6:
>
> final TCPNIOTransportBuilder builder =
> TCPNIOTransportBuilder.newInstance();
> final ThreadPoolConfig config =
> builder.getWorkerThreadPoolConfig();
> config.setPoolName("myPool").setCorePoolSize(5).setMaxPoolSize(5).setQueueLimit(-1);
> transport = builder.build();
>
> No exception if i use the 2.3.5 version. Maybe something is new in the
> code... Or is there some problem?
I think the later 2.3.6 behavior is correct.
You have to set worker-thread-pool config if you don't want to use the
default one created at the time you call transport.start();
So the correct code will look like:


             final TCPNIOTransportBuilder builder =
TCPNIOTransportBuilder.newInstance();
             final ThreadPoolConfig config =
ThreadPoolConfig.defaultConfig();
config.setPoolName("myPool").setCorePoolSize(5).setMaxPoolSize(5).setQueueLimit(-1);
             builder.setWorkerThreadPoolConfig(config);
             transport = builder.build();

WBR,
Alexey.
>
> Thanks
> R
>
>
>
> 2013/10/26 Oleksiy Stashok <oleksiy.stashok_at_oracle.com
> <mailto:oleksiy.stashok_at_oracle.com>>
>
> Hi,
>
>
> On 25.10.13 09:07, Raffaele Marcello wrote:
>> Thank you,
>> you helped me a lot, I developed what you said and it works!!!
>> I defined an ExecutorService in this way:
>>
>> /...
>> TCPNIOTransportBuilder builder =
>> TCPNIOTransportBuilder.newInstance();
>> builder.setTcpNoDelay(noDelay);
>> ThreadPoolConfig config =
>> builder.getWorkerThreadPoolConfig();
>> config.setPoolName(poolName).setCorePoolSize(corePoolSize).setMaxPoolSize(maxPoolSize).setQueueLimit(queueLimit);
>>
>> transport = builder.build();
>> FilterChainBuilder filterChainBuilder =
>> FilterChainBuilder.stateless();
>> filterChainBuilder.add(new TransportFilter());
>> filterChainBuilder.add(new MyProtocolFilter();
>>
>> ExecutorService executorService =
>> GrizzlyExecutorService.createInstance( config );
>> filterChainBuilder.add( new MyAsyncCallFilter(
>> executorService );
>> .../
>>
>> and i used it in MyAsyncCallFilter:
>> /...
>> AsyncCaller atc = new AsyncCaller( message,
>> ctx.getConnection(), configuration);
>> executorService.execute(atc);
>> /
>> / .../
>>
>> In that way, i hope that Grizzly will use the same Pool for
>> manage of connections(running filter chains) and for async call.
>> Do you think that this implementation work as I want?
> Almost :) For now you use the same thread-pool configuration as
> Grizzly Transport, but in order to use the same thread-pool you
> need to create an ExecutorService instance and explicitly set it
> to the Transport like:
>
> / ThreadPoolConfig config =
> ThreadPoolConfig.defaultConfig().setPoolName(poolName).setCorePoolSize(corePoolSize).setMaxPoolSize(maxPoolSize).setQueueLimit(queueLimit);
> ExecutorService executorService =
> GrizzlyExecutorService.createInstance(config);
>
> transport.setWorkerThreadPool(executorService);
> /
>
>> Maybe i know this answer, i could use JMX services... Is there
>> some example that do this? I'm going to stress the server and i
>> want to monitor the resources that it need to discover how many
>> concurrent connection i can manage in stress condition.
> You have to add monitoring module dependency:
> / <dependency>
> <groupId>org.glassfish.grizzly</groupId>
> <artifactId>grizzly-framework-monitoring</artifactId>
> <version>${grizzly.version}</version>
> </dependency>
> /
> and use code like this to get Transport JMX stats:
>
> / final GrizzlyJmxManager manager =
> GrizzlyJmxManager.instance();
>
> Object jmxTransportObject =
> transport.getMonitoringConfig().createManagementObject();
>
> manager.registerAtRoot(jmxTransportObject, "TCPTransport");
> /
> Hope that will help.
>
> WBR,
> Alexey.
>
>
>>
>>
>> 2013/10/24 Oleksiy Stashok <oleksiy.stashok_at_oracle.com
>> <mailto:oleksiy.stashok_at_oracle.com>>
>>
>>
>> On 23.10.13 15:45, Raffaele Marcello wrote:
>>> Very well!
>>> I was working to something similar, i created a Runnable
>>> (that do the same thing that you suggested) and i executed
>>> it using an ExecutorService:
>>>
>>> /AsyncThreadCaller atc = new AsyncThreadCaller(body,
>>> ctx.getConnection() );
>>> ExecutorService executor = Executors.newFixedThreadPool(1);
>>> /
>>> /executor.execute(atc);/
>>>
>>> It was only a test! I would like to maintain good
>>> performance and few resources consumption.
>>> I'm using this server in a J2EE context so isn't suggested
>>> to create Thread manually.
>>> Which is the best way? Maybe i can create an
>>> ExecutorService(pool of one thread) for each connection.
>> It's up to you for sure and depends on your specific usecase,
>> but I'd rather recommend to create a shared thread-pool, so
>> all connections will use it.
>>
>>
>>> Is there in Grizzly something that can help to create a Thread?
>> Yes, we have ThreadPoolConfig and GrizzlyExecutorService,
>> which you can use like:
>>
>> ThreadPoolConfig config = ThreadPoolConfig.newConfig().setCorePoolSize(2).setMaxPoolSize(16);
>> ExecutorService threadPool = GrizzlyExecutorService.createInstance(config);
>>
>>
>> Though in JavaEE 7 it's possible to inject JavaEE container's
>> thread-pool [1], I've never tried that yet, but you might be
>> interested :)
>>
>>
>>> Another question (maybe out of this topic): is possible that
>>> during the Connection.write(...) the trasmission is dalayed?
>> It's possible, but you will not notice it, this operation is
>> non-blocking and if data can't be flushed immediately - it
>> will be added to the internal async write queue and written
>> asap.
>>
>>
>>> Can i flush on output stream in order to be sure that the
>>> message is written in the socket?
>> you can pass a CompletionHandler, which will be notified once
>> the message is fully written.
>>
>> /connection.write(message, completionHandler);/
>>
>> Thanks.
>>
>> WBR,
>> Alexey.
>>
>> [1]
>> http://www.adam-bien.com/roller/abien/entry/injecting_an_executorservice_with_java
>>
>>
>>
>>>
>>>
>>> Thanks
>>> R
>>>
>>>
>>> 2013/10/23 Oleksiy Stashok <oleksiy.stashok_at_oracle.com
>>> <mailto:oleksiy.stashok_at_oracle.com>>
>>>
>>> Hi,
>>>
>>> I reworked the MyCallerFilter this way to simulate the
>>> async JobExecution:
>>>
>>> /logger.info <http://logger.info>( "handling a
>>> MyMessage..." );
>>>
>>> Connection connection = ctx.getConnection();
>>> MyMessage m = (MyMessage) message;
>>> byte[] body = m.getMsg();
>>>
>>> logger.info <http://logger.info>("m="+ m );
>>>
>>> new Thread() {
>>> public void run() {
>>> byte[] res = JobExecution.exec( body );
>>>
>>> MyMessage answer = new MyMessage( res.length, res );
>>>
>>> connection.write(answer);
>>> logger.info <http://logger.info>( "i wrote the
>>> MyMessage: "+answer );
>>> }
>>> }.start();
>>>
>>> return ctx.getStopAction();/
>>>
>>> Please note that in the reworked example we use
>>> connection.write(...) instead of ctx.write(...) because
>>> FilterChainContext might be disposed by that time.
>>> It is also possible to optimize MyProtocolFilter to
>>> avoid byte[] copying, but I'm not sure you use the same
>>> code in your real server, so will not touch that code
>>> for now.
>>>
>>> Hope that helps.
>>>
>>> WBR,
>>> Alexey.
>>>
>>>
>>> On 23.10.13 04:13, Raffaele Marcello wrote:
>>>> Thank you, i appreciate a lot your help!
>>>> Unfortunately i cannot share my code but i can show you
>>>> an example (you can see attached files). In reality i
>>>> was inspired by BIOPFilter from official examples.
>>>>
>>>> In my protocol can happen that some ritrasmitted
>>>> messages arrive while the server is waiting for a response.
>>>> I would like to listen arriving messages (read) even if
>>>> i'm waiting for responses. As you can see, i used two
>>>> filter, when MyCallerFilter is executing the request to
>>>> remote services, MyProtocolFilter is unable to read
>>>> from input stream.
>>>> Maybe i can do it using async call on service (maybe
>>>> using a thread) or using async call from
>>>> MyProtocolFilter to MyCallerFilter.
>>>> Unfortunately i didn't understand how to use
>>>> "*asyncMessageProcessor*", which class it is? Are there
>>>> some examples that show how to use it?
>>>> Is there the chance to create a task and ask Grizzly
>>>> exec it async?
>>>> Any other suggestion?
>>>>
>>>> Thanks
>>>> R
>>>>
>>>>
>>>>
>>>>
>>>> 2013/10/22 Oleksiy Stashok <oleksiy.stashok_at_oracle.com
>>>> <mailto:oleksiy.stashok_at_oracle.com>>
>>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> On 22.10.13 07:10, Raffaele Marcello wrote:
>>>>
>>>> Hi,
>>>> i developed a TCP Server using Grizzly using
>>>> two Filters:
>>>> - MyProtocolFilter to manage the protocol(
>>>> messages recognition )
>>>> - MyCallerFilter to invoke a remote server
>>>> (using RMI) and manage the answer.
>>>>
>>>> Unfortunately now it works using only one
>>>> thread per connection. I'm having problems
>>>> because sometimes some message can arrive while
>>>> i'm waiting for the remote method invocation.
>>>> In that case the new message is queued in input
>>>> buffer and it can cause retransmissions.
>>>> I want to ask if is possible to asynchronous
>>>> invoke remote methods managing the
>>>> results(Maybe using another thread). In this
>>>> way i could manage new arriving messages while
>>>> i'm waiting responses.
>>>>
>>>> Sure, it's possible. As I understand your protocol
>>>> doesn't require responses to come in the same order
>>>> as requests?
>>>> The easiest thing you can do in MyCallerFilter is:
>>>>
>>>> public NextAction handleRead(FilterChainContext
>>>> ctx) throws Exception {
>>>> asyncMessageProcessor.doAsync(ctx.getMessage(),
>>>> ctx.getConnection()); // pass the message and the
>>>> connection, so async processor knows to whom it has
>>>> to send the response
>>>> return ctx.getStopAction();
>>>> }
>>>>
>>>> If you can share your code - I'll be able to
>>>> provide more details.
>>>>
>>>>
>>>> Does anyone have a solution in order to
>>>> maintain good performance? Can i use some
>>>> Grizzly configuration or specific class?
>>>>
>>>> Sure, you can tune I/O strategies, buffer sizes,
>>>> etc, but first it will be good to have something
>>>> working :)))
>>>>
>>>> Thanks.
>>>>
>>>> WBR,
>>>> Alexey.
>>>>
>>>>
>>>>
>>>> Thanks
>>>> R
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> RM
>>>
>>>
>>>
>>>
>>> --
>>> RM
>>
>>
>>
>>
>> --
>> RM
>
>
>
>
> --
> RM