Radim Kolar SF.NET wrote:
>> I checked on Windows XP with JDK 1.5.0_11. Packet comes just on one of the
>> channels, but from run to run it chooses different channel: sometimes new
>> registered ("dedicated"), sometimes old "server" channel.
>> Looks like this trick could be unpredictable.
>>
> this is easy to work with. if they arrive to dedicated channel, just process them and you must check all arriving packets on server channel against list of clients anyway.
>
yes. But problem I see (on Windows XP, not sure about other OSs), that
after creating dedicated channel in response to the first came
message... it happens that *all* following client messages are coming
either just to dedicated channel or just to server channel from run to
run. Here is simple test I wrote (attached).
WBR,
Alexey.
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>
>
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package test;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Oleksiy
*/
public class UDPTesterSingleSelector {
private static final String HOST = "localhost";
private static final int PORT = 19000;
private static final long TIME_OUT = 2000;
private boolean isStoped;
private Selector selector;
private AtomicBoolean flag = new AtomicBoolean(true);
public static void main(String[] args) throws IOException {
new UDPTesterSingleSelector().process();
}
private void process() throws IOException {
try {
startServer();
runClient();
} finally {
isStoped = true;
}
}
private void runClient() throws IOException {
DatagramSocket socket = new DatagramSocket();
try {
SocketAddress socketAddress = new InetSocketAddress(HOST, PORT);
for (int i = 0; i < 10; i++) {
String testString = "Packet #" + i;
byte[] buf = testString.getBytes();
DatagramPacket packet = new DatagramPacket(buf, buf.length, socketAddress);
socket.send(packet);
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
} finally {
if (socket != null) {
socket.close();
}
}
}
private void startServer() throws IOException {
final CountDownLatch latch = new CountDownLatch(1);
new Thread() {
@Override
public void run() {
try {
selector = Selector.open();
DatagramChannel datagramChannel = DatagramChannel.open();
DatagramSocket datagramSocket = datagramChannel.socket();
datagramSocket.setReuseAddress(true);
datagramSocket.bind(new InetSocketAddress(HOST, PORT));
datagramChannel.configureBlocking(false);
datagramChannel.register(selector, SelectionKey.OP_READ);
latch.countDown();
runSelect("SERVER", selector, new ReceiveCallback() {
public void onReceived(SocketAddress address) throws IOException {
if (flag.getAndSet(false)) {
runDedicatedChannel(address);
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
try {
latch.await();
} catch (Exception e) {
}
}
private void runSelect(String name, Selector selector, ReceiveCallback receiveCallback) throws IOException {
ByteBuffer buf = ByteBuffer.allocate(8192);
while (!isStoped) {
int keys = selector.select(TIME_OUT);
if (keys > 0) {
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> it = keySet.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (key.isReadable()) {
DatagramChannel channel = (DatagramChannel) key.channel();
SocketAddress address = channel.receive(buf);
System.out.println("Receiver \"" + name + "\" Channel: " + channel + " remoteAddress: " + channel.socket().getInetAddress() + " Selector: " + selector + " received message from: " + address);
buf.clear();
if (receiveCallback != null) {
receiveCallback.onReceived(address);
}
}
}
}
}
}
private void runDedicatedChannel(final SocketAddress address) throws IOException {
DatagramChannel datagramChannel = DatagramChannel.open();
DatagramSocket datagramSocket = datagramChannel.socket();
datagramSocket.setReuseAddress(true);
datagramSocket.bind(new InetSocketAddress(HOST, PORT));
datagramSocket.connect(address);
datagramChannel.configureBlocking(false);
datagramChannel.register(selector, SelectionKey.OP_READ);
System.out.println("Registered dedicated channel: " + datagramChannel + " on selector: " + selector);
}
private interface ReceiveCallback {
public void onReceived(SocketAddress address) throws IOException;
}
}