Wow! I am impressed that you managed to find the bug without the source code :-D Thank you!
/Johan M
=====
protected TransformationResult<Serializable, Buffer> transformImpl(final AttributeStorage attributeStorage,
final Serializable serializable)
throws TransformationException {
Logger log = LoggerFactory.getLogger(getClass());
if (serializable == null) {
throw new TransformationException("Will not serialize null");
}
log.trace("Serializing {}", serializable);
try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
try (final ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(serializable);
final Buffer buffer = obtainMemoryManager(attributeStorage).allocate(byteArrayOutputStream.size());
final byte[] bytes = byteArrayOutputStream.toByteArray();
log.trace("Serializing resulted in {} bytes", bytes.length);
buffer.put(bytes);
--->>> Added this <<<---
--->>> buffer.flip(); <<<---
final TransformationResult<Serializable, Buffer> result = TransformationResult
.createCompletedResult(buffer, null);
return result;
}
} catch (IOException e) {
throw new TransformationException(e);
}
}
On 10 jun 2013, at 23:49, Oleksiy Stashok <oleksiy.stashok_at_oracle.com> wrote:
> Hi Johan,
>
> I think the problem is in SerializableFilter (SerializableTransformer), most probably you forgot to flip the result Buffer.
> If you're not sure what exactly to do - pls. share the SerializableTransformer code.
>
> Thanks.
>
> WBR,
> Alexey.
>
> On 10.06.13 11:47, Johan Maasing wrote:
>> Sorry if I lose the thread history, I messed up my mail clients :)
>>
>> Here is the client and server code, I haven't got around to the TCP stuff so that might be broken but that isn't my problem right now.
>> However, if I use 'netcat -u -l' as a mock server shouldn't I see it outputting something if the client sent data?
>> Also, if I use netcat as a mock client, I see logging showing that the server tries to process whatever I send from netcat, so it is at least activated on UDP traffic.
>>
>>
>> === Client ===
>> /**
>>  * Client-side connection to the server. {@link #sendMessage} writes on the UDP connection and
>>  * {@link #getModels} writes on the TCP connection; both connections are opened in
>>  * {@link #startUp()} and released in {@link #shutDown()}.
>>  */
>> public class GrizzlyServerConnection extends AbstractIdleService implements ServerConnection {
>>
>>     private final Logger log = LoggerFactory.getLogger(getClass());
>>     private final String serverAddress;
>>     private final int tcpPort;
>>     private final int udpPort;
>>     private final MessageDispatcher callbackHandler;
>>     private TCPNIOTransport tcpTransport;
>>     private UDPNIOTransport udpTransport;
>>     private Connection tcpConnection;
>>     private Connection udpConnection;
>>
>>     /**
>>      * @param serverAddress host to connect to; must be non-null and longer than one character
>>      * @param tcpPort TCP port; values of 255 or below fall back to
>>      *        {@code ConnectionConstants.DEFAULT_TCP_PORT}
>>      * @param udpPort UDP port; values of 255 or below fall back to
>>      *        {@code ConnectionConstants.DEFAULT_UDP_PORT}
>>      * @param callbackHandler dispatcher that listeners are registered on; must not be null
>>      */
>>     public GrizzlyServerConnection(final String serverAddress, final int tcpPort, final int udpPort, final MessageDispatcher callbackHandler) {
>>         checkNotNull(serverAddress);
>>         checkArgument(serverAddress.length() > 1);
>>         this.serverAddress = serverAddress;
>>         this.tcpPort = (tcpPort > 255) ? tcpPort : ConnectionConstants.DEFAULT_TCP_PORT;
>>         this.udpPort = (udpPort > 255) ? udpPort : ConnectionConstants.DEFAULT_UDP_PORT;
>>         this.callbackHandler = checkNotNull(callbackHandler);
>>     }
>>
>>     /**
>>      * Writes {@code message} on the UDP connection and waits for the write to complete.
>>      *
>>      * <p>A failed or interrupted write is logged instead of rethrown, so sending stays
>>      * best-effort for callers; the interrupt flag is restored when the wait is interrupted.
>>      */
>>     @Override
>>     public void sendMessage(final MessageStruct message) throws CommunicationException {
>>         log.trace("Sending message {} to server", message);
>>         final GrizzlyFuture<WriteResult<MessageStruct, SocketAddress>> write = udpConnection.write(message);
>>
>>         try {
>>             final WriteResult<MessageStruct, SocketAddress> get = write.get();
>>             log.debug("Message {}, dstA {}, size {}", get.getMessage(), get.getDstAddress(), get.getWrittenSize());
>>         } catch (InterruptedException ex) {
>>             // Keep the interrupt visible to the caller's thread instead of silently eating it.
>>             Thread.currentThread().interrupt();
>>             log.warn("Interrupted while sending message {}", message, ex);
>>         } catch (ExecutionException ex) {
>>             log.warn("Failed to send message {}", message, ex);
>>         }
>>     }
>>
>>     /**
>>      * Writes {@code msg} on the TCP connection and returns the result if it is a {@code Set}.
>>      *
>>      * @throws ProtocolException if the completed future holds anything other than a {@code Set}
>>      * @throws CommunicationTimeoutException if waiting on the future is interrupted or fails
>>      */
>>     @Override
>>     public Set<Model> getModels(final RequestAssetsMessage msg) throws CommunicationException {
>>
>>         final GrizzlyFuture writeResult = tcpConnection.write(msg);
>>         try {
>>             // NOTE(review): the future returned by write() presumably completes with the write
>>             // result rather than the server's reply - confirm the Set branch is reachable.
>>             final Object response = writeResult.get();
>>             if (response instanceof Set) {
>>                 return ((Set<Model>) response);
>>             } else {
>>                 throw new ProtocolException("Response was not a set of models : " + response);
>>             }
>>         } catch (InterruptedException e) {
>>             Thread.currentThread().interrupt();
>>             throw new CommunicationTimeoutException(e);
>>         } catch (ExecutionException e) {
>>             throw new CommunicationTimeoutException(e);
>>         }
>>     }
>>
>>     /** Registers {@code messageEventListener} on the callback handler; null is rejected. */
>>     @Override
>>     public void addListener(final MessageEventListener messageEventListener) {
>>
>>         this.callbackHandler.addListener(checkNotNull(messageEventListener));
>>     }
>>
>>     /** Removes {@code messageEventListener} from the callback handler. */
>>     @Override
>>     public void removeListener(final MessageEventListener messageEventListener) {
>>
>>         this.callbackHandler.removeListener(messageEventListener);
>>     }
>>
>>     /**
>>      * Starts the TCP transport and connects, then does the same for UDP. Each connect waits at
>>      * most 10 seconds.
>>      */
>>     @Override
>>     protected void startUp() throws Exception {
>>
>>         FilterChainBuilder tcpFilterChainBuilder = createFilterChainBuilder("TCP");
>>         this.tcpTransport = TCPNIOTransportBuilder.newInstance().build();
>>         this.tcpTransport.setProcessor(tcpFilterChainBuilder.build());
>>         this.tcpTransport.start();
>>         final GrizzlyFuture<Connection> tcpConn = this.tcpTransport.connect(new InetSocketAddress(serverAddress, tcpPort));
>>         log.debug("Trying TCP connection");
>>         this.tcpConnection = checkNotNull(tcpConn.get(10, TimeUnit.SECONDS));
>>
>>         FilterChainBuilder udpFilterChainBuilder = createFilterChainBuilder("UDP");
>>         // udpFilterChainBuilder.add(new MessageDispatchFilter(this.callbackHandler));
>>         this.udpTransport = UDPNIOTransportBuilder.newInstance().build();
>>         udpTransport.setProcessor(udpFilterChainBuilder.build());
>>         udpTransport.start();
>>         final GrizzlyFuture<Connection> udpConn = udpTransport.connect(new InetSocketAddress(serverAddress, udpPort));
>>         log.debug("Trying UDP connection");
>>         this.udpConnection = checkNotNull(udpConn.get(10, TimeUnit.SECONDS));
>>
>>         log.debug("Connections to server TCP: {}, UDP: {}", this.tcpConnection, this.udpConnection);
>>     }
>>
>>     /**
>>      * Closes both connections, waits for the closes to finish, and then stops both transports.
>>      * The transports are stopped even if closing a connection fails.
>>      */
>>     @Override
>>     protected void shutDown() throws Exception {
>>
>>         log.debug("Closing connections to server");
>>         try {
>>             final GrizzlyFuture udpClose = this.udpConnection != null ? this.udpConnection.close() : null;
>>             final GrizzlyFuture tcpClose = this.tcpConnection != null ? this.tcpConnection.close() : null;
>>             if (udpClose != null) {
>>                 log.debug("Waiting for UDP to close");
>>                 udpClose.get();
>>                 log.debug("UDP Closed");
>>             }
>>             if (tcpClose != null) {
>>                 log.debug("Waiting for TCP to close");
>>                 tcpClose.get();
>>                 log.debug("TCP closed");
>>             }
>>         } finally {
>>             // startUp() started both transports; stop them so they are not left running.
>>             try {
>>                 if (this.udpTransport != null) {
>>                     this.udpTransport.stop();
>>                 }
>>             } finally {
>>                 if (this.tcpTransport != null) {
>>                     this.tcpTransport.stop();
>>                 }
>>             }
>>         }
>>     }
>>
>>     /** Builds the client chain: transport filter, a logging filter tagged with {@code protocol}, then the serialization filter. */
>>     private FilterChainBuilder createFilterChainBuilder(final String protocol) {
>>         FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
>>         filterChainBuilder.add(new TransportFilter());
>>         filterChainBuilder.add(new LoggingFilter("Client " + protocol));
>>         filterChainBuilder.add(new SerializableFilter());
>>         return filterChainBuilder;
>>     }
>> }
>>
>>
>> And here is the server code:
>> === Server ===
>> /**
>>  * Server-side service that binds one TCP and one UDP Grizzly transport in {@link #startUp()}
>>  * and stops both in {@link #shutDown()}.
>>  */
>> public class GrizzlyService extends AbstractIdleService {
>>
>>     private final Logger log = LoggerFactory.getLogger(getClass());
>>     private final ServerService serverService;
>>     private TCPNIOTransport tcpTransport;
>>     private UDPNIOTransport udpTransport;
>>     private final InetSocketAddress tcpListenAddress;
>>     private final InetSocketAddress udpListenAddress;
>>     private final MessageDispatcher clientMessageDispatcher;
>>
>>     /**
>>      * @param serverService handed to the {@code ModelFilter} on the TCP chain; must not be null
>>      * @param clientMessageDispatcher handed to the {@code MessageDispatchFilter} on the UDP
>>      *        chain; must not be null
>>      * @param tcpPort TCP listen port; values of 255 or below fall back to
>>      *        {@code ConnectionConstants.DEFAULT_TCP_PORT}
>>      * @param udpPort UDP listen port; values of 255 or below fall back to
>>      *        {@code ConnectionConstants.DEFAULT_UDP_PORT}
>>      */
>>     public GrizzlyService(
>>             final ServerService serverService,
>>             final MessageDispatcher clientMessageDispatcher,
>>             final int tcpPort,
>>             final int udpPort) {
>>
>>         this.serverService = checkNotNull(serverService);
>>         this.clientMessageDispatcher = checkNotNull(clientMessageDispatcher);
>>         // Same threshold as GrizzlyServerConnection (port > 255 is kept), so client and server
>>         // pick the same port for every input, including exactly 255.
>>         this.tcpListenAddress = new InetSocketAddress((tcpPort > 255) ? tcpPort : ConnectionConstants.DEFAULT_TCP_PORT);
>>         this.udpListenAddress = new InetSocketAddress((udpPort > 255) ? udpPort : ConnectionConstants.DEFAULT_UDP_PORT);
>>     }
>>
>>     /** Builds the TCP and UDP filter chains, binds each transport to its listen address and starts it. */
>>     @Override
>>     protected void startUp() throws Exception {
>>         FilterChainBuilder tcpFilterChainBuilder = createFilterChainBuilder("TCP");
>>         tcpFilterChainBuilder.add(new ModelFilter(this.serverService));
>>
>>         this.tcpTransport = TCPNIOTransportBuilder.newInstance().build();
>>         this.tcpTransport.setProcessor(tcpFilterChainBuilder.build());
>>         TCPNIOServerConnection tcpServerConnection = this.tcpTransport.bind(this.tcpListenAddress);
>>         this.tcpTransport.start();
>>
>>         log.debug("Server TCP started on {}", tcpServerConnection.getLocalAddress());
>>
>>         FilterChainBuilder udpFilterChainBuilder = createFilterChainBuilder("UDP");
>>         udpFilterChainBuilder.add(new MessageDispatchFilter(this.clientMessageDispatcher));
>>
>>         this.udpTransport = UDPNIOTransportBuilder.newInstance().build();
>>         this.udpTransport.setProcessor(udpFilterChainBuilder.build());
>>         UDPNIOServerConnection udpServerConnection = this.udpTransport.bind(this.udpListenAddress);
>>         this.udpTransport.start();
>>
>>         log.debug("Server UDP started on {}", udpServerConnection.getLocalAddress());
>>     }
>>
>>     /** Stops both transports; the UDP transport is stopped even if stopping TCP fails. */
>>     @Override
>>     protected void shutDown() throws Exception {
>>
>>         log.debug("Stopping Grizzly transport");
>>         try {
>>             if (this.tcpTransport != null) {
>>                 this.tcpTransport.stop();
>>             }
>>         } finally {
>>             if (this.udpTransport != null) {
>>                 this.udpTransport.stop();
>>             }
>>         }
>>     }
>>
>>     /** Builds the server chain: transport filter, a logging filter tagged with {@code protocol}, then the serialization filter. */
>>     private FilterChainBuilder createFilterChainBuilder(final String protocol) {
>>         final FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
>>         filterChainBuilder.add(new TransportFilter());
>>         filterChainBuilder.add(new LoggingFilter("Server " + protocol));
>>         filterChainBuilder.add(new SerializableFilter());
>>         return filterChainBuilder;
>>     }
>> }
>>
>> Here are the filters I use; the transformers just use ObjectStreams to create a byte[] of the message. They also log the number of bytes, and I can see logging from the client filter chain but not from the server.
>>
>> === Filters ===
>>
>> /**
>>  * Codec filter between {@code Buffer} and {@code Serializable}. It only wires a
>>  * {@code DeserializableTransformer} and a {@code SerializableTransformer} into
>>  * {@code AbstractCodecFilter} (presumably decoder first, encoder second - confirm against the
>>  * Grizzly constructor).
>>  */
>> public class SerializableFilter extends AbstractCodecFilter<Buffer, Serializable> {
>>
>>     /** Creates the filter with a fresh transformer of each kind. */
>>     public SerializableFilter() {
>>         super(
>>                 new DeserializableTransformer(),
>>                 new SerializableTransformer());
>>     }
>> }
>>
>> /**
>>  * Pass-through filter that writes one trace line, tagged with a caller-supplied name, for each
>>  * filter-chain callback and then delegates to the {@code BaseFilter} implementation.
>>  */
>> public class LoggingFilter extends BaseFilter {
>>
>>     private final Logger log = LoggerFactory.getLogger(getClass());
>>     private final String name;
>>
>>     /**
>>      * @param name label placed at the start of every trace line this filter writes
>>      */
>>     public LoggingFilter(final String name) {
>>         this.name = name;
>>     }
>>
>>     /** Traces the read callback, then delegates. */
>>     @Override
>>     public NextAction handleRead(final FilterChainContext ctx) throws IOException {
>>         log.trace("{} : Read", name);
>>         return super.handleRead(ctx);
>>     }
>>
>>     /** Traces the write callback, then delegates. */
>>     @Override
>>     public NextAction handleWrite(final FilterChainContext ctx) throws IOException {
>>         log.trace("{} : Write", name);
>>         return super.handleWrite(ctx);
>>     }
>>
>>     /** Traces the connect callback, then delegates. */
>>     @Override
>>     public NextAction handleConnect(final FilterChainContext ctx) throws IOException {
>>         log.trace("{} : Connect", name);
>>         return super.handleConnect(ctx);
>>     }
>>
>>     /** Traces the accept callback, then delegates. */
>>     @Override
>>     public NextAction handleAccept(final FilterChainContext ctx) throws IOException {
>>         log.trace("{} : Accept", name);
>>         return super.handleAccept(ctx);
>>     }
>>
>>     /** Traces the custom-event callback, then delegates. */
>>     @Override
>>     public NextAction handleEvent(final FilterChainContext ctx, final FilterChainEvent event) throws IOException {
>>         log.trace("{} : Event", name);
>>         return super.handleEvent(ctx, event);
>>     }
>>
>>     /** Traces the close callback, then delegates. */
>>     @Override
>>     public NextAction handleClose(final FilterChainContext ctx) throws IOException {
>>         log.trace("{} : Close", name);
>>         return super.handleClose(ctx);
>>     }
>> }
>>
>>
>> >Hi Johan,
>> >the client code looks ok, can you pls. share both the client and server code?
>> >Thanks.
>> >WBR,
>> >Alexey.
>>
>> On 09.06.13 06:44, Johan Maasing wrote:
>>> Hi,
>>> I'm struggling (new to Grizzly) to get UDP traffic going between a server and client. Currently I'm stuck at getting the client to send. I hope someone can spot what I'm doing wrong.
>>>
>