Snapshot of fixed implementation on various components of client/server model

This commit is contained in:
2015-06-15 22:56:05 +01:00
parent fadde43e64
commit 2775033012
14 changed files with 433 additions and 232 deletions

View File

@@ -8,7 +8,6 @@ import org.picocontainer.MutablePicoContainer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class AbstractLoader { public class AbstractLoader {
protected static final String PROPERTIES_FILE = "loader.properties"; protected static final String PROPERTIES_FILE = "loader.properties";
protected Logger logger = LoggerFactory.getLogger(AbstractLoader.class); protected Logger logger = LoggerFactory.getLogger(AbstractLoader.class);
@@ -24,15 +23,13 @@ public class AbstractLoader {
} }
public static AbstractLoader getLoader() throws IOException { public static AbstractLoader getLoader() throws IOException {
return getLoader(PROPERTIES_FILE); return new AbstractLoader(readProperties(PROPERTIES_FILE));
} }
public static AbstractLoader getLoader(String propertiesFile) throws IOException { public static Properties readProperties(String propertiesFile) throws IOException {
/* Read properties file */ /* Read properties file */
Properties properties = new Properties(); Properties properties = new Properties();
properties.load(AbstractLoader.class.getClassLoader().getResourceAsStream(propertiesFile)); properties.load(AbstractLoader.class.getClassLoader().getResourceAsStream(propertiesFile));
return properties;
/* Initialise loader */
return new AbstractLoader(properties);
} }
} }

View File

@@ -1,5 +0,0 @@
package base.receiver;
public interface Receiver {
public void receive(byte[] buffer);
}

View File

