users@grizzly.java.net

Re: suitability for writing UDP servers

From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Date: Wed, 31 Oct 2007 22:43:45 +0100

Radim Kolar SF.NET wrote:
>> I checked on Windows XP with JDK 1.5.0_11. Packet comes just on one of the
>> channels, but from run to run it chooses different channel: sometimes new
>> registered ("dedicated"), sometimes old "server" channel.
>> Looks like this trick could be unpredictable.
>>
> this is easy to work with. if they arrive to dedicated channel, just process them and you must check all arriving packets on server channel against list of clients anyway.
>
yes. But problem I see (on Windows XP, not sure about other OSs), that
after creating dedicated channel in response to the first came
message... it happens that *all* following client messages are coming
either just to dedicated channel or just to server channel from run to
run. Here is simple test I wrote (attached).

WBR,
Alexey.
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>
>


/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */

package test;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author Oleksiy
 */
public class UDPTesterSingleSelector {
    private static final String HOST = "localhost";
    private static final int PORT = 19000;
    
    private static final long TIME_OUT = 2000;
    
    private boolean isStoped;
    private Selector selector;
    
    private AtomicBoolean flag = new AtomicBoolean(true);
    
    public static void main(String[] args) throws IOException {
        new UDPTesterSingleSelector().process();
    }

    private void process() throws IOException {
        try {
            startServer();
            runClient();
        } finally {
            isStoped = true;
        }
    }

    private void runClient() throws IOException {
        DatagramSocket socket = new DatagramSocket();
        try {
            SocketAddress socketAddress = new InetSocketAddress(HOST, PORT);
            for (int i = 0; i < 10; i++) {
                String testString = "Packet #" + i;
                byte[] buf = testString.getBytes();
                DatagramPacket packet = new DatagramPacket(buf, buf.length, socketAddress);
                socket.send(packet);
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                }
            }
        } finally {
            if (socket != null) {
                socket.close();
            }
        }
    }

    private void startServer() throws IOException {
        final CountDownLatch latch = new CountDownLatch(1);
        
        new Thread() {
            @Override
            public void run() {
                try {
                    selector = Selector.open();
                    DatagramChannel datagramChannel = DatagramChannel.open();

                    DatagramSocket datagramSocket = datagramChannel.socket();
                    datagramSocket.setReuseAddress(true);
                    datagramSocket.bind(new InetSocketAddress(HOST, PORT));
                    datagramChannel.configureBlocking(false);

                    datagramChannel.register(selector, SelectionKey.OP_READ);
                    
                    latch.countDown();
                    
                    runSelect("SERVER", selector, new ReceiveCallback() {
                        public void onReceived(SocketAddress address) throws IOException {
                            if (flag.getAndSet(false)) {
                                runDedicatedChannel(address);
                            }
                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }.start();
        
        try {
            latch.await();
        } catch (Exception e) {
        }
    }

    private void runSelect(String name, Selector selector, ReceiveCallback receiveCallback) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8192);
        while (!isStoped) {
            int keys = selector.select(TIME_OUT);
            if (keys > 0) {
                Set<SelectionKey> keySet = selector.selectedKeys();
                Iterator<SelectionKey> it = keySet.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isReadable()) {
                        DatagramChannel channel = (DatagramChannel) key.channel();
                        SocketAddress address = channel.receive(buf);
                        
                        System.out.println("Receiver \"" + name + "\" Channel: " + channel + " remoteAddress: " + channel.socket().getInetAddress() + " Selector: " + selector + " received message from: " + address);
                        buf.clear();
                        if (receiveCallback != null) {
                            receiveCallback.onReceived(address);
                        }
                    }
                }
            }
        }
    }
    
    private void runDedicatedChannel(final SocketAddress address) throws IOException {
        DatagramChannel datagramChannel = DatagramChannel.open();

        DatagramSocket datagramSocket = datagramChannel.socket();
        datagramSocket.setReuseAddress(true);
        datagramSocket.bind(new InetSocketAddress(HOST, PORT));
        datagramSocket.connect(address);
        datagramChannel.configureBlocking(false);

        datagramChannel.register(selector, SelectionKey.OP_READ);
        System.out.println("Registered dedicated channel: " + datagramChannel + " on selector: " + selector);
    }

    private interface ReceiveCallback {
        public void onReceived(SocketAddress address) throws IOException;
    }
}