Migrate custom tests for UDP communication to junit, all current tests pass

This commit is contained in:
2015-06-21 12:25:56 +01:00
parent c9015b9293
commit 88eec41139
19 changed files with 177 additions and 152 deletions

View File

@@ -73,8 +73,8 @@ public abstract class TcpClient extends Work implements Sender {
} }
} }
public void exit() { public void stop() {
super.exit(); super.stop();
if (selector != null) { if (selector != null) {
selector.wakeup(); selector.wakeup();
} }
@@ -84,6 +84,7 @@ public abstract class TcpClient extends Work implements Sender {
try { try {
System.out.println("Client: Waiting for select... "); System.out.println("Client: Waiting for select... ");
System.out.println("Client: Number of selected keys: " + selector.select()); System.out.println("Client: Number of selected keys: " + selector.select());
System.out.println("wakker");
//selector.select(); //selector.select();
Set<SelectionKey> selectionKeySet = selector.selectedKeys(); Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> selectionKeyIterator = selectionKeySet.iterator(); Iterator<SelectionKey> selectionKeyIterator = selectionKeySet.iterator();

View File

@@ -16,6 +16,7 @@ import java.util.Set;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException; import base.exception.worker.DeactivateException;
import base.sender.Sender; import base.sender.Sender;
import base.server.channel.TcpServerClient;
import base.work.Work; import base.work.Work;
public class TcpServer extends Work implements Sender { public class TcpServer extends Work implements Sender {
@@ -50,7 +51,6 @@ public class TcpServer extends Work implements Sender {
} }
public void activate() throws ActivateException { public void activate() throws ActivateException {
System.out.println("Server: Activate!"); System.out.println("Server: Activate!");
try { try {
// Get selector // Get selector
@@ -81,11 +81,15 @@ public class TcpServer extends Work implements Sender {
serverSocket.close(); serverSocket.close();
} catch (IOException e) { } catch (IOException e) {
throw new DeactivateException(); throw new DeactivateException();
} finally {
for (TcpServerClient client : clientList) {
client.stop();
}
} }
} }
public void exit() { public void stop() {
super.exit(); super.stop();
if (selector != null) { if (selector != null) {
selector.wakeup(); selector.wakeup();
} }

View File

@@ -4,14 +4,12 @@ import java.io.IOException;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.MulticastSocket; import java.net.MulticastSocket;
import java.util.ArrayList;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException;
import base.work.Listen;
import base.work.Work; import base.work.Work;
public class UdpMulticastClient extends Work { public abstract class UdpMulticastClient extends Work {
protected static final String HOST = "239.255.255.255";
protected static final int BUFFER_SIZE = 2048; protected static final int BUFFER_SIZE = 2048;
protected String host; protected String host;
@@ -19,7 +17,10 @@ public class UdpMulticastClient extends Work {
protected int bufferSize; protected int bufferSize;
protected MulticastSocket socket; protected MulticastSocket socket;
protected InetAddress group; protected InetAddress group;
protected ArrayList<Listen<byte[]>> listenList;
public UdpMulticastClient(int port) {
this(HOST, port);
}
public UdpMulticastClient(String host, int port) { public UdpMulticastClient(String host, int port) {
this(host, port, BUFFER_SIZE); this(host, port, BUFFER_SIZE);
@@ -29,7 +30,6 @@ public class UdpMulticastClient extends Work {
this.host = host; this.host = host;
this.port = port; this.port = port;
this.bufferSize = BUFFER_SIZE; this.bufferSize = BUFFER_SIZE;
listenList = new ArrayList<Listen<byte[]>>();
} }
public void work() { public void work() {
@@ -38,9 +38,7 @@ public class UdpMulticastClient extends Work {
DatagramPacket packet = new DatagramPacket(buffer, buffer.length); DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet); socket.receive(packet);
buffer = packet.getData(); buffer = packet.getData();
for (Listen<byte[]> listen : listenList) { input(buffer);
listen.add(buffer);
}
} catch (IOException e) {} } catch (IOException e) {}
} }
@@ -50,24 +48,16 @@ public class UdpMulticastClient extends Work {
group = InetAddress.getByName(host); group = InetAddress.getByName(host);
socket.joinGroup(group); socket.joinGroup(group);
} catch (IOException e) { } catch (IOException e) {
logger.error("", e);
throw new ActivateException(); throw new ActivateException();
} }
} }
public void deactivate() throws DeactivateException { public void stop() {
try { System.out.println("client close socket");
socket.leaveGroup(group);
} catch (IOException e) {
throw new DeactivateException();
}
socket.close(); socket.close();
super.stop();
} }
public void register(Listen<byte[]> listen) { protected abstract void input(byte[] buffer);
listenList.add(listen);
}
public void remove(Listen<byte[]> listen) {
listenList.remove(listen);
}
} }

View File

@@ -9,15 +9,23 @@ import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException; import base.exception.worker.DeactivateException;
import base.sender.Sender; import base.sender.Sender;
import base.work.Listen; import base.work.Listen;
import base.worker.Worker;
public class UdpMulticastServer extends Listen<byte[]> implements Sender { public class UdpMulticastServer extends Listen<byte[]> implements Sender {
protected static final String HOST = "239.255.255.255";
protected static final int BUFFER_SIZE = 2048; protected static final int BUFFER_SIZE = 2048;
protected String host; protected String host;
protected int port; protected int port;
protected MulticastSocket socket; protected MulticastSocket socket;
//private XX x;
public UdpMulticastServer(int port) {
this(HOST, port);
}
public UdpMulticastServer(String host, int port) { public UdpMulticastServer(String host, int port) {
super(Worker.Type.BACKGROUND);
this.host = host; this.host = host;
this.port = port; this.port = port;
} }
@@ -28,7 +36,8 @@ public class UdpMulticastServer extends Listen<byte[]> implements Sender {
// pass socket directly to Server to establish bidirectional // pass socket directly to Server to establish bidirectional
// couple together capabilities // couple together capabilities
// listen to datagrams and deal with writing using nio? // listen to datagrams and deal with writing using nio?
new XX(socket).start(); //x = new XX(socket);
//x.start();
} catch (IOException e) { } catch (IOException e) {
throw new ActivateException(); throw new ActivateException();
} }
@@ -36,18 +45,10 @@ public class UdpMulticastServer extends Listen<byte[]> implements Sender {
} }
public void deactivate() throws DeactivateException { public void deactivate() throws DeactivateException {
super.deactivate(); System.err.println("lets work the magic");
socket.close();
}
public boolean active() { socket.close();
return socket != null; super.deactivate();
/* Should handle connection state
if (socket == null) {
return false;
} else {
return socket.isConnected() && !socket.isClosed();
}*/
} }
public void input(byte[] buffer) { public void input(byte[] buffer) {

View File

@@ -4,7 +4,6 @@ import java.io.IOException;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.work.Work; import base.work.Work;
@@ -12,6 +11,7 @@ import base.work.Work;
public abstract class UdpServer extends Work { public abstract class UdpServer extends Work {
protected static final int BUFFER_SIZE = 1024; protected static final int BUFFER_SIZE = 1024;
protected static final int TIMEOUT = 1000; protected static final int TIMEOUT = 1000;
protected int port; protected int port;
protected int bufferSize; protected int bufferSize;
protected DatagramSocket diagramSocket; protected DatagramSocket diagramSocket;
@@ -38,13 +38,20 @@ public abstract class UdpServer extends Work {
} }
} }
public void stop() {
super.stop();
if (diagramSocket != null) {
diagramSocket.close();
}
}
public void work() { public void work() {
byte[] buffer = new byte[bufferSize]; byte[] buffer = new byte[bufferSize];
DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length); DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
try { try {
diagramSocket.receive(datagramPacket); diagramSocket.receive(datagramPacket);
} catch (SocketTimeoutException e) { } catch (SocketException e) {
return; stop();
} catch (IOException e) { } catch (IOException e) {
logger.error("Failed to receive packet", e); logger.error("Failed to receive packet", e);
stop(); stop();

View File

@@ -4,31 +4,34 @@ import java.io.IOException;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.MulticastSocket; import java.net.MulticastSocket;
public class XX extends Thread { import base.work.Work;
public class XX extends Work {
protected int bufferSize = 1024;
private MulticastSocket socket; private MulticastSocket socket;
private DatagramPacket dgram;
public XX(MulticastSocket socket) { public XX(MulticastSocket socket) {
this.socket = socket; this.socket = socket;
byte[] b = new byte[1024];
dgram = new DatagramPacket(b, b.length);
} }
public void run() {
while (true) {
byte[] b = new byte[1024];
DatagramPacket dgram = new DatagramPacket(b, b.length);
public void work() {
while(true) {
try { try {
socket.receive(dgram); socket.receive(dgram);
} catch (IOException e) { } catch (IOException e) {
// TODO Auto-generated catch block stop();
e.printStackTrace();
} // blocks until a datagram is received } // blocks until a datagram is received
System.err.println("Received " + dgram.getLength() + System.err.println("Received " + dgram.getLength() +
" bytes from " + dgram.getAddress()); " bytes from " + dgram.getAddress());
dgram.setLength(b.length); // must reset length field! dgram.setLength(bufferSize); // must reset length field!
} }
}
public void stop() {
super.stop();
socket.close();
} }
} }

View File

@@ -7,6 +7,7 @@ import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException;
import base.sender.Sender; import base.sender.Sender;
import base.work.Work; import base.work.Work;
@@ -39,13 +40,19 @@ public class TcpServer extends Work implements Sender {
logger.error("", e); logger.error("", e);
throw new ActivateException(); throw new ActivateException();
} }
super.activate(); }
public void deactivate() throws DeactivateException {
for (TcpServerClient client : clientList) {
client.stop();
}
} }
public void exit() { public void exit() {
super.exit(); super.exit();
try { try {
serverSocket.close(); serverSocket.close();
// Should check if clients exit as well
} catch (IOException e) { } catch (IOException e) {
logger.error("", e); logger.error("", e);
} }

View File

@@ -32,7 +32,6 @@ public class TcpServerClient extends AbstractTcpClient {
} }
public void input(byte[] buffer) { public void input(byte[] buffer) {
System.out.println(1234);
server.input(this, buffer); server.input(this, buffer);
} }
} }

View File

@@ -71,15 +71,24 @@ public abstract class Listen<E> extends Work implements Listener<E> {
} }
} }
public void stop() {
super.stop();
synchronized (this) {
notifyAll();
}
}
public void work() { public void work() {
System.err.println(this.getClass().getName());
while (!queue.isEmpty()) { while (!queue.isEmpty()) {
logger.debug("Listen: work() > input");
input(queue.poll()); input(queue.poll());
} }
synchronized (this) { synchronized (this) {
logger.debug("Listen: work() > wait");
try { try {
wait(); wait();
} catch (InterruptedException e) {} } catch (InterruptedException e) {}
logger.debug("Listen: work() > notified");
} }
} }

View File

@@ -47,20 +47,22 @@ public abstract class Work {
} }
public void start() { public void start() {
logger.debug("Start work"); logger.debug("Work: start()");
worker.start(); worker.start();
} }
public void stop() { public void stop() {
logger.debug("Stop work"); logger.debug("Work: stop()");
worker.stop(); worker.stop();
} }
public boolean active() { public boolean active() {
logger.debug("Work: active()");
return worker.active(); return worker.active();
} }
public void exit() { public void exit() {
logger.debug("Work: exit()");
worker.exit(); worker.exit();
} }

View File

@@ -24,7 +24,7 @@ public class ThreadWorker extends Worker implements Runnable {
run = true; run = true;
if (thread) { if (thread) {
logger.debug("Start thread"); logger.debug("Start thread");
new Thread(this, getClass().getName()).start(); new Thread(this, work.getClass().getName()).start();
} else { } else {
logger.debug("Run directly"); logger.debug("Run directly");
run(); run();
@@ -38,13 +38,8 @@ public class ThreadWorker extends Worker implements Runnable {
start(thread); start(thread);
} }
public synchronized void stop() {
super.stop();
notifyAll();
}
public void exit() { public void exit() {
work.stop();
run = false; run = false;
stop();
} }
} }

View File

@@ -14,7 +14,7 @@ public abstract class Worker {
public static final int SLEEP = 100; public static final int SLEEP = 100;
protected Logger logger = LoggerFactory.getLogger(getClass()); protected Logger logger;
protected boolean run = false; protected boolean run = false;
protected boolean active = false; protected boolean active = false;
@@ -25,22 +25,31 @@ public abstract class Worker {
public Worker(Work work) { public Worker(Work work) {
this.work = work; this.work = work;
logger = LoggerFactory.getLogger(work.getClass());
} }
public boolean active() { public boolean active() {
logger.debug("Worker: active()");
System.out.println(activate + " " + deactivate + " " + active + ": " + (deactivate || active));
return deactivate || active; return deactivate || active;
} }
public final void run() { public final void run() {
logger.debug("Worker: run()");
while (run || deactivate) { while (run || deactivate) {
System.err.println("xxx");
runActivate(); runActivate();
System.err.println("act");
runDeactivate(); runDeactivate();
System.err.println("deact");
runWork(); runWork();
System.err.println("---" + getClass().getName() + run + " " + deactivate);
} }
} }
public void runActivate() { public void runActivate() {
if (activate && !active) { if (activate && !active) {
logger.debug("Worker: runActivate()");
try { try {
work.activate(); work.activate();
active = true; active = true;
@@ -54,6 +63,7 @@ public abstract class Worker {
public void runDeactivate() { public void runDeactivate() {
if (deactivate && active) { if (deactivate && active) {
logger.debug("Worker: runDeactivate()");
try { try {
work.deactivate(); work.deactivate();
} catch (DeactivateException e) { } catch (DeactivateException e) {
@@ -67,9 +77,11 @@ public abstract class Worker {
public void runWork() { public void runWork() {
if (active) { if (active) {
logger.debug("Worker: runWork() > work");
work.work(); work.work();
} else if (run) { } else if (run) {
try { try {
logger.debug("Worker: runWork() > wait");
synchronized (this) { synchronized (this) {
wait(); wait();
} }
@@ -96,10 +108,13 @@ public abstract class Worker {
public abstract void start(); public abstract void start();
public void stop() { public void stop() {
logger.debug("Worker: stop()");
logger.debug("Worker: stop() " + active + " " + activate);
if (active && !activate) { if (active && !activate) {
deactivate = true; deactivate = true;
} }
activate = false; activate = false;
logger.debug("Worker: stop() " + deactivate);
} }
abstract public void exit(); abstract public void exit();

View File

@@ -1,27 +0,0 @@
package test.server;
import test.server.dummy.DummyUdpListen;
import test.server.dummy.DummyWriter;
import base.server.datagram.UdpMulticastClient;
import base.server.datagram.UdpMulticastServer;
public class TestUdpMulticastCommunication {
public static void main(String[] args) {
// Test Client (multicast) < Server
String host = "239.255.255.255";
int port = 4446;
UdpMulticastServer y = new UdpMulticastServer(host, port);
y.start();
UdpMulticastClient x = new UdpMulticastClient(host, port);
x.start();
DummyUdpListen z = new DummyUdpListen();
x.register(z);
z.start();
new DummyWriter(y).start();
try {
Thread.sleep(100000);
} catch (InterruptedException e) {}
}
}

View File

@@ -1,14 +0,0 @@
package test.server.dummy;
import base.work.Listen;
public class DummyUdpListen extends Listen<byte[]> {
public DummyUdpListen() {
super();
}
public void input(byte[] buffer) {
String received = new String(buffer).trim();
System.out.println("Quote of the Moment: " + received);
}
}

View File

@@ -1,32 +0,0 @@
package test.server.dummy;
import java.io.IOException;
import base.sender.Sender;
import base.work.Work;
public class DummyWriter extends Work implements Sender {
private Sender sender;
public DummyWriter(Sender sender) {
this.sender = sender;
}
public void work() {
System.out.println("Client sending messages to server...");
String [] messages = new String[] {"Time goes fast.", "What now?", "Bye."};
try {
for (int i = 0; i < messages.length; i++) {
System.out.println(messages[i]);
send(new String(messages[i]).getBytes());
sleep(200);
}
stop();
} catch (Exception e) {}
}
public void send(byte[] buffer) throws IOException {
sender.send(buffer);
}
}

View File

@@ -8,7 +8,8 @@ import org.junit.runners.Suite.SuiteClasses;
@SuiteClasses({ @SuiteClasses({
TestTcpSocketCommunication.class, TestTcpSocketCommunication.class,
TestTcpChannelCommunication.class, TestTcpChannelCommunication.class,
TestUdpCommunication.class TestUdpUnicastCommunication.class,
TestUdpMulticastCommunication.class
}) })
public class AllTests {} public class AllTests {}

View File

@@ -5,7 +5,6 @@ import static org.junit.Assert.assertNotNull;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import base.server.socket.TcpClient; import base.server.socket.TcpClient;

View File

@@ -0,0 +1,65 @@
package junit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import base.server.datagram.UdpMulticastClient;
import base.server.datagram.UdpMulticastServer;
public class TestUdpMulticastCommunication {
protected UdpMulticastServer server;
protected TestUdpMulticastClient client;
@Before
public void setUp() throws Exception {
server = new UdpMulticastServer(1234);
server.start();
client = new TestUdpMulticastClient(1234);
client.start();
}
@After
public void tearDown() throws Exception {
client.exit();
server.exit();
// Should add blocking stop and exit to worker
while (client.active() || server.active()) {
Thread.sleep(1000);
}
}
@Test
public void testServerToClientCommunication() throws Exception {
String message = "test";
server.send(message.getBytes());
System.err.println("send");
synchronized (client) {
client.wait(2000);
}
System.err.println("after wait");
byte[] buffer = client.buffer;
assertNotNull("Received input", buffer);
assertEquals("Message intact", message, new String(buffer).trim());
}
class TestUdpMulticastClient extends UdpMulticastClient {
public byte[] buffer;
public TestUdpMulticastClient(int port) {
super(port);
}
protected void input(byte[] buffer) {
this.buffer = buffer;
System.err.println("binnen");
synchronized (this) {
notifyAll();
}
}
}
}

View File

@@ -10,7 +10,7 @@ import org.junit.Test;
import base.sender.UdpSender; import base.sender.UdpSender;
import base.server.datagram.UdpServer; import base.server.datagram.UdpServer;
public class TestUdpCommunication { public class TestUdpUnicastCommunication {
protected TestUdpServer server; protected TestUdpServer server;
protected UdpSender sender; protected UdpSender sender;