@@ -11,12 +11,14 @@ import java.util.Iterator;
import java.util.Set; import java.util.Set;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.receiver.Receiver; import base.exception.worker.DeactivateException;
import base.sender.Sender; import base.sender.Sender;
import base.work.Listen;
import base.work.Work; import base.work.Work;
import base.worker.Worker; import base.worker.Worker;
public class TcpClient extends Work implements Sender { public abstract class TcpClient extends Work implements Sender {
protected static final String HOST = "localhost";
protected static final int BUFFER_SIZE = 1024; protected static final int BUFFER_SIZE = 1024;
protected String host; protected String host;
@@ -24,7 +26,11 @@ public class TcpClient extends Work implements Sender {
protected int bufferSize; protected int bufferSize;
protected SocketChannel socketChannel; protected SocketChannel socketChannel;
protected Selector selector; protected Selector selector;
protected ArrayList<Receiver> receiverList = new ArrayList<Receiver>(); protected ArrayList<Listen<byte[]>> listenList = new ArrayList<Listen<byte[]>>();
public TcpClient(int port) {
this(HOST, port);
}
public TcpClient(String host, int port) { public TcpClient(String host, int port) {
this(host, port, BUFFER_SIZE); this(host, port, BUFFER_SIZE);
@@ -37,6 +43,7 @@ public class TcpClient extends Work implements Sender {
} }
public void activate() throws ActivateException { public void activate() throws ActivateException {
System.out.println("Client: Activate!");
try { try {
InetSocketAddress hostAddress = new InetSocketAddress(host, port); InetSocketAddress hostAddress = new InetSocketAddress(host, port);
socketChannel = SocketChannel.open(hostAddress); socketChannel = SocketChannel.open(hostAddress);
@@ -46,6 +53,9 @@ public class TcpClient extends Work implements Sender {
} }
selector = Selector.open(); selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ); socketChannel.register(selector, SelectionKey.OP_READ);
synchronized (host) {
host.notifyAll();
}
} catch (Exception e) { } catch (Exception e) {
logger.error("", e); logger.error("", e);
throw new ActivateException(); throw new ActivateException();
@@ -53,29 +63,45 @@ public class TcpClient extends Work implements Sender {
super.activate(); super.activate();
} }
public void deactivate() throws DeactivateException {
System.out.println("Client: Deactivate!");
try {
selector.close();
socketChannel.close();
} catch (IOException e) {
throw new DeactivateException();
}
}
public void exit() {
super.exit();
if (selector != null) {
selector.wakeup();
}
}
public final void work() { public final void work() {
try { try {
//System.out.println("Client: Waiting for select..."); System.out.println("Client: Waiting for select... ");
//System.out.println("Client: Number of selected keys: " + selector.select()); System.out.println("Client: Number of selected keys: " + selector.select());
selector.select(); //selector.select();
Set<SelectionKey> selectionKeySet = selector.selectedKeys(); Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> selectionKeyIterator = selectionKeySet.iterator(); Iterator<SelectionKey> selectionKeyIterator = selectionKeySet.iterator();
while (selectionKeyIterator.hasNext()) { while (selectionKeyIterator.hasNext()) {
SelectionKey selectionKey = selectionKeyIterator.next(); SelectionKey selectionKey = selectionKeyIterator.next();
if (selectionKey.isReadable()) { if (selectionKey.isReadable()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize); ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
socketChannel.read(byteBuffer); socketChannel.read(byteBuffer);
byte[] buffer = byteBuffer.array(); byte[] buffer = byteBuffer.array();
for (Receiver receiver : receiverList) { input(buffer);
receiver.receive(buffer);
}
} else if (selectionKey.isWritable()) { } else if (selectionKey.isWritable()) {
byte[] buffer; byte[] buffer;
buffer = (byte[]) selectionKey.attachment(); buffer = (byte[]) selectionKey.attachment();
System.out.println("poll() " + new String(buffer).trim());
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
socketChannel.write(byteBuffer); socketChannel.write(byteBuffer);
selectionKey.cancel(); //selectionKey.cancel();
socketChannel.register(selector, SelectionKey.OP_READ);
} }
selectionKeyIterator.remove(); selectionKeyIterator.remove();
} }
@@ -84,20 +110,29 @@ public class TcpClient extends Work implements Sender {
} }
} }
protected abstract void input(byte[] buffer);
public void send(byte[] buffer) throws IOException { public void send(byte[] buffer) throws IOException {
if (selector == null) {
try {
synchronized (host) {
host.wait();
}
} catch (InterruptedException e) {}
}
selector.wakeup(); selector.wakeup();
socketChannel.register(selector, SelectionKey.OP_WRITE, buffer); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
} }
public void close() throws IOException { public void close() throws IOException {
socketChannel.close(); socketChannel.close();
} }
public void register(Receiver receiver) { /*public void register(Listen<byte[]> listen) {
receiverList.add(receiver); listenList.add(listen);
} }
public void remove(Receiver receiver) { public void remove(Listen<byte[]> listen) {
receiverList.remove(receiver); listenList.remove(listen);
} }*/
} }

View File

@@ -2,7 +2,7 @@ package base.server.channel;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.net.BindException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
@@ -14,10 +14,12 @@ import java.util.Iterator;
import java.util.Set; import java.util.Set;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException;
import base.sender.Sender; import base.sender.Sender;
import base.work.Work; import base.work.Work;
public class TcpServer extends Work implements Sender { public class TcpServer extends Work implements Sender {
protected static final Class<?> CLIENT_CLASS = TcpServerClient.class;
protected static final int BUFFER_SIZE = 1024; protected static final int BUFFER_SIZE = 1024;
protected int port; protected int port;
@@ -28,7 +30,7 @@ public class TcpServer extends Work implements Sender {
protected ArrayList<TcpServerClient> clientList; protected ArrayList<TcpServerClient> clientList;
public TcpServer(int port) { public TcpServer(int port) {
this(port, TcpServerClient.class); this(port, CLIENT_CLASS);
} }
public TcpServer(int port, Class<?> clientClass) { public TcpServer(int port, Class<?> clientClass) {
@@ -39,16 +41,17 @@ public class TcpServer extends Work implements Sender {
this.port = port; this.port = port;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
try { try {
logger.error(clientClass.getName()); // Allow dependency injection, constructor arguments
clientConstructor = Class.forName(clientClass.getName()).getConstructor(getClass(), SocketChannel.class, Integer.class); clientConstructor = Class.forName(clientClass.getName()).getConstructor(TcpServer.class, SocketChannel.class, Integer.class);
} catch (NoSuchMethodException | SecurityException | ClassNotFoundException e) { } catch (NoSuchMethodException | SecurityException | ClassNotFoundException e) {
logger.error("Failed to initialise client constructor"); logger.error("Failed to initialise client constructor", e);
e.printStackTrace();
} }
clientList = new ArrayList<TcpServerClient>(); clientList = new ArrayList<TcpServerClient>();
} }
public void activate() throws ActivateException { public void activate() throws ActivateException {
System.out.println("Server: Activate!");
try { try {
// Get selector // Get selector
selector = Selector.open(); selector = Selector.open();
@@ -59,13 +62,38 @@ public class TcpServer extends Work implements Sender {
serverSocket.bind(hostAddress); serverSocket.bind(hostAddress);
serverSocket.configureBlocking(false); serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT); serverSocket.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) { synchronized (clientConstructor) {
throw new ActivateException(); clientConstructor.notifyAll();
}
return;
} catch (BindException e) {
logger.error("Address already in use", e);
} catch (IOException e) {
logger.error("", e);
}
throw new ActivateException();
}
public void deactivate() throws DeactivateException {
System.out.println("Server: Deactivate!");
try {
selector.close();
serverSocket.close();
} catch (IOException e) {
throw new DeactivateException();
}
}
public void exit() {
super.exit();
if (selector != null) {
selector.wakeup();
} }
} }
public void work() { public void work() {
try { try {
System.out.println("Server: Waiting for select... ");
System.out.println("Server: Number of selected keys: " + selector.select()); System.out.println("Server: Number of selected keys: " + selector.select());
Set<SelectionKey> selectionKeySet = selector.selectedKeys(); Set<SelectionKey> selectionKeySet = selector.selectedKeys();
@@ -77,12 +105,12 @@ public class TcpServer extends Work implements Sender {
// Accept the new client connection // Accept the new client connection
SocketChannel socketChannel = serverSocket.accept(); SocketChannel socketChannel = serverSocket.accept();
socketChannel.configureBlocking(false); socketChannel.configureBlocking(false);
// Add the new connection to the selector // Add the new connection to the selector
TcpServerClient serverClient = (TcpServerClient) clientConstructor.newInstance(this, socketChannel, bufferSize); TcpServerClient client = (TcpServerClient) clientConstructor.newInstance(this, socketChannel, bufferSize);
clientList.add(serverClient); clientList.add(client);
socketChannel.register(selector, SelectionKey.OP_READ, serverClient); socketChannel.register(selector, SelectionKey.OP_READ, client);
//initClient(serverClient); //initClient(client);
System.out.println("Accepted new connection from client: " + socketChannel); System.out.println("Accepted new connection from client: " + socketChannel);
} else if (selectionKey.isReadable()) { } else if (selectionKey.isReadable()) {
// Read the data from client // Read the data from client
@@ -93,7 +121,7 @@ public class TcpServer extends Work implements Sender {
} }
selectionKeyIterator.remove(); selectionKeyIterator.remove();
} }
} catch (IOException e) {} catch (InstantiationException e) { }/* catch (IOException e) {} catch (InstantiationException e) {
logger.error("", e); logger.error("", e);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
logger.error("", e); logger.error("", e);
@@ -101,21 +129,26 @@ public class TcpServer extends Work implements Sender {
logger.error("", e); logger.error("", e);
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
logger.error("", e); logger.error("", e);
} */catch (Exception e) {
e.printStackTrace();
} }
} }
protected void initClient(TcpServerClient serverClient) { protected void initClient(TcpServerClient client) {
/*try { try {
serverClient.write(ByteBuffer.wrap(new String("Hi there!").getBytes())); client.write(ByteBuffer.wrap(new String("Hi there!").getBytes()));
} catch (IOException e) { } catch (IOException e) {
logger.error("", e); logger.error("", e);
}*/ }
} }
public void send(byte[] buffer) throws IOException { public void send(byte[] buffer) throws IOException {
logger.debug("Number of clients = " + clientList.size());
for (TcpServerClient client : clientList) { for (TcpServerClient client : clientList) {
// Should be dealt with in clients own thread // Should be dealt with in clients own thread
client.send(buffer); client.send(buffer);
} }
} }
public void input(TcpServerClient client, byte[] buffer) {}
} }

View File

@@ -4,11 +4,12 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import base.receiver.Receiver; import base.sender.Sender;
import base.work.Listen;
public class TcpServerClient implements Receiver { public class TcpServerClient extends Listen<byte[]> implements Sender {
protected static final int BUFFER_SIZE = 1024; protected static final int BUFFER_SIZE = 1024;
protected TcpServer server; protected TcpServer server;
protected SocketChannel socketChannel; protected SocketChannel socketChannel;
protected int bufferSize; protected int bufferSize;
@@ -19,6 +20,7 @@ public class TcpServerClient implements Receiver {
} }
public TcpServerClient(TcpServer server, SocketChannel socketChannel, Integer bufferSize) { public TcpServerClient(TcpServer server, SocketChannel socketChannel, Integer bufferSize) {
super();
this.server = server; this.server = server;
this.socketChannel = socketChannel; this.socketChannel = socketChannel;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
@@ -26,7 +28,7 @@ public class TcpServerClient implements Receiver {
} }
public void write(ByteBuffer byteBuffer) throws IOException { public void write(ByteBuffer byteBuffer) throws IOException {
socketChannel.write(byteBuffer); socketChannel.write(byteBuffer);
} }
public void readable() throws IOException { public void readable() throws IOException {
@@ -34,7 +36,7 @@ public class TcpServerClient implements Receiver {
while (( read = socketChannel.read(byteBuffer)) > 0) { while (( read = socketChannel.read(byteBuffer)) > 0) {
//byteBuffer.flip(); //byteBuffer.flip();
byte[] buffer = byteBuffer.array(); byte[] buffer = byteBuffer.array();
receive(buffer); input(buffer);
System.out.println("readable() " + new String(buffer).trim()); System.out.println("readable() " + new String(buffer).trim());
byteBuffer.clear(); byteBuffer.clear();
byteBuffer.put(new byte[bufferSize]); byteBuffer.put(new byte[bufferSize]);
@@ -45,17 +47,8 @@ public class TcpServerClient implements Receiver {
} }
} }
public void receive(byte[] buffer) { public void input(byte[] buffer) {
// Should be forwarded somewhere? server.input(this, buffer);
String output = new String(buffer).trim();
System.err.println("Message read from client: " + output);
if (output.equals("Bye.")) {
try {
socketChannel.close();
} catch (IOException e) {}
System.out.println("Client messages are complete; close.");
}
} }
public void send(byte[] buffer) throws IOException { public void send(byte[] buffer) throws IOException {

View File

@@ -1,30 +1,73 @@
package base.server.datagram; package base.server.datagram;
import java.io.IOException;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.MulticastSocket; import java.net.MulticastSocket;
import java.util.ArrayList;
public class UdpMulticastClient implements Runnable { import base.exception.worker.ActivateException;
public void run() { import base.exception.worker.DeactivateException;
import base.work.Listen;
import base.work.Work;
public class UdpMulticastClient extends Work {
protected static final int BUFFER_SIZE = 2048;
protected String host;
protected int port;
protected int bufferSize;
protected MulticastSocket socket;
protected InetAddress group;
protected ArrayList<Listen<byte[]>> listenList;
public UdpMulticastClient(String host, int port) {
this(host, port, BUFFER_SIZE);
}
public UdpMulticastClient(String host, int port, int bufferSize) {
this.host = host;
this.port = port;
this.bufferSize = BUFFER_SIZE;
listenList = new ArrayList<Listen<byte[]>>();
}
public void work() {
try {
byte[] buffer = new byte[bufferSize];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
buffer = packet.getData();
for (Listen<byte[]> listen : listenList) {
listen.add(buffer);
}
} catch (IOException e) {}
}
public void activate() throws ActivateException {
try { try {
MulticastSocket socket = new MulticastSocket(4446); socket = new MulticastSocket(port);
InetAddress group = InetAddress.getByName("239.255.255.255"); group = InetAddress.getByName(host);
socket.joinGroup(group); socket.joinGroup(group);
} catch (IOException e) {
DatagramPacket packet; throw new ActivateException();
while (true) {
byte[] buf = new byte[256];
packet = new DatagramPacket(buf, buf.length);
socket.receive(packet);
String received = new String(packet.getData()).trim();
System.out.println("Quote of the Moment: " + received);
}
//socket.leaveGroup(group);
//socket.close();
} catch (Exception e) {
e.printStackTrace();
} }
} }
public void deactivate() throws DeactivateException {
try {
socket.leaveGroup(group);
} catch (IOException e) {
throw new DeactivateException();
}
socket.close();
}
public void register(Listen<byte[]> listen) {
listenList.add(listen);
}
public void remove(Listen<byte[]> listen) {
listenList.remove(listen);
}
} }

View File

@@ -5,35 +5,66 @@ import java.net.DatagramPacket;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.MulticastSocket; import java.net.MulticastSocket;
public class UdpMulticastServer implements Runnable { import base.exception.worker.ActivateException;
private MulticastSocket socket; import base.exception.worker.DeactivateException;
import base.sender.Sender;
import base.work.Listen;
public void run() { public class UdpMulticastServer extends Listen<byte[]> implements Sender {
protected static final int BUFFER_SIZE = 2048;
protected String host;
protected int port;
protected MulticastSocket socket;
public UdpMulticastServer(String host, int port) {
this.host = host;
this.port = port;
}
public void activate() throws ActivateException {
try { try {
socket = new MulticastSocket(4445); socket = new MulticastSocket(); // optional, add port and receive as well!!
// pass socket directly to Server to establish bidirectional
// couple together capabilities
// listen to datagrams and deal with writing using nio?
new XX(socket).start();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); throw new ActivateException();
} }
while (true) { super.activate();
try { }
byte[] buf = new byte[256];
String dString = String.valueOf(Math.random());
buf = dString.getBytes();
InetAddress group = InetAddress.getByName("239.255.255.255"); public void deactivate() throws DeactivateException {
DatagramPacket packet; super.deactivate();
packet = new DatagramPacket(buf, buf.length, group, 4446); socket.close();
socket.send(packet); }
try { public boolean active() {
Thread.sleep(1000); return socket != null;
} /* Should handle connection state
catch (InterruptedException e) { } if (socket == null) {
} return false;
catch (IOException e) { } else {
e.printStackTrace(); return socket.isConnected() && !socket.isClosed();
} }*/
} }
//socket.close();
public void input(byte[] buffer) {
if (socket == null) {
return;
}
try {
InetAddress group = InetAddress.getByName(host);
DatagramPacket packet = new DatagramPacket(buffer, buffer.length, group, port);
socket.send(packet);
}
catch (IOException e) {
logger.error("", e);
}
}
public void send(byte[] buffer) throws IOException {
add(buffer);
} }
} }

View File

@@ -8,7 +8,7 @@ import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.receiver.Receiver; import base.work.Listen;
import base.work.Work; import base.work.Work;
public abstract class UdpServer extends Work { public abstract class UdpServer extends Work {
@@ -17,7 +17,7 @@ public abstract class UdpServer extends Work {
protected int port; protected int port;
protected int bufferSize; protected int bufferSize;
protected DatagramSocket diagramSocket; protected DatagramSocket diagramSocket;
protected ArrayList<Receiver> receiverList = new ArrayList<Receiver>(); protected ArrayList<Listen<byte[]>> listenList = new ArrayList<Listen<byte[]>>();
public UdpServer(int port) { public UdpServer(int port) {
this(port, BUFFER_SIZE); this(port, BUFFER_SIZE);
@@ -53,16 +53,19 @@ public abstract class UdpServer extends Work {
stop(); stop();
return; return;
} }
for (Receiver receiver : receiverList) { listen(buffer);
receiver.receive(buffer); /*for (Listen<byte[]> listen : listenList) {
} listen.add(buffer);
} }*/
public void addReceiver(Receiver receiver) {
receiverList.add(receiver);
} }
public void removeReceiver(Receiver receiver) { protected abstract void listen(byte[] buffer);
receiverList.remove(receiver);
/*public void register(Listen<byte[]> listen) {
listenList.add(listen);
} }
public void remove(Listen<Object> listen) {
listenList.remove(listen);
}*/
} }

View File

@@ -0,0 +1,34 @@
package base.server.datagram;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.MulticastSocket;
public class XX extends Thread {
private MulticastSocket socket;
public XX(MulticastSocket socket) {
this.socket = socket;
}
public void run() {
while (true) {
byte[] b = new byte[1024];
DatagramPacket dgram = new DatagramPacket(b, b.length);
while(true) {
try {
socket.receive(dgram);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} // blocks until a datagram is received
System.err.println("Received " + dgram.getLength() +
" bytes from " + dgram.getAddress());
dgram.setLength(b.length); // must reset length field!
}
}
}
}

View File

@@ -1,24 +0,0 @@
package base.server.socket;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import base.sender.Sender;
import base.work.Work;
// Should be Listen, process writes in own thread
public abstract class AbstractClient extends Work implements Sender {
protected Socket socket;
protected InputStream inputStream;
protected OutputStream outputStream;
public AbstractClient(Socket socket) {
this.socket = socket;
}
public void send(byte[] buffer) throws IOException {
outputStream.write(buffer);
}
}

View File

@@ -0,0 +1,75 @@
package base.server.socket;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import base.exception.worker.DeactivateException;
import base.sender.Sender;
import base.work.Work;
public abstract class AbstractTcpClient extends Work implements Sender {
protected static final int BUFFER_SIZE = 1024;
protected Object object = new Object();
protected int bufferSize;
protected Socket socket;
protected InputStream inputStream;
protected OutputStream outputStream;
public AbstractTcpClient(Integer bufferSize) {
this.bufferSize = bufferSize;
}
public boolean active() {
return super.active() && socket.isConnected();
}
public void deactivate() throws DeactivateException {
super.deactivate();
try {
inputStream.close();
outputStream.close();
socket.close();
} catch (IOException e) {
logger.error("", e);
}
}
public void exit() {
super.exit();
try {
socket.close();
} catch (IOException e) {
logger.error("", e);
}
}
public void work() {
byte[] buffer = new byte[bufferSize];
try {
while (inputStream.read(buffer) > 0) {
input(buffer);
}
} catch (IOException e) {
stop();
}
}
protected abstract void input(byte[] buffer);
public void send(byte[] buffer) throws IOException {
if (outputStream == null) {
try {
synchronized (object) {
object.wait();
}
} catch (InterruptedException e) {}
}
System.out.println("Client writing: " + Charset.defaultCharset().decode(ByteBuffer.wrap(buffer)).toString());
outputStream.write(buffer);
}
}

View File

@@ -3,38 +3,37 @@ package base.server.socket;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException;
import base.receiver.Receiver;
public abstract class TcpClient extends AbstractClient { public abstract class TcpClient extends AbstractTcpClient {
protected static final int BUFFER = 2048; protected static final String HOST = "localhost";
protected String host; protected String host;
protected int port; protected int port;
protected int bufferSize;
protected ArrayList<Receiver> receiverList = new ArrayList<Receiver>(); public TcpClient(int port) {
this(HOST, port);
}
public TcpClient(String host, int port) { public TcpClient(String host, int port) {
this(host, port, BUFFER); this(host, port, BUFFER_SIZE);
} }
public TcpClient(String host, int port, int bufferSize) { public TcpClient(String host, int port, int bufferSize) {
super(null); super(bufferSize);
this.host = host; this.host = host;
this.port = port; this.port = port;
this.bufferSize = bufferSize;
} }
public void activate() throws ActivateException { public void activate() throws ActivateException {
try { try {
socket = new Socket(host, port); socket = new Socket(host, port);
inputStream = socket.getInputStream(); inputStream = socket.getInputStream();
outputStream = socket.getOutputStream(); outputStream = socket.getOutputStream();
//send("Incoming client!".getBytes()); synchronized (object) {
object.notifyAll();
}
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
logger.error("", e); logger.error("", e);
throw new ActivateException(); throw new ActivateException();
@@ -44,45 +43,4 @@ public abstract class TcpClient extends AbstractClient {
} }
super.activate(); super.activate();
} }
public synchronized boolean active() {
return super.active() && socket.isConnected();
}
public void deactivate() throws DeactivateException {
super.deactivate();
try {
inputStream.close();
outputStream.close();
socket.close();
} catch (IOException e) {
logger.error("", e);
}
}
public final void work() {
byte[] buffer = new byte[bufferSize];
try {
while (inputStream.read(buffer) > 0) {
for (Receiver receiver : receiverList) {
receiver.receive(buffer);
}
}
} catch (IOException e) {
stop();
}
}
public void send(byte[] buffer) throws IOException {
System.out.println("Client writing: " + Charset.defaultCharset().decode(ByteBuffer.wrap(buffer)).toString());
outputStream.write(buffer);
}
public void register(Receiver receiver) {
receiverList.add(receiver);
}
public void remove(Receiver receiver) {
receiverList.remove(receiver);
}
} }

View File

@@ -7,26 +7,29 @@ import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.receiver.Receiver;
import base.sender.Sender; import base.sender.Sender;
import base.work.Work; import base.work.Work;
public class TcpServer extends Work implements Sender { public class TcpServer extends Work implements Sender {
protected static final Class<?> CLIENT_CLASS = TcpServerClient.class;
protected int port; protected int port;
protected Socket socket;
protected Constructor<?> clientConstructor;
protected ArrayList<Client> clientList;
protected ServerSocket serverSocket; protected ServerSocket serverSocket;
protected ArrayList<Receiver> receiverList = new ArrayList<Receiver>(); protected Constructor<?> clientConstructor;
protected ArrayList<TcpServerClient> clientList;
public TcpServer(int port) {
this(port, CLIENT_CLASS);
}
public TcpServer(int port, Class<?> clientClass) { public TcpServer(int port, Class<?> clientClass) {
this.port = port; this.port = port;
try { try {
clientConstructor = Class.forName(clientClass.getName()).getConstructor(Socket.class); clientConstructor = Class.forName(clientClass.getName()).getConstructor(TcpServer.class, Socket.class);
} catch (NoSuchMethodException | SecurityException | ClassNotFoundException e) { } catch (NoSuchMethodException | SecurityException | ClassNotFoundException e) {
logger.error("Failed to initialise client constructor"); logger.error("Failed to initialise client constructor");
} }
clientList = new ArrayList<Client>(); clientList = new ArrayList<TcpServerClient>();
} }
public void activate() throws ActivateException { public void activate() throws ActivateException {
@@ -39,49 +42,36 @@ public class TcpServer extends Work implements Sender {
super.activate(); super.activate();
} }
public void work() { public void exit() {
super.exit();
try { try {
socket = serverSocket.accept(); serverSocket.close();
} catch (IOException e) { } catch (IOException e) {
logger.error("", e); logger.error("", e);
return;
} }
}
public void work() {
try { try {
Client client = (Client) clientConstructor.newInstance(socket); Socket socket = serverSocket.accept();
TcpServerClient client = (TcpServerClient) clientConstructor.newInstance(this, socket);
clientList.add(client); clientList.add(client);
client.start(); client.start();
System.out.println("Accepted new connection from client: " + socket);
} catch (IOException e) {
stop();
} catch (Exception e) { } catch (Exception e) {
logger.error("", e); logger.error("", e);
} }
} }
public static abstract class Client extends AbstractClient {
public Client(Socket socket) {
super(socket);
}
public void setSocket(Socket socket) {
}
public void work() {
}
//public send(byte[] )
}
public void addReceiver(Receiver receiver) {
receiverList.add(receiver);
}
public void removeReceiver(Receiver receiver) {
receiverList.remove(receiver);
}
public void send(byte[] buffer) throws IOException { public void send(byte[] buffer) throws IOException {
for (Client client : clientList) { logger.debug("Number of clients = " + clientList.size());
for (TcpServerClient client : clientList) {
// Should be dealt with in clients own thread // Should be dealt with in clients own thread
client.send(buffer); client.send(buffer);
} }
} }
public void input(TcpServerClient client, byte[] buffer) {}
} }

View File

@@ -0,0 +1,38 @@
package base.server.socket;
import java.io.IOException;
import java.net.Socket;
import base.exception.worker.ActivateException;
public class TcpServerClient extends AbstractTcpClient {
private TcpServer server;
public TcpServerClient(TcpServer server, Socket socket) {
this(server, socket, BUFFER_SIZE);
}
public TcpServerClient(TcpServer server, Socket socket, Integer bufferSize) {
super(bufferSize);
this.server = server;
this.socket = socket;
}
public void activate() throws ActivateException {
try {
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
synchronized (object) {
object.notifyAll();
}
} catch (IOException e) {
logger.error("", e);
throw new ActivateException();
}
}
public void input(byte[] buffer) {
System.out.println(1234);
server.input(this, buffer);
}
}