users@grizzly.java.net

Re: Need help to get UDP client working

From: Oleksiy Stashok <oleksiy.stashok_at_oracle.com>
Date: Mon, 10 Jun 2013 15:19:49 -0700

On 10.06.13 15:00, Johan Maasing wrote:
> Wow! I am impressed that you managed to find the bug without the source code :-D Thank you!
No problem :)
Now that I have seen your code, I can suggest one more
optimization to avoid byte[] copying.
Instead of using ByteArrayOutputStream use Grizzly BufferOutputStream:

         try (final ByteArrayOutputStream bufferOutputStream = new BufferOutputStream(obtainMemoryManager(attributeStorage))) {
             try (final ObjectOutputStream objectOutputStream = new ObjectOutputStream(bufferOutputStream)) {
                 objectOutputStream.writeObject(serializable);
                 final Buffer buffer = bufferOutputStream.getBuffer();
                 buffer.flip();

  .....


WBR,
Alexey.

>
> /Johan M
>
> =====
> protected TransformationResult<Serializable, Buffer> transformImpl(final AttributeStorage attributeStorage,
> final Serializable serializable)
> throws TransformationException {
>
> Logger log = LoggerFactory.getLogger(getClass());
>
> if (serializable == null) {
> throw new TransformationException("Will not serialize null");
> }
> log.trace("Serializing {}", serializable);
> try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
> try (final ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
> objectOutputStream.writeObject(serializable);
> final Buffer buffer = obtainMemoryManager(attributeStorage).allocate(byteArrayOutputStream.size());
> final byte[] bytes = byteArrayOutputStream.toByteArray();
> log.trace("Serializing resulted in {} bytes", bytes.length);
> buffer.put(bytes);
>
> --->>> Added this <<<---
> --->>> buffer.flip(); <<<---
>
>
> final TransformationResult<Serializable, Buffer> result = TransformationResult
> .createCompletedResult(buffer, null);
> return result;
> }
> } catch (IOException e) {
> throw new TransformationException(e);
> }
> }
>
> On 10 jun 2013, at 23:49, Oleksiy Stashok <oleksiy.stashok_at_oracle.com> wrote:
>
>> Hi Johan,
>>
>> I think the problem is in SerializableFilter (SerializableTransformer), most probably you forgot to flip the result Buffer.
>> If you're not sure what exactly to do - pls. share the SerializableTransformer code.
>>
>> Thanks.
>>
>> WBR,
>> Alexey.
>>
>> On 10.06.13 11:47, Johan Maasing wrote:
>>> Sorry if I lose the thread history, I messed up my mail clients :)
>>>
>>> Here is the client and server code, I haven't got around to the TCP stuff so that might be broken but that isn't my problem right now.
>>> However, if I use 'netcat -u -l' as a mock server shouldn't I see it outputting something if the client sent data?
>>> Also, if I use netcat as a mock client, I see from the logging that the server tries to process whatever I send from netcat, so it is at least activated by UDP traffic.
>>>
>>>
>>> === Client ===
>>> public class GrizzlyServerConnection extends AbstractIdleService implements ServerConnection {
>>>
>>> private final Logger log = LoggerFactory.getLogger(getClass());
>>> private final String serverAddress;
>>> private final int tcpPort;
>>> private final int udpPort;
>>> private final MessageDispatcher callbackHandler;
>>> private TCPNIOTransport tcpTransport;
>>> private UDPNIOTransport udpTransport;
>>> private Connection tcpConnection;
>>> private Connection udpConnection;
>>>
>>> public GrizzlyServerConnection(final String serverAddress, final int tcpPort, final int udpPort, final MessageDispatcher callbackHandler) {
>>> checkNotNull(serverAddress);
>>> checkArgument(serverAddress.length() > 1);
>>> this.serverAddress = serverAddress;
>>> this.tcpPort = (tcpPort > 255) ? tcpPort : ConnectionConstants.DEFAULT_TCP_PORT;
>>> this.udpPort = (udpPort > 255) ? udpPort : ConnectionConstants.DEFAULT_UDP_PORT;
>>> this.callbackHandler = checkNotNull(callbackHandler);
>>> }
>>>
>>> @Override
>>> public void sendMessage(final MessageStruct message) throws CommunicationException {
>>> log.trace("Sending message {} to server", message);
>>> GrizzlyFuture<WriteResult<MessageStruct, SocketAddress>> write = udpConnection.write(message);
>>>
>>> try {
>>> WriteResult<MessageStruct, SocketAddress> get = write.get();
>>> log.debug("Message {}, dstA {}, size {}", get.getMessage(), get.getDstAddress(), get.getWrittenSize());
>>> } catch (InterruptedException | ExecutionException ex) {
>>> }
>>> }
>>>
>>> @Override
>>> public Set<Model> getModels(final RequestAssetsMessage msg) throws CommunicationException {
>>>
>>> final GrizzlyFuture writeResult = tcpConnection.write(msg);
>>> try {
>>> final Object response = writeResult.get();
>>> if (response instanceof Set) {
>>> return ((Set<Model>) response);
>>> } else {
>>> throw new ProtocolException("Response was not a set of models : " + response);
>>> }
>>> } catch (InterruptedException | ExecutionException e) {
>>> throw new CommunicationTimeoutException(e);
>>> }
>>> }
>>>
>>> @Override
>>> public void addListener(final MessageEventListener messageEventListener) {
>>>
>>> this.callbackHandler.addListener(checkNotNull(messageEventListener));
>>> }
>>>
>>> @Override
>>> public void removeListener(final MessageEventListener messageEventListener) {
>>>
>>> this.callbackHandler.removeListener(messageEventListener);
>>> }
>>>
>>> @Override
>>> protected void startUp() throws Exception {
>>>
>>> FilterChainBuilder tcpFilterChainBuilder = createFilterChainBuilder("TCP");
>>> this.tcpTransport = TCPNIOTransportBuilder.newInstance().build();
>>> this.tcpTransport.setProcessor(tcpFilterChainBuilder.build());
>>> this.tcpTransport.start();
>>> final GrizzlyFuture<Connection> tcpConn = this.tcpTransport.connect(new InetSocketAddress(serverAddress, tcpPort));
>>> log.debug("Trying TCP connection");
>>> this.tcpConnection = checkNotNull(tcpConn.get(10, TimeUnit.SECONDS));
>>>
>>> FilterChainBuilder udpFilterChainBuilder = createFilterChainBuilder("UDP");
>>> // udpFilterChainBuilder.add(new MessageDispatchFilter(this.callbackHandler));
>>> this.udpTransport = UDPNIOTransportBuilder.newInstance().build();
>>> udpTransport.setProcessor(udpFilterChainBuilder.build());
>>> udpTransport.start();
>>> final GrizzlyFuture<Connection> udpConn = udpTransport.connect(new InetSocketAddress(serverAddress, udpPort));
>>> log.debug("Trying UDP connection");
>>> this.udpConnection = checkNotNull(udpConn.get(10, TimeUnit.SECONDS));
>>>
>>> log.debug("Connections to server TCP: {}, UDP: {}", this.tcpConnection, this.udpConnection);
>>> }
>>>
>>> @Override
>>> protected void shutDown() throws Exception {
>>>
>>> log.debug("Closing connections to server");
>>> final GrizzlyFuture udpClose = this.udpConnection != null ? this.udpConnection.close() : null;
>>> final GrizzlyFuture tcpClose = this.tcpConnection != null ? this.tcpConnection.close() : null;
>>> if (udpClose != null) {
>>> log.debug("Waiting for UDP to close");
>>> udpClose.get();
>>> log.debug("UDP Closed");
>>> }
>>> if (tcpClose != null) {
>>> log.debug("Waiting for TCP to close");
>>> tcpClose.get();
>>> log.debug("TCP closed");
>>> }
>>> }
>>>
>>> private FilterChainBuilder createFilterChainBuilder(final String protocol) {
>>> FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
>>> filterChainBuilder.add(new TransportFilter());
>>> filterChainBuilder.add(new LoggingFilter("Client " + protocol));
>>> filterChainBuilder.add(new SerializableFilter());
>>> return filterChainBuilder;
>>> }
>>> }
>>>
>>>
>>> And here is the server code:
>>> === Server ===
>>> public class GrizzlyService extends AbstractIdleService {
>>>
>>> private final Logger log = LoggerFactory.getLogger(getClass());
>>> private final ServerService serverService;
>>> private TCPNIOTransport tcpTransport;
>>> private UDPNIOTransport udpTransport ;
>>> private final InetSocketAddress tcpListenAddress ;
>>> private final InetSocketAddress udpListenAddress ;
>>> private final MessageDispatcher clientMessageDispatcher ;
>>>
>>> public GrizzlyService(
>>> final ServerService serverService,
>>> final MessageDispatcher clientMessageDispatcher,
>>> final int tcpPort,
>>> final int udpPort) {
>>>
>>> this.serverService = checkNotNull(serverService);
>>> this.clientMessageDispatcher = checkNotNull(clientMessageDispatcher) ;
>>> this.tcpListenAddress = new InetSocketAddress((tcpPort < 255) ? ConnectionConstants.DEFAULT_TCP_PORT: tcpPort) ;
>>> this.udpListenAddress = new InetSocketAddress((udpPort < 255) ? ConnectionConstants.DEFAULT_UDP_PORT : udpPort) ;
>>> }
>>>
>>> @Override
>>> protected void startUp() throws Exception {
>>> FilterChainBuilder tcpFilterChainBuilder = createFilterChainBuilder("TCP");
>>> tcpFilterChainBuilder.add(new ModelFilter(this.serverService));
>>>
>>> this.tcpTransport = TCPNIOTransportBuilder.newInstance().build();
>>> this.tcpTransport.setProcessor(tcpFilterChainBuilder.build());
>>> TCPNIOServerConnection tcpServerConnection = this.tcpTransport.bind(this.tcpListenAddress);
>>> this.tcpTransport.start();
>>>
>>> log.debug("Server TCP started on {}", tcpServerConnection.getLocalAddress());
>>>
>>> FilterChainBuilder udpFilterChainBuilder = createFilterChainBuilder("UDP");
>>> udpFilterChainBuilder.add(new MessageDispatchFilter(this.clientMessageDispatcher));
>>>
>>> this.udpTransport = UDPNIOTransportBuilder.newInstance().build();
>>> this.udpTransport.setProcessor(udpFilterChainBuilder.build());
>>> UDPNIOServerConnection udpServerConnection = this.udpTransport.bind(this.udpListenAddress);
>>> this.udpTransport.start();
>>>
>>> log.debug("Server UDP started on {}", udpServerConnection.getLocalAddress());
>>> }
>>>
>>> @Override
>>> protected void shutDown() throws Exception {
>>>
>>> log.debug("Stopping Grizzly transport");
>>> this.tcpTransport.stop();
>>> this.udpTransport.stop();
>>> }
>>>
>>> private FilterChainBuilder createFilterChainBuilder(final String protocol) {
>>> final FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
>>> filterChainBuilder.add(new TransportFilter());
>>> filterChainBuilder.add(new LoggingFilter("Server " + protocol));
>>> filterChainBuilder.add(new SerializableFilter());
>>> return filterChainBuilder;
>>> }
>>> }
>>>
>>> Here are the filters I use; the transformers just use ObjectStreams to create a byte[] of the message. They also log the number of bytes, and I can see logging from the client filter chain but not from the server.
>>>
>>> === Filters ===
>>>
>>> public class SerializableFilter extends AbstractCodecFilter<Buffer, Serializable> {
>>>
>>> public SerializableFilter() {
>>>
>>> super(new DeserializableTransformer(), new SerializableTransformer());
>>> }
>>> }
>>>
>>> public class LoggingFilter extends BaseFilter {
>>>
>>> private final String name ;
>>> private final Logger log = LoggerFactory.getLogger(getClass());
>>>
>>> public LoggingFilter(final String name) {
>>> this.name = name;
>>> }
>>>
>>> @Override
>>> public NextAction handleRead(FilterChainContext ctx) throws IOException {
>>> log.trace("{} : Read", this.name);
>>> return super.handleRead(ctx);
>>> }
>>>
>>> @Override
>>> public NextAction handleWrite(FilterChainContext ctx) throws IOException {
>>> log.trace("{} : Write", this.name);
>>> return super.handleWrite(ctx);
>>> }
>>>
>>> @Override
>>> public NextAction handleConnect(FilterChainContext ctx) throws IOException {
>>> log.trace("{} : Connect", this.name);
>>> return super.handleConnect(ctx);
>>> }
>>>
>>> @Override
>>> public NextAction handleAccept(FilterChainContext ctx) throws IOException {
>>> log.trace("{} : Accept", this.name);
>>> return super.handleAccept(ctx);
>>> }
>>>
>>> @Override
>>> public NextAction handleEvent(FilterChainContext ctx, FilterChainEvent event) throws IOException {
>>> log.trace("{} : Event", this.name);
>>> return super.handleEvent(ctx, event);
>>> }
>>>
>>> @Override
>>> public NextAction handleClose(FilterChainContext ctx) throws IOException {
>>> log.trace("{} : Close", this.name);
>>> return super.handleClose(ctx);
>>> }
>>> }
>>>
>>>
>>>> Hi Johan,
>>>> the client code looks ok, can you pls. share both the client and server code?
>>>> Thanks.
>>>> WBR,
>>>> Alexey.
>>> On 09.06.13 06:44, Johan Maasing wrote:
>>>> Hi,
>>>> I'm struggling (new to Grizzly) to get UDP traffic going between a server and client. Currently I'm stuck at getting the client to send. I hope someone can spot what I'm doing wrong.
>>>>