From 88eec41139c8880c40cd62c7539bd6670baf128e Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Sun, 21 Jun 2015 12:25:56 +0100 Subject: [PATCH] Migrate custom tests for UDP communication to junit, all current tests pass --- .../java/base/server/channel/TcpClient.java | 5 +- .../java/base/server/channel/TcpServer.java | 10 ++- .../server/datagram/UdpMulticastClient.java | 36 ++++------ .../server/datagram/UdpMulticastServer.java | 25 +++---- .../java/base/server/datagram/UdpServer.java | 13 +++- .../main/java/base/server/datagram/XX.java | 37 ++++++----- .../java/base/server/socket/TcpServer.java | 9 ++- .../base/server/socket/TcpServerClient.java | 1 - java/base/src/main/java/base/work/Listen.java | 15 ++++- java/base/src/main/java/base/work/Work.java | 6 +- .../main/java/base/worker/ThreadWorker.java | 9 +-- .../src/main/java/base/worker/Worker.java | 19 +++++- .../server/TestUdpMulticastCommunication.java | 27 -------- .../test/server/dummy/DummyUdpListen.java | 14 ---- .../java/test/server/dummy/DummyWriter.java | 32 --------- java/base/src/test/java/junit/AllTests.java | 3 +- .../junit/TestTcpSocketCommunication.java | 1 - .../junit/TestUdpMulticastCommunication.java | 65 +++++++++++++++++++ ....java => TestUdpUnicastCommunication.java} | 2 +- 19 files changed, 177 insertions(+), 152 deletions(-) delete mode 100644 java/base/src/main/java/test/server/TestUdpMulticastCommunication.java delete mode 100644 java/base/src/main/java/test/server/dummy/DummyUdpListen.java delete mode 100644 java/base/src/main/java/test/server/dummy/DummyWriter.java create mode 100644 java/base/src/test/java/junit/TestUdpMulticastCommunication.java rename java/base/src/test/java/junit/{TestUdpCommunication.java => TestUdpUnicastCommunication.java} (96%) diff --git a/java/base/src/main/java/base/server/channel/TcpClient.java b/java/base/src/main/java/base/server/channel/TcpClient.java index 43b9bba..1c5465c 100644 --- a/java/base/src/main/java/base/server/channel/TcpClient.java +++ b/java/base/src/main/java/base/server/channel/TcpClient.java @@ -73,8 +73,8 @@ public abstract class TcpClient extends Work implements Sender { } } - public void exit() { - super.exit(); + public void stop() { + super.stop(); if (selector != null) { selector.wakeup(); } @@ -84,6 +84,7 @@ public abstract class TcpClient extends Work implements Sender { try { System.out.println("Client: Waiting for select... "); System.out.println("Client: Number of selected keys: " + selector.select()); + System.out.println("wakker"); //selector.select(); Set selectionKeySet = selector.selectedKeys(); Iterator selectionKeyIterator = selectionKeySet.iterator(); diff --git a/java/base/src/main/java/base/server/channel/TcpServer.java b/java/base/src/main/java/base/server/channel/TcpServer.java index a30331d..1fa46ef 100644 --- a/java/base/src/main/java/base/server/channel/TcpServer.java +++ b/java/base/src/main/java/base/server/channel/TcpServer.java @@ -16,6 +16,7 @@ import java.util.Set; import base.exception.worker.ActivateException; import base.exception.worker.DeactivateException; import base.sender.Sender; +import base.server.channel.TcpServerClient; import base.work.Work; public class TcpServer extends Work implements Sender { @@ -50,7 +51,6 @@ public class TcpServer extends Work implements Sender { } public void activate() throws ActivateException { - System.out.println("Server: Activate!"); try { // Get selector @@ -81,11 +81,15 @@ public class TcpServer extends Work implements Sender { serverSocket.close(); } catch (IOException e) { throw new DeactivateException(); + } finally { + for (TcpServerClient client : clientList) { + client.stop(); + } } } - public void exit() { - super.exit(); + public void stop() { + super.stop(); if (selector != null) { selector.wakeup(); } diff --git a/java/base/src/main/java/base/server/datagram/UdpMulticastClient.java b/java/base/src/main/java/base/server/datagram/UdpMulticastClient.java index 3fcaf26..37da5b1 100644 --- a/java/base/src/main/java/base/server/datagram/UdpMulticastClient.java +++ b/java/base/src/main/java/base/server/datagram/UdpMulticastClient.java @@ -4,14 +4,12 @@ import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; -import java.util.ArrayList; import base.exception.worker.ActivateException; -import base.exception.worker.DeactivateException; -import base.work.Listen; import base.work.Work; -public class UdpMulticastClient extends Work { +public abstract class UdpMulticastClient extends Work { + protected static final String HOST = "239.255.255.255"; protected static final int BUFFER_SIZE = 2048; protected String host; @@ -19,7 +17,10 @@ public class UdpMulticastClient extends Work { protected int bufferSize; protected MulticastSocket socket; protected InetAddress group; - protected ArrayList> listenList; + + public UdpMulticastClient(int port) { + this(HOST, port); + } public UdpMulticastClient(String host, int port) { this(host, port, BUFFER_SIZE); @@ -28,8 +29,7 @@ public class UdpMulticastClient extends Work { public UdpMulticastClient(String host, int port, int bufferSize) { this.host = host; this.port = port; - this.bufferSize = BUFFER_SIZE; - listenList = new ArrayList>(); + this.bufferSize = BUFFER_SIZE; } public void work() { @@ -38,9 +38,7 @@ public class UdpMulticastClient extends Work { DatagramPacket packet = new DatagramPacket(buffer, buffer.length); socket.receive(packet); buffer = packet.getData(); - for (Listen listen : listenList) { - listen.add(buffer); - } + input(buffer); } catch (IOException e) {} } @@ -50,24 +48,16 @@ public class UdpMulticastClient extends Work { group = InetAddress.getByName(host); socket.joinGroup(group); } catch (IOException e) { + logger.error("", e); throw new ActivateException(); } } - public void deactivate() throws DeactivateException { - try { - socket.leaveGroup(group); - } catch (IOException e) { - throw new DeactivateException(); - } + public void stop() { + System.out.println("client close socket"); socket.close(); + super.stop(); } - public void register(Listen listen) { - listenList.add(listen); - } - - public void remove(Listen listen) { - listenList.remove(listen); - } + protected abstract void input(byte[] buffer); } diff --git a/java/base/src/main/java/base/server/datagram/UdpMulticastServer.java b/java/base/src/main/java/base/server/datagram/UdpMulticastServer.java index 0962870..39cba2f 100644 --- a/java/base/src/main/java/base/server/datagram/UdpMulticastServer.java +++ b/java/base/src/main/java/base/server/datagram/UdpMulticastServer.java @@ -9,15 +9,23 @@ import base.exception.worker.ActivateException; import base.exception.worker.DeactivateException; import base.sender.Sender; import base.work.Listen; +import base.worker.Worker; public class UdpMulticastServer extends Listen implements Sender { + protected static final String HOST = "239.255.255.255"; protected static final int BUFFER_SIZE = 2048; protected String host; protected int port; protected MulticastSocket socket; + //private XX x; + + public UdpMulticastServer(int port) { + this(HOST, port); + } public UdpMulticastServer(String host, int port) { + super(Worker.Type.BACKGROUND); this.host = host; this.port = port; } @@ -28,7 +36,8 @@ public class UdpMulticastServer extends Listen implements Sender { // pass socket directly to Server to establish bidirectional // couple together capabilities // listen to datagrams and deal with writing using nio? - new XX(socket).start(); + //x = new XX(socket); + //x.start(); } catch (IOException e) { throw new ActivateException(); } @@ -36,18 +45,10 @@ public class UdpMulticastServer extends Listen implements Sender { } public void deactivate() throws DeactivateException { - super.deactivate(); - socket.close(); - } + System.err.println("lets work the magic"); - public boolean active() { - return socket != null; - /* Should handle connection state - if (socket == null) { - return false; - } else { - return socket.isConnected() && !socket.isClosed(); - }*/ + socket.close(); + super.deactivate(); } public void input(byte[] buffer) { diff --git a/java/base/src/main/java/base/server/datagram/UdpServer.java b/java/base/src/main/java/base/server/datagram/UdpServer.java index a39d531..f59ef77 100644 --- a/java/base/src/main/java/base/server/datagram/UdpServer.java +++ b/java/base/src/main/java/base/server/datagram/UdpServer.java @@ -4,7 +4,6 @@ import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketException; -import java.net.SocketTimeoutException; import base.exception.worker.ActivateException; import base.work.Work; @@ -12,6 +11,7 @@ import base.work.Work; public abstract class UdpServer extends Work { protected static final int BUFFER_SIZE = 1024; protected static final int TIMEOUT = 1000; + protected int port; protected int bufferSize; protected DatagramSocket diagramSocket; @@ -38,13 +38,20 @@ public abstract class UdpServer extends Work { } } + public void stop() { + super.stop(); + if (diagramSocket != null) { + diagramSocket.close(); + } + } + public void work() { byte[] buffer = new byte[bufferSize]; DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length); try { diagramSocket.receive(datagramPacket); - } catch (SocketTimeoutException e) { - return; + } catch (SocketException e) { + stop(); } catch (IOException e) { logger.error("Failed to receive packet", e); stop(); diff --git a/java/base/src/main/java/base/server/datagram/XX.java b/java/base/src/main/java/base/server/datagram/XX.java index a868995..8832363 100644 --- a/java/base/src/main/java/base/server/datagram/XX.java +++ b/java/base/src/main/java/base/server/datagram/XX.java @@ -4,31 +4,34 @@ import java.io.IOException; import java.net.DatagramPacket; import java.net.MulticastSocket; -public class XX extends Thread { +import base.work.Work; +public class XX extends Work { + + protected int bufferSize = 1024; private MulticastSocket socket; + private DatagramPacket dgram; public XX(MulticastSocket socket) { this.socket = socket; + byte[] b = new byte[1024]; + dgram = new DatagramPacket(b, b.length); } - public void run() { - while (true) { - byte[] b = new byte[1024]; - DatagramPacket dgram = new DatagramPacket(b, b.length); - - while(true) { - try { - socket.receive(dgram); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } // blocks until a datagram is received - System.err.println("Received " + dgram.getLength() + - " bytes from " + dgram.getAddress()); - dgram.setLength(b.length); // must reset length field! - } + public void work() { + try { + socket.receive(dgram); + } catch (IOException e) { + stop(); + } // blocks until a datagram is received + System.err.println("Received " + dgram.getLength() + + " bytes from " + dgram.getAddress()); + dgram.setLength(bufferSize); // must reset length field! } + + public void stop() { + super.stop(); + socket.close(); } } diff --git a/java/base/src/main/java/base/server/socket/TcpServer.java b/java/base/src/main/java/base/server/socket/TcpServer.java index d678497..d6cd36a 100644 --- a/java/base/src/main/java/base/server/socket/TcpServer.java +++ b/java/base/src/main/java/base/server/socket/TcpServer.java @@ -7,6 +7,7 @@ import java.net.Socket; import java.util.ArrayList; import base.exception.worker.ActivateException; +import base.exception.worker.DeactivateException; import base.sender.Sender; import base.work.Work; @@ -39,13 +40,19 @@ public class TcpServer extends Work implements Sender { logger.error("", e); throw new ActivateException(); } - super.activate(); + } + + public void deactivate() throws DeactivateException { + for (TcpServerClient client : clientList) { + client.stop(); + } } public void exit() { super.exit(); try { serverSocket.close(); + // Should check if clients exit as well } catch (IOException e) { logger.error("", e); } diff --git a/java/base/src/main/java/base/server/socket/TcpServerClient.java b/java/base/src/main/java/base/server/socket/TcpServerClient.java index 9b0534b..3843211 100644 --- a/java/base/src/main/java/base/server/socket/TcpServerClient.java +++ b/java/base/src/main/java/base/server/socket/TcpServerClient.java @@ -32,7 +32,6 @@ public class TcpServerClient extends AbstractTcpClient { } public void input(byte[] buffer) { -System.out.println(1234); server.input(this, buffer); } } diff --git a/java/base/src/main/java/base/work/Listen.java b/java/base/src/main/java/base/work/Listen.java index 7238409..a6692fb 100644 --- a/java/base/src/main/java/base/work/Listen.java +++ b/java/base/src/main/java/base/work/Listen.java @@ -71,16 +71,25 @@ public abstract class Listen extends Work implements Listener { } } + public void stop() { + super.stop(); + synchronized (this) { + notifyAll(); + } + } + public void work() { - System.err.println(this.getClass().getName()); while (!queue.isEmpty()) { + logger.debug("Listen: work() > input"); input(queue.poll()); } synchronized (this) { - try { + logger.debug("Listen: work() > wait"); + try { wait(); } catch (InterruptedException e) {} - } + logger.debug("Listen: work() > notified"); + } } public void input(Object object) { diff --git a/java/base/src/main/java/base/work/Work.java b/java/base/src/main/java/base/work/Work.java index 73d2e41..5e962a9 100644 --- a/java/base/src/main/java/base/work/Work.java +++ b/java/base/src/main/java/base/work/Work.java @@ -47,20 +47,22 @@ public abstract class Work { } public void start() { - logger.debug("Start work"); + logger.debug("Work: start()"); worker.start(); } public void stop() { - logger.debug("Stop work"); + logger.debug("Work: stop()"); worker.stop(); } public boolean active() { + logger.debug("Work: active()"); return worker.active(); } public void exit() { + logger.debug("Work: exit()"); worker.exit(); } diff --git a/java/base/src/main/java/base/worker/ThreadWorker.java b/java/base/src/main/java/base/worker/ThreadWorker.java index a045c99..aee836d 100644 --- a/java/base/src/main/java/base/worker/ThreadWorker.java +++ b/java/base/src/main/java/base/worker/ThreadWorker.java @@ -24,7 +24,7 @@ public class ThreadWorker extends Worker implements Runnable { run = true; if (thread) { logger.debug("Start thread"); - new Thread(this, getClass().getName()).start(); + new Thread(this, work.getClass().getName()).start(); } else { logger.debug("Run directly"); run(); @@ -38,13 +38,8 @@ public class ThreadWorker extends Worker implements Runnable { start(thread); } - public synchronized void stop() { - super.stop(); - notifyAll(); - } - public void exit() { + work.stop(); run = false; - stop(); } } diff --git a/java/base/src/main/java/base/worker/Worker.java b/java/base/src/main/java/base/worker/Worker.java index fc3242f..ad52e9b 100644 --- a/java/base/src/main/java/base/worker/Worker.java +++ b/java/base/src/main/java/base/worker/Worker.java @@ -14,7 +14,7 @@ public abstract class Worker { public static final int SLEEP = 100; - protected Logger logger = LoggerFactory.getLogger(getClass()); + protected Logger logger; protected boolean run = false; protected boolean active = false; @@ -25,22 +25,31 @@ public abstract class Worker { public Worker(Work work) { this.work = work; + logger = LoggerFactory.getLogger(work.getClass()); } public boolean active() { + logger.debug("Worker: active()"); + System.out.println(activate + " " + deactivate + " " + active + ": " + (deactivate || active)); return deactivate || active; } public final void run() { + logger.debug("Worker: run()"); while (run || deactivate) { + System.err.println("xxx"); runActivate(); + System.err.println("act"); runDeactivate(); + System.err.println("deact"); runWork(); + System.err.println("---" + getClass().getName() + run + " " + deactivate); } } - public void runActivate() { + public void runActivate() { if (activate && !active) { + logger.debug("Worker: runActivate()"); try { work.activate(); active = true; @@ -54,6 +63,7 @@ public abstract class Worker { public void runDeactivate() { if (deactivate && active) { + logger.debug("Worker: runDeactivate()"); try { work.deactivate(); } catch (DeactivateException e) { @@ -67,9 +77,11 @@ public abstract class Worker { public void runWork() { if (active) { + logger.debug("Worker: runWork() > work"); work.work(); } else if (run) { try { + logger.debug("Worker: runWork() > wait"); synchronized (this) { wait(); } @@ -96,10 +108,13 @@ public abstract class Worker { public abstract void start(); public void stop() { + logger.debug("Worker: stop()"); + logger.debug("Worker: stop() " + active + " " + activate); if (active && !activate) { deactivate = true; } activate = false; + logger.debug("Worker: stop() " + deactivate); } abstract public void exit(); diff --git a/java/base/src/main/java/test/server/TestUdpMulticastCommunication.java b/java/base/src/main/java/test/server/TestUdpMulticastCommunication.java deleted file mode 100644 index c1a26bf..0000000 --- a/java/base/src/main/java/test/server/TestUdpMulticastCommunication.java +++ /dev/null @@ -1,27 +0,0 @@ -package test.server; - -import test.server.dummy.DummyUdpListen; -import test.server.dummy.DummyWriter; -import base.server.datagram.UdpMulticastClient; -import base.server.datagram.UdpMulticastServer; - -public class TestUdpMulticastCommunication { - public static void main(String[] args) { - // Test Client (multicast) < Server - String host = "239.255.255.255"; - int port = 4446; - - UdpMulticastServer y = new UdpMulticastServer(host, port); - y.start(); - UdpMulticastClient x = new UdpMulticastClient(host, port); - x.start(); - DummyUdpListen z = new DummyUdpListen(); - x.register(z); - z.start(); - new DummyWriter(y).start(); - - try { - Thread.sleep(100000); - } catch (InterruptedException e) {} - } -} diff --git a/java/base/src/main/java/test/server/dummy/DummyUdpListen.java b/java/base/src/main/java/test/server/dummy/DummyUdpListen.java deleted file mode 100644 index 49d3b64..0000000 --- a/java/base/src/main/java/test/server/dummy/DummyUdpListen.java +++ /dev/null @@ -1,14 +0,0 @@ -package test.server.dummy; - -import base.work.Listen; - -public class DummyUdpListen extends Listen { - public DummyUdpListen() { - super(); - } - - public void input(byte[] buffer) { - String received = new String(buffer).trim(); - System.out.println("Quote of the Moment: " + received); - } -} diff --git a/java/base/src/main/java/test/server/dummy/DummyWriter.java b/java/base/src/main/java/test/server/dummy/DummyWriter.java deleted file mode 100644 index e8e91e4..0000000 --- a/java/base/src/main/java/test/server/dummy/DummyWriter.java +++ /dev/null @@ -1,32 +0,0 @@ -package test.server.dummy; - -import java.io.IOException; - -import base.sender.Sender; -import base.work.Work; - -public class DummyWriter extends Work implements Sender { - - private Sender sender; - - public DummyWriter(Sender sender) { - this.sender = sender; - } - - public void work() { - System.out.println("Client sending messages to server..."); - String [] messages = new String[] {"Time goes fast.", "What now?", "Bye."}; - try { - for (int i = 0; i < messages.length; i++) { - System.out.println(messages[i]); - send(new String(messages[i]).getBytes()); - sleep(200); - } - stop(); - } catch (Exception e) {} - } - - public void send(byte[] buffer) throws IOException { - sender.send(buffer); - } -} diff --git a/java/base/src/test/java/junit/AllTests.java b/java/base/src/test/java/junit/AllTests.java index c012bf1..3c105e3 100644 --- a/java/base/src/test/java/junit/AllTests.java +++ b/java/base/src/test/java/junit/AllTests.java @@ -8,7 +8,8 @@ import org.junit.runners.Suite.SuiteClasses; @SuiteClasses({ TestTcpSocketCommunication.class, TestTcpChannelCommunication.class, - TestUdpCommunication.class + TestUdpUnicastCommunication.class, + TestUdpMulticastCommunication.class }) public class AllTests {} diff --git a/java/base/src/test/java/junit/TestTcpSocketCommunication.java b/java/base/src/test/java/junit/TestTcpSocketCommunication.java index 8575dc7..879b45a 100644 --- a/java/base/src/test/java/junit/TestTcpSocketCommunication.java +++ b/java/base/src/test/java/junit/TestTcpSocketCommunication.java @@ -5,7 +5,6 @@ import static org.junit.Assert.assertNotNull; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import base.server.socket.TcpClient; diff --git a/java/base/src/test/java/junit/TestUdpMulticastCommunication.java b/java/base/src/test/java/junit/TestUdpMulticastCommunication.java new file mode 100644 index 0000000..fcee9e8 --- /dev/null +++ b/java/base/src/test/java/junit/TestUdpMulticastCommunication.java @@ -0,0 +1,65 @@ +package junit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import base.server.datagram.UdpMulticastClient; +import base.server.datagram.UdpMulticastServer; + +public class TestUdpMulticastCommunication { + protected UdpMulticastServer server; + protected TestUdpMulticastClient client; + + @Before + public void setUp() throws Exception { + server = new UdpMulticastServer(1234); + server.start(); + client = new TestUdpMulticastClient(1234); + client.start(); + } + + @After + public void tearDown() throws Exception { + client.exit(); + server.exit(); + + // Should add blocking stop and exit to worker + while (client.active() || server.active()) { + Thread.sleep(1000); + } + } + + @Test + public void testServerToClientCommunication() throws Exception { + String message = "test"; + server.send(message.getBytes()); + System.err.println("send"); + synchronized (client) { + client.wait(2000); + } + System.err.println("after wait"); + byte[] buffer = client.buffer; + assertNotNull("Received input", buffer); + assertEquals("Message intact", message, new String(buffer).trim()); + } + + class TestUdpMulticastClient extends UdpMulticastClient { + public byte[] buffer; + + public TestUdpMulticastClient(int port) { + super(port); + } + + protected void input(byte[] buffer) { + this.buffer = buffer; + System.err.println("binnen"); + synchronized (this) { + notifyAll(); + } + } + } +} diff --git a/java/base/src/test/java/junit/TestUdpCommunication.java b/java/base/src/test/java/junit/TestUdpUnicastCommunication.java similarity index 96% rename from java/base/src/test/java/junit/TestUdpCommunication.java rename to java/base/src/test/java/junit/TestUdpUnicastCommunication.java index 68c9503..b949b22 100644 --- a/java/base/src/test/java/junit/TestUdpCommunication.java +++ b/java/base/src/test/java/junit/TestUdpUnicastCommunication.java @@ -10,7 +10,7 @@ import org.junit.Test; import base.sender.UdpSender; import base.server.datagram.UdpServer; -public class TestUdpCommunication { +public class TestUdpUnicastCommunication { protected TestUdpServer server; protected UdpSender sender;