diff --git a/java/base/src/main/java/base/server/AbstractClient.java b/java/base/src/main/java/base/server/AbstractClient.java index 41215be..610549a 100644 --- a/java/base/src/main/java/base/server/AbstractClient.java +++ b/java/base/src/main/java/base/server/AbstractClient.java @@ -4,13 +4,13 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; -import base.worker.ThreadWorker; +import base.work.Work; -public abstract class AbstractClient extends ThreadWorker { +public abstract class AbstractClient extends Work { protected Socket socket; protected InputStream inputStream; protected OutputStream outputStream; - + public AbstractClient(Socket socket) { this.socket = socket; } diff --git a/java/base/src/main/java/base/server/TcpClient.java b/java/base/src/main/java/base/server/TcpClient.java index 41468e1..8cedcfc 100644 --- a/java/base/src/main/java/base/server/TcpClient.java +++ b/java/base/src/main/java/base/server/TcpClient.java @@ -3,26 +3,29 @@ package base.server; import java.io.IOException; import java.net.Socket; import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; import base.exception.worker.ActivateException; import base.exception.worker.DeactivateException; public abstract class TcpClient extends AbstractClient { - protected String host; + protected static final int BUFFER = 2048; + protected String host; protected int port; - public TcpClient(String ip, int port) { + public TcpClient(String host, int port) { super(null); - this.host = ip; + this.host = host; this.port = port; } public void activate() throws ActivateException { try { socket = new Socket(host, port); - socket.setSoTimeout(SLEEP); inputStream = socket.getInputStream(); outputStream = socket.getOutputStream(); + send("Incoming client!".getBytes()); } catch (UnknownHostException e) { logger.error("", e); throw new ActivateException(); @@ -34,10 +37,7 @@ public abstract class TcpClient extends AbstractClient { } public synchronized boolean active() { - if (active && !socket.isConnected()) { - active = false; - } - return active; + return super.active() && socket.isConnected(); } public void deactivate() throws DeactivateException { @@ -49,5 +49,23 @@ public abstract class TcpClient extends AbstractClient { } catch (IOException e) { logger.error("", e); } - } + } + + public final void work() { + byte[] buffer = new byte[BUFFER]; + try { + while (inputStream.read(buffer) > 0) { + receive(buffer); + } + } catch (IOException e) { + stop(); + } + } + + public void send(byte[] buffer) throws IOException { + System.out.println("Client writing: " + Charset.defaultCharset().decode(ByteBuffer.wrap(buffer)).toString()); + outputStream.write(buffer); + } + + public abstract void receive(byte[] buffer); } \ No newline at end of file diff --git a/java/base/src/main/java/base/server/TcpServer.java b/java/base/src/main/java/base/server/TcpServer.java index c98649d..57cc741 100644 --- a/java/base/src/main/java/base/server/TcpServer.java +++ b/java/base/src/main/java/base/server/TcpServer.java @@ -7,9 +7,9 @@ import java.net.Socket; import java.util.ArrayList; import base.exception.worker.ActivateException; -import base.worker.Worker; +import base.work.Work; -public class TcpServer extends Worker { +public class TcpServer extends Work { protected int port; protected Socket socket; protected Constructor clientConstructor; @@ -26,7 +26,7 @@ public class TcpServer extends Worker { clientList = new ArrayList(); } - protected void activate() throws ActivateException { + public void activate() throws ActivateException { try { serverSocket = new ServerSocket(port); } catch (IOException e) { diff --git a/java/base/src/main/java/base/server/UdpServer.java b/java/base/src/main/java/base/server/UdpServer.java index ac226de..ef5a6be 100644 --- a/java/base/src/main/java/base/server/UdpServer.java +++ b/java/base/src/main/java/base/server/UdpServer.java @@ -4,12 +4,14 @@ import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketException; +import java.net.SocketTimeoutException; import base.exception.worker.ActivateException; -import base.worker.Worker; +import base.work.Work; -public abstract class UdpServer extends Worker { +public abstract class UdpServer extends Work { protected static final int BUFFER_SIZE = 1024; + protected static final int TIMEOUT = 1000; protected int port; protected int bufferSize; protected DatagramSocket diagramSocket; @@ -19,14 +21,16 @@ public abstract class UdpServer extends Worker { } public UdpServer(int port, int bufferSize) { + super(); this.port = port; this.bufferSize = bufferSize; } - protected void activate() throws ActivateException { + public void activate() throws ActivateException { try { logger.debug("Starting datagram socket on port " + port); diagramSocket = new DatagramSocket(port); + diagramSocket.setSoTimeout(TIMEOUT); super.activate(); } catch (SocketException e) { logger.error("Failed to initialize socket", e); @@ -35,21 +39,19 @@ public abstract class UdpServer extends Worker { } public void work() { - byte[] buffer = new byte[bufferSize]; + byte[] buffer = new byte[bufferSize]; DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length); try { diagramSocket.receive(datagramPacket); + } catch (SocketTimeoutException e) { + return; } catch (IOException e) { - logger.error("Failed to receive packet"); + logger.error("Failed to receive packet", e); stop(); return; } receive(buffer); } - public synchronized void stop() { - diagramSocket.close(); - } - abstract protected void receive(byte[] buffer); } diff --git a/java/base/src/main/java/base/work/Listen.java b/java/base/src/main/java/base/work/Listen.java index c7c10bf..7b97fd9 100644 --- a/java/base/src/main/java/base/work/Listen.java +++ b/java/base/src/main/java/base/work/Listen.java @@ -32,7 +32,7 @@ public abstract class Listen extends Work { } protected Listen(Worker worker) { - this.work = worker; + this.worker = worker; queue = new ConcurrentLinkedQueue(); } diff --git a/java/base/src/main/java/base/work/Work.java b/java/base/src/main/java/base/work/Work.java index d371adb..a801792 100644 --- a/java/base/src/main/java/base/work/Work.java +++ b/java/base/src/main/java/base/work/Work.java @@ -1,5 +1,8 @@ package base.work; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import base.exception.worker.ActivateException; import base.exception.worker.DeactivateException; import base.worker.DirectWorker; @@ -10,8 +13,10 @@ import base.worker.pool.WorkerPool; public abstract class Work { protected static final Worker.Type WORKER_TYPE = Worker.Type.THREAD; + + protected Logger logger = LoggerFactory.getLogger(getClass()); - protected Worker work; + protected Worker worker; protected Work() { this(WORKER_TYPE); @@ -20,34 +25,43 @@ public abstract class Work { protected Work(Worker.Type workerType) { switch (workerType) { case DIRECT: - work = new DirectWorker(this); + worker = new DirectWorker(this); break; default: case THREAD: - work = new ThreadWorker(this); + worker = new ThreadWorker(this); break; } } protected Work(Worker worker) { - this.work = worker; + this.worker = worker; } protected Work(WorkerPool workerPool) { - work = new PooledWorker(this); - workerPool.add((PooledWorker) work); + worker = new PooledWorker(this); + workerPool.add((PooledWorker) worker); } protected void sleep(int time) { - work.sleep(time); + worker.sleep(time); } public void start() { - work.start(); + worker.start(); } public void stop() { - work.stop(); + logger.debug("Stop work"); + worker.stop(); + } + + public boolean active() { + return worker.active(); + } + + public void exit() { + worker.exit(); } public void activate() throws ActivateException {} diff --git a/java/base/src/main/java/base/worker/IntervalWork.java b/java/base/src/main/java/base/worker/IntervalWork.java index 9f5f697..bc2b855 100644 --- a/java/base/src/main/java/base/worker/IntervalWork.java +++ b/java/base/src/main/java/base/worker/IntervalWork.java @@ -14,11 +14,11 @@ public abstract class IntervalWork extends Work { protected IntervalWork(Worker.Type workerType) { switch (workerType) { case DIRECT: - work = new DirectIntervalWorker(this); + worker = new DirectIntervalWorker(this); break; default: case THREAD: - work = new ThreadIntervalWorker(this); + worker = new ThreadIntervalWorker(this); break; } } @@ -26,11 +26,11 @@ public abstract class IntervalWork extends Work { protected IntervalWork(Worker.Type workerType, int interval) { switch (workerType) { case DIRECT: - work = new DirectIntervalWorker(this, interval); + worker = new DirectIntervalWorker(this, interval); break; default: case THREAD: - work = new ThreadIntervalWorker(this, interval); + worker = new ThreadIntervalWorker(this, interval); break; } } diff --git a/java/base/src/main/java/base/worker/ThreadWorker.java b/java/base/src/main/java/base/worker/ThreadWorker.java index 1ebfb2e..b00bb77 100644 --- a/java/base/src/main/java/base/worker/ThreadWorker.java +++ b/java/base/src/main/java/base/worker/ThreadWorker.java @@ -39,6 +39,7 @@ public class ThreadWorker extends Worker implements Runnable { } public synchronized void stop() { + super.stop(); if (active) { deactivate = true; } @@ -46,7 +47,7 @@ public class ThreadWorker extends Worker implements Runnable { } public void exit() { - stop(); run = false; + stop(); } } diff --git a/java/base/src/main/java/base/worker/Worker.java b/java/base/src/main/java/base/worker/Worker.java index 2df4424..be10d92 100644 --- a/java/base/src/main/java/base/worker/Worker.java +++ b/java/base/src/main/java/base/worker/Worker.java @@ -12,8 +12,8 @@ public abstract class Worker { DIRECT, THREAD, POOLED } - protected static final int SLEEP = 100; - + public static final int SLEEP = 100; + protected Logger logger = LoggerFactory.getLogger(getClass()); protected boolean run = false; @@ -27,10 +27,6 @@ public abstract class Worker { this.work = work; } - public abstract void start(); - - public abstract void stop(); - public boolean active() { return active; } @@ -96,4 +92,13 @@ public abstract class Worker { logger.info("", e); } } + + public abstract void start(); + + public void stop() { + logger.debug("Stop worker"); + activate = false; + } + + abstract public void exit(); } diff --git a/java/base/src/main/java/base/worker/pool/PooledWorker.java b/java/base/src/main/java/base/worker/pool/PooledWorker.java index 13afee3..e68cc8a 100644 --- a/java/base/src/main/java/base/worker/pool/PooledWorker.java +++ b/java/base/src/main/java/base/worker/pool/PooledWorker.java @@ -42,4 +42,8 @@ public class PooledWorker extends Worker { activateQueue.remove(this); deactivateQueue.add(this); } + + public void exit() { + stop(); + } }