Initial test implementation of UDP/TCP communication using simple sockets and socket channels, includes required changes to worker models

This commit is contained in:
2015-05-15 20:18:16 +01:00
parent d79f6f4d21
commit c92a6dcb6f
10 changed files with 89 additions and 45 deletions

View File

@@ -4,9 +4,9 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import base.worker.ThreadWorker; import base.work.Work;
public abstract class AbstractClient extends ThreadWorker { public abstract class AbstractClient extends Work {
protected Socket socket; protected Socket socket;
protected InputStream inputStream; protected InputStream inputStream;
protected OutputStream outputStream; protected OutputStream outputStream;

View File

@@ -3,26 +3,29 @@ package base.server;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException; import base.exception.worker.DeactivateException;
public abstract class TcpClient extends AbstractClient { public abstract class TcpClient extends AbstractClient {
protected static final int BUFFER = 2048;
protected String host; protected String host;
protected int port; protected int port;
public TcpClient(String ip, int port) { public TcpClient(String host, int port) {
super(null); super(null);
this.host = ip; this.host = host;
this.port = port; this.port = port;
} }
public void activate() throws ActivateException { public void activate() throws ActivateException {
try { try {
socket = new Socket(host, port); socket = new Socket(host, port);
socket.setSoTimeout(SLEEP);
inputStream = socket.getInputStream(); inputStream = socket.getInputStream();
outputStream = socket.getOutputStream(); outputStream = socket.getOutputStream();
send("Incoming client!".getBytes());
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
logger.error("", e); logger.error("", e);
throw new ActivateException(); throw new ActivateException();
@@ -34,10 +37,7 @@ public abstract class TcpClient extends AbstractClient {
} }
public synchronized boolean active() { public synchronized boolean active() {
if (active && !socket.isConnected()) { return super.active() && socket.isConnected();
active = false;
}
return active;
} }
public void deactivate() throws DeactivateException { public void deactivate() throws DeactivateException {
@@ -50,4 +50,22 @@ public abstract class TcpClient extends AbstractClient {
logger.error("", e); logger.error("", e);
} }
} }
public final void work() {
byte[] buffer = new byte[BUFFER];
try {
while (inputStream.read(buffer) > 0) {
receive(buffer);
}
} catch (IOException e) {
stop();
}
}
public void send(byte[] buffer) throws IOException {
System.out.println("Client writing: " + Charset.defaultCharset().decode(ByteBuffer.wrap(buffer)).toString());
outputStream.write(buffer);
}
public abstract void receive(byte[] buffer);
} }

View File

@@ -7,9 +7,9 @@ import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.worker.Worker; import base.work.Work;
public class TcpServer extends Worker { public class TcpServer extends Work {
protected int port; protected int port;
protected Socket socket; protected Socket socket;
protected Constructor<?> clientConstructor; protected Constructor<?> clientConstructor;
@@ -26,7 +26,7 @@ public class TcpServer extends Worker {
clientList = new ArrayList<Client>(); clientList = new ArrayList<Client>();
} }
protected void activate() throws ActivateException { public void activate() throws ActivateException {
try { try {
serverSocket = new ServerSocket(port); serverSocket = new ServerSocket(port);
} catch (IOException e) { } catch (IOException e) {

View File

@@ -4,12 +4,14 @@ import java.io.IOException;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.worker.Worker; import base.work.Work;
public abstract class UdpServer extends Worker { public abstract class UdpServer extends Work {
protected static final int BUFFER_SIZE = 1024; protected static final int BUFFER_SIZE = 1024;
protected static final int TIMEOUT = 1000;
protected int port; protected int port;
protected int bufferSize; protected int bufferSize;
protected DatagramSocket diagramSocket; protected DatagramSocket diagramSocket;
@@ -19,14 +21,16 @@ public abstract class UdpServer extends Worker {
} }
public UdpServer(int port, int bufferSize) { public UdpServer(int port, int bufferSize) {
super();
this.port = port; this.port = port;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
} }
protected void activate() throws ActivateException { public void activate() throws ActivateException {
try { try {
logger.debug("Starting datagram socket on port " + port); logger.debug("Starting datagram socket on port " + port);
diagramSocket = new DatagramSocket(port); diagramSocket = new DatagramSocket(port);
diagramSocket.setSoTimeout(TIMEOUT);
super.activate(); super.activate();
} catch (SocketException e) { } catch (SocketException e) {
logger.error("Failed to initialize socket", e); logger.error("Failed to initialize socket", e);
@@ -39,17 +43,15 @@ public abstract class UdpServer extends Worker {
DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length); DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
try { try {
diagramSocket.receive(datagramPacket); diagramSocket.receive(datagramPacket);
} catch (SocketTimeoutException e) {
return;
} catch (IOException e) { } catch (IOException e) {
logger.error("Failed to receive packet"); logger.error("Failed to receive packet", e);
stop(); stop();
return; return;
} }
receive(buffer); receive(buffer);
} }
public synchronized void stop() {
diagramSocket.close();
}
abstract protected void receive(byte[] buffer); abstract protected void receive(byte[] buffer);
} }

View File

@@ -32,7 +32,7 @@ public abstract class Listen<E> extends Work {
} }
protected Listen(Worker worker) { protected Listen(Worker worker) {
this.work = worker; this.worker = worker;
queue = new ConcurrentLinkedQueue<E>(); queue = new ConcurrentLinkedQueue<E>();
} }

View File

@@ -1,5 +1,8 @@
package base.work; package base.work;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException; import base.exception.worker.DeactivateException;
import base.worker.DirectWorker; import base.worker.DirectWorker;
@@ -11,7 +14,9 @@ import base.worker.pool.WorkerPool;
public abstract class Work { public abstract class Work {
protected static final Worker.Type WORKER_TYPE = Worker.Type.THREAD; protected static final Worker.Type WORKER_TYPE = Worker.Type.THREAD;
protected Worker work; protected Logger logger = LoggerFactory.getLogger(getClass());
protected Worker worker;
protected Work() { protected Work() {
this(WORKER_TYPE); this(WORKER_TYPE);
@@ -20,34 +25,43 @@ public abstract class Work {
protected Work(Worker.Type workerType) { protected Work(Worker.Type workerType) {
switch (workerType) { switch (workerType) {
case DIRECT: case DIRECT:
work = new DirectWorker(this); worker = new DirectWorker(this);
break; break;
default: default:
case THREAD: case THREAD:
work = new ThreadWorker(this); worker = new ThreadWorker(this);
break; break;
} }
} }
protected Work(Worker worker) { protected Work(Worker worker) {
this.work = worker; this.worker = worker;
} }
protected Work(WorkerPool workerPool) { protected Work(WorkerPool workerPool) {
work = new PooledWorker(this); worker = new PooledWorker(this);
workerPool.add((PooledWorker) work); workerPool.add((PooledWorker) worker);
} }
protected void sleep(int time) { protected void sleep(int time) {
work.sleep(time); worker.sleep(time);
} }
public void start() { public void start() {
work.start(); worker.start();
} }
public void stop() { public void stop() {
work.stop(); logger.debug("Stop work");
worker.stop();
}
public boolean active() {
return worker.active();
}
public void exit() {
worker.exit();
} }
public void activate() throws ActivateException {} public void activate() throws ActivateException {}

View File

@@ -14,11 +14,11 @@ public abstract class IntervalWork extends Work {
protected IntervalWork(Worker.Type workerType) { protected IntervalWork(Worker.Type workerType) {
switch (workerType) { switch (workerType) {
case DIRECT: case DIRECT:
work = new DirectIntervalWorker(this); worker = new DirectIntervalWorker(this);
break; break;
default: default:
case THREAD: case THREAD:
work = new ThreadIntervalWorker(this); worker = new ThreadIntervalWorker(this);
break; break;
} }
} }
@@ -26,11 +26,11 @@ public abstract class IntervalWork extends Work {
protected IntervalWork(Worker.Type workerType, int interval) { protected IntervalWork(Worker.Type workerType, int interval) {
switch (workerType) { switch (workerType) {
case DIRECT: case DIRECT:
work = new DirectIntervalWorker(this, interval); worker = new DirectIntervalWorker(this, interval);
break; break;
default: default:
case THREAD: case THREAD:
work = new ThreadIntervalWorker(this, interval); worker = new ThreadIntervalWorker(this, interval);
break; break;
} }
} }

View File

@@ -39,6 +39,7 @@ public class ThreadWorker extends Worker implements Runnable {
} }
public synchronized void stop() { public synchronized void stop() {
super.stop();
if (active) { if (active) {
deactivate = true; deactivate = true;
} }
@@ -46,7 +47,7 @@ public class ThreadWorker extends Worker implements Runnable {
} }
public void exit() { public void exit() {
stop();
run = false; run = false;
stop();
} }
} }

View File

@@ -12,7 +12,7 @@ public abstract class Worker {
DIRECT, THREAD, POOLED DIRECT, THREAD, POOLED
} }
protected static final int SLEEP = 100; public static final int SLEEP = 100;
protected Logger logger = LoggerFactory.getLogger(getClass()); protected Logger logger = LoggerFactory.getLogger(getClass());
@@ -27,10 +27,6 @@ public abstract class Worker {
this.work = work; this.work = work;
} }
public abstract void start();
public abstract void stop();
public boolean active() { public boolean active() {
return active; return active;
} }
@@ -96,4 +92,13 @@ public abstract class Worker {
logger.info("", e); logger.info("", e);
} }
} }
public abstract void start();
public void stop() {
logger.debug("Stop worker");
activate = false;
}
abstract public void exit();
} }

View File

@@ -42,4 +42,8 @@ public class PooledWorker extends Worker {
activateQueue.remove(this); activateQueue.remove(this);
deactivateQueue.add(this); deactivateQueue.add(this);
} }
public void exit() {
stop();
}
} }