users@grizzly.java.net

Re: Need help to get UDP client working

From: Oleksiy Stashok <oleksiy.stashok_at_oracle.com>
Date: Mon, 10 Jun 2013 14:49:22 -0700

Hi Johan,

I think the problem is in SerializableFilter (SerializableTransformer),
most probably you forgot to flip the result Buffer.
If you're not sure what exactly to do - pls. share the
SerializableTransformer code.

Thanks.

WBR,
Alexey.

On 10.06.13 11:47, Johan Maasing wrote:
> Sorry if I loose the thread history, I messed up my mail clients :)
>
> Here is the client and server code, I haven't got around to the TCP
> stuff so that might be broken but that isn't my problem right now.
> However, if I use 'netcat -u -l' as a mock server shouldn't I see it
> outputting something if the client sent data?
> Also, if I use netcat as mock client I see that logging that server
> tries to process whatever I send from netcat so it is at least
> activated on UDP traffic.
>
>
> === Client ===
> public class GrizzlyServerConnection extends AbstractIdleService
> implements ServerConnection {
>
> private final Logger log = LoggerFactory.getLogger(getClass());
> private final String serverAddress;
> private final int tcpPort;
> private final int udpPort;
> private final MessageDispatcher callbackHandler;
> private TCPNIOTransport tcpTransport;
> private UDPNIOTransport udpTransport;
> private Connection tcpConnection;
> private Connection udpConnection;
>
> public GrizzlyServerConnection(final String serverAddress, final int
> tcpPort, final int udpPort, final MessageDispatcher callbackHandler) {
> checkNotNull(serverAddress);
> checkArgument(serverAddress.length() > 1);
> this.serverAddress = serverAddress;
> this.tcpPort = (tcpPort > 255) ? tcpPort :
> ConnectionConstants.DEFAULT_TCP_PORT;
> this.udpPort = (udpPort > 255) ? udpPort :
> ConnectionConstants.DEFAULT_UDP_PORT;
> this.callbackHandler = checkNotNull(callbackHandler);
> }
>
> @Override
> public void sendMessage(final MessageStruct message) throws
> CommunicationException {
> log.trace("Sending message {} to server", message);
> GrizzlyFuture<WriteResult<MessageStruct, SocketAddress>> write =
> udpConnection.write(message);
> try {
> WriteResult<MessageStruct, SocketAddress> get = write.get();
> log.debug("Message {}, dstA {}, size {}", get.getMessage(),
> get.getDstAddress(), get.getWrittenSize());
> } catch (InterruptedException | ExecutionException ex) {
> }
> }
>
> @Override
> public Set<Model> getModels(final RequestAssetsMessage msg) throws
> CommunicationException {
>
> final GrizzlyFuture writeResult = tcpConnection.write(msg);
> try {
> final Object response = writeResult.get();
> if (response instanceof Set) {
> return ((Set<Model>) response);
> } else {
> throw new ProtocolException("Response was not a set of models : " +
> response);
> }
> } catch (InterruptedException | ExecutionException e) {
> throw new CommunicationTimeoutException(e);
> }
> }
>
> @Override
> public void addListener(final MessageEventListener messageEventListener) {
>
> this.callbackHandler.addListener(checkNotNull(messageEventListener));
> }
>
> @Override
> public void removeListener(final MessageEventListener
> messageEventListener) {
>
> this.callbackHandler.removeListener(messageEventListener);
> }
>
> @Override
> protected void startUp() throws Exception {
>
> FilterChainBuilder tcpFilterChainBuilder =
> createFilterChainBuilder("TCP");
> this.tcpTransport = TCPNIOTransportBuilder.newInstance().build();
> this.tcpTransport.setProcessor(tcpFilterChainBuilder.build());
> this.tcpTransport.start();
> final GrizzlyFuture<Connection> tcpConn =
> this.tcpTransport.connect(new InetSocketAddress(serverAddress, tcpPort));
> log.debug("Trying TCP connection");
> this.tcpConnection = checkNotNull(tcpConn.get(10, TimeUnit.SECONDS));
>
> FilterChainBuilder udpFilterChainBuilder =
> createFilterChainBuilder("UDP");
> // udpFilterChainBuilder.add(new
> MessageDispatchFilter(this.callbackHandler));
> this.udpTransport = UDPNIOTransportBuilder.newInstance().build();
> udpTransport.setProcessor(udpFilterChainBuilder.build());
> udpTransport.start();
> final GrizzlyFuture<Connection> udpConn = udpTransport.connect(new
> InetSocketAddress(serverAddress, udpPort));
> log.debug("Trying UDP connection");
> this.udpConnection = checkNotNull(udpConn.get(10, TimeUnit.SECONDS));
>
> log.debug("Connections to server TCP: {}, UDP: {}",
> this.tcpConnection, this.udpConnection);
> }
>
> @Override
> protected void shutDown() throws Exception {
>
> log.debug("Closing connections to server");
> final GrizzlyFuture udpClose = this.udpConnection != null ?
> this.udpConnection.close() : null;
> final GrizzlyFuture tcpClose = this.tcpConnection != null ?
> this.tcpConnection.close() : null;
> if (udpClose != null) {
> log.debug("Waiting for UDP to close");
> udpClose.get();
> log.debug("UDP Closed");
> }
> if (tcpClose != null) {
> log.debug("Waiting for TCP to close");
> tcpClose.get();
> log.debug("TCP closed");
> }
> }
>
> private FilterChainBuilder createFilterChainBuilder(final String
> protocol) {
> FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
> filterChainBuilder.add(new TransportFilter());
> filterChainBuilder.add(new LoggingFilter("Client " + protocol));
> filterChainBuilder.add(new SerializableFilter());
> return filterChainBuilder;
> }
> }
>
>
> And here is the server code:
> === Server ===
> public class GrizzlyService extends AbstractIdleService {
>
> private final Logger log = LoggerFactory.getLogger(getClass());
> private final ServerService serverService;
> private TCPNIOTransport tcpTransport;
> private UDPNIOTransport udpTransport ;
> private final InetSocketAddress tcpListenAddress ;
> private final InetSocketAddress udpListenAddress ;
> private final MessageDispatcher clientMessageDispatcher ;
>
> public GrizzlyService(
> final ServerService serverService,
> final MessageDispatcher clientMessageDispatcher,
> final int tcpPort,
> final int udpPort) {
>
> this.serverService = checkNotNull(serverService);
> this.clientMessageDispatcher = checkNotNull(clientMessageDispatcher) ;
> this.tcpListenAddress = new InetSocketAddress((tcpPort < 255) ?
> ConnectionConstants.DEFAULT_TCP_PORT: tcpPort) ;
> this.udpListenAddress = new InetSocketAddress((udpPort < 255) ?
> ConnectionConstants.DEFAULT_UDP_PORT : udpPort) ;
> }
>
> @Override
> protected void startUp() throws Exception {
> FilterChainBuilder tcpFilterChainBuilder =
> createFilterChainBuilder("TCP");
> tcpFilterChainBuilder.add(new ModelFilter(this.serverService));
>
> this.tcpTransport = TCPNIOTransportBuilder.newInstance().build();
> this.tcpTransport.setProcessor(tcpFilterChainBuilder.build());
> TCPNIOServerConnection tcpServerConnection =
> this.tcpTransport.bind(this.tcpListenAddress);
> this.tcpTransport.start();
> log.debug("Server TCP started on {}",
> tcpServerConnection.getLocalAddress());
>
> FilterChainBuilder udpFilterChainBuilder =
> createFilterChainBuilder("UDP");
> udpFilterChainBuilder.add(new
> MessageDispatchFilter(this.clientMessageDispatcher));
>
> this.udpTransport = UDPNIOTransportBuilder.newInstance().build();
> this.udpTransport.setProcessor(udpFilterChainBuilder.build());
> UDPNIOServerConnection udpServerConnection =
> this.udpTransport.bind(this.udpListenAddress);
> this.udpTransport.start();
> log.debug("Server UDP started on {}",
> udpServerConnection.getLocalAddress());
> }
>
> @Override
> protected void shutDown() throws Exception {
>
> log.debug("Stopping Grizzly transport");
> this.tcpTransport.stop();
> this.udpTransport.stop();
> }
>
> private FilterChainBuilder createFilterChainBuilder(final String
> protocol) {
> final FilterChainBuilder filterChainBuilder =
> FilterChainBuilder.stateless();
> filterChainBuilder.add(new TransportFilter());
> filterChainBuilder.add(new LoggingFilter("Server " + protocol));
> filterChainBuilder.add(new SerializableFilter());
> return filterChainBuilder;
> }
> }
>
> Here are the filters I use, the transformers just use ObjectStreams to
> create a byte[] of the message. They also log the number of bytes and
> I can see logging from the client filter chain but not from the server.
>
> === Filters ===
>
> public class SerializableFilter extends AbstractCodecFilter<Buffer,
> Serializable> {
>
> public SerializableFilter() {
>
> super(new DeserializableTransformer(), new
> SerializableTransformer());
> }
> }
>
> public class LoggingFilter extends BaseFilter {
>
> private final String name ;
> private final Logger log = LoggerFactory.getLogger(getClass());
>
> public LoggingFilter(final String name) {
> this.name <http://this.name> = name;
> }
>
> @Override
> public NextAction handleRead(FilterChainContext ctx) throws IOException {
> log.trace("{} : Read", this.name <http://this.name>);
> return super.handleRead(ctx);
> }
>
> @Override
> public NextAction handleWrite(FilterChainContext ctx) throws IOException {
> log.trace("{} : Write", this.name <http://this.name>);
> return super.handleWrite(ctx);
> }
>
> @Override
> public NextAction handleConnect(FilterChainContext ctx) throws
> IOException {
> log.trace("{} : Connect", this.name <http://this.name>);
> return super.handleConnect(ctx);
> }
>
> @Override
> public NextAction handleAccept(FilterChainContext ctx) throws
> IOException {
> log.trace("{} : Accept", this.name <http://this.name>);
> return super.handleAccept(ctx);
> }
>
> @Override
> public NextAction handleEvent(FilterChainContext ctx, FilterChainEvent
> event) throws IOException {
> log.trace("{} : Event", this.name <http://this.name>);
> return super.handleEvent(ctx, event);
> }
>
> @Override
> public NextAction handleClose(FilterChainContext ctx) throws IOException {
> log.trace("{} : Close", this.name <http://this.name>);
> return super.handleClose(ctx);
> }
> }
>
>
> >Hi Johan,
> >the client code looks ok, can you pls. share both the client and
> server code?
> >Thanks.
> >WBR,
> >Alexey.
>
> On 09.06.13 06:44, Johan Maasing wrote:
>> Hi,
>> I'm struggling (new to Grizzly) to get UDP traffic going between a
>> server and client. Currently I'm stuck at getting the client to
>> send.I hope someone can spot what I'm doing wrong.
>>