From 27750330124c91a9c7a581bb2f377020cae9af89 Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Mon, 15 Jun 2015 22:56:05 +0100 Subject: [PATCH] Snapshot of fixed implementation on various components of client/server model --- .../main/java/base/loader/AbstractLoader.java | 9 +-- .../src/main/java/base/receiver/Receiver.java | 5 -- .../java/base/server/channel/TcpClient.java | 73 ++++++++++++----- .../java/base/server/channel/TcpServer.java | 69 +++++++++++----- .../base/server/channel/TcpServerClient.java | 25 +++--- .../server/datagram/UdpMulticastClient.java | 81 ++++++++++++++----- .../server/datagram/UdpMulticastServer.java | 79 ++++++++++++------ .../java/base/server/datagram/UdpServer.java | 25 +++--- .../main/java/base/server/datagram/XX.java | 34 ++++++++ .../base/server/socket/AbstractClient.java | 24 ------ .../base/server/socket/AbstractTcpClient.java | 75 +++++++++++++++++ .../java/base/server/socket/TcpClient.java | 68 +++------------- .../java/base/server/socket/TcpServer.java | 60 ++++++-------- .../base/server/socket/TcpServerClient.java | 38 +++++++++ 14 files changed, 433 insertions(+), 232 deletions(-) delete mode 100644 java/base/src/main/java/base/receiver/Receiver.java create mode 100644 java/base/src/main/java/base/server/datagram/XX.java delete mode 100644 java/base/src/main/java/base/server/socket/AbstractClient.java create mode 100644 java/base/src/main/java/base/server/socket/AbstractTcpClient.java create mode 100644 java/base/src/main/java/base/server/socket/TcpServerClient.java diff --git a/java/base/src/main/java/base/loader/AbstractLoader.java b/java/base/src/main/java/base/loader/AbstractLoader.java index 6304058..0f9581c 100644 --- a/java/base/src/main/java/base/loader/AbstractLoader.java +++ b/java/base/src/main/java/base/loader/AbstractLoader.java @@ -8,7 +8,6 @@ import org.picocontainer.MutablePicoContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class AbstractLoader { protected static final String PROPERTIES_FILE = "loader.properties"; protected Logger logger = LoggerFactory.getLogger(AbstractLoader.class); @@ -24,15 +23,13 @@ public class AbstractLoader { } public static AbstractLoader getLoader() throws IOException { - return getLoader(PROPERTIES_FILE); + return new AbstractLoader(readProperties(PROPERTIES_FILE)); } - public static AbstractLoader getLoader(String propertiesFile) throws IOException { + public static Properties readProperties(String propertiesFile) throws IOException { /* Read properties file */ Properties properties = new Properties(); properties.load(AbstractLoader.class.getClassLoader().getResourceAsStream(propertiesFile)); - - /* Initialise loader */ - return new AbstractLoader(properties); + return properties; } } diff --git a/java/base/src/main/java/base/receiver/Receiver.java b/java/base/src/main/java/base/receiver/Receiver.java deleted file mode 100644 index b6b4be0..0000000 --- a/java/base/src/main/java/base/receiver/Receiver.java +++ /dev/null @@ -1,5 +0,0 @@ -package base.receiver; - -public interface Receiver { - public void receive(byte[] buffer); -} diff --git a/java/base/src/main/java/base/server/channel/TcpClient.java b/java/base/src/main/java/base/server/channel/TcpClient.java index 2851e8e..43b9bba 100644 --- a/java/base/src/main/java/base/server/channel/TcpClient.java +++ b/java/base/src/main/java/base/server/channel/TcpClient.java @@ -11,12 +11,14 @@ import java.util.Iterator; import java.util.Set; import base.exception.worker.ActivateException; -import base.receiver.Receiver; +import base.exception.worker.DeactivateException; import base.sender.Sender; +import base.work.Listen; import base.work.Work; import base.worker.Worker; -public class TcpClient extends Work implements Sender { +public abstract class TcpClient extends Work implements Sender { + protected static final String HOST = "localhost"; protected static final int BUFFER_SIZE = 1024; protected String host; @@ -24,7 +26,11 @@ public class TcpClient extends Work implements Sender { protected int bufferSize; protected SocketChannel socketChannel; protected Selector selector; - protected ArrayList receiverList = new ArrayList(); + protected ArrayList> listenList = new ArrayList>(); + + public TcpClient(int port) { + this(HOST, port); + } public TcpClient(String host, int port) { this(host, port, BUFFER_SIZE); @@ -37,6 +43,7 @@ public class TcpClient extends Work implements Sender { } public void activate() throws ActivateException { + System.out.println("Client: Activate!"); try { InetSocketAddress hostAddress = new InetSocketAddress(host, port); socketChannel = SocketChannel.open(hostAddress); @@ -46,6 +53,9 @@ public class TcpClient extends Work implements Sender { } selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_READ); + synchronized (host) { + host.notifyAll(); + } } catch (Exception e) { logger.error("", e); throw new ActivateException(); @@ -53,29 +63,45 @@ public class TcpClient extends Work implements Sender { super.activate(); } + public void deactivate() throws DeactivateException { + System.out.println("Client: Deactivate!"); + try { + selector.close(); + socketChannel.close(); + } catch (IOException e) { + throw new DeactivateException(); + } + } + + public void exit() { + super.exit(); + if (selector != null) { + selector.wakeup(); + } + } + public final void work() { try { - //System.out.println("Client: Waiting for select..."); - //System.out.println("Client: Number of selected keys: " + selector.select()); - selector.select(); + System.out.println("Client: Waiting for select... "); + System.out.println("Client: Number of selected keys: " + selector.select()); + //selector.select(); Set selectionKeySet = selector.selectedKeys(); - Iterator selectionKeyIterator = selectionKeySet.iterator(); + Iterator selectionKeyIterator = selectionKeySet.iterator(); + while (selectionKeyIterator.hasNext()) { - SelectionKey selectionKey = selectionKeyIterator.next(); + SelectionKey selectionKey = selectionKeyIterator.next(); if (selectionKey.isReadable()) { ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize); socketChannel.read(byteBuffer); byte[] buffer = byteBuffer.array(); - for (Receiver receiver : receiverList) { - receiver.receive(buffer); - } + input(buffer); } else if (selectionKey.isWritable()) { byte[] buffer; buffer = (byte[]) selectionKey.attachment(); - System.out.println("poll() " + new String(buffer).trim()); ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); socketChannel.write(byteBuffer); - selectionKey.cancel(); + //selectionKey.cancel(); + socketChannel.register(selector, SelectionKey.OP_READ); } selectionKeyIterator.remove(); } @@ -84,20 +110,29 @@ public class TcpClient extends Work implements Sender { } } + protected abstract void input(byte[] buffer); + public void send(byte[] buffer) throws IOException { + if (selector == null) { + try { + synchronized (host) { + host.wait(); + } + } catch (InterruptedException e) {} + } selector.wakeup(); - socketChannel.register(selector, SelectionKey.OP_WRITE, buffer); + socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } public void close() throws IOException { socketChannel.close(); } - public void register(Receiver receiver) { - receiverList.add(receiver); + /*public void register(Listen listen) { + listenList.add(listen); } - public void remove(Receiver receiver) { - receiverList.remove(receiver); - } + public void remove(Listen listen) { + listenList.remove(listen); + }*/ } \ No newline at end of file diff --git a/java/base/src/main/java/base/server/channel/TcpServer.java b/java/base/src/main/java/base/server/channel/TcpServer.java index 290b80a..a30331d 100644 --- a/java/base/src/main/java/base/server/channel/TcpServer.java +++ b/java/base/src/main/java/base/server/channel/TcpServer.java @@ -2,7 +2,7 @@ package base.server.channel; import java.io.IOException; import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; +import java.net.BindException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; @@ -14,10 +14,12 @@ import java.util.Iterator; import java.util.Set; import base.exception.worker.ActivateException; +import base.exception.worker.DeactivateException; import base.sender.Sender; import base.work.Work; public class TcpServer extends Work implements Sender { + protected static final Class CLIENT_CLASS = TcpServerClient.class; protected static final int BUFFER_SIZE = 1024; protected int port; @@ -28,7 +30,7 @@ public class TcpServer extends Work implements Sender { protected ArrayList clientList; public TcpServer(int port) { - this(port, TcpServerClient.class); + this(port, CLIENT_CLASS); } public TcpServer(int port, Class clientClass) { @@ -39,16 +41,17 @@ public class TcpServer extends Work implements Sender { this.port = port; this.bufferSize = bufferSize; try { - logger.error(clientClass.getName()); - clientConstructor = Class.forName(clientClass.getName()).getConstructor(getClass(), SocketChannel.class, Integer.class); + // Allow dependency injection, constructor arguments + clientConstructor = Class.forName(clientClass.getName()).getConstructor(TcpServer.class, SocketChannel.class, Integer.class); } catch (NoSuchMethodException | SecurityException | ClassNotFoundException e) { - logger.error("Failed to initialise client constructor"); - e.printStackTrace(); + logger.error("Failed to initialise client constructor", e); } clientList = new ArrayList(); } public void activate() throws ActivateException { + + System.out.println("Server: Activate!"); try { // Get selector selector = Selector.open(); @@ -59,13 +62,38 @@ public class TcpServer extends Work implements Sender { serverSocket.bind(hostAddress); serverSocket.configureBlocking(false); serverSocket.register(selector, SelectionKey.OP_ACCEPT); - } catch (Exception e) { - throw new ActivateException(); + synchronized (clientConstructor) { + clientConstructor.notifyAll(); + } + return; + } catch (BindException e) { + logger.error("Address already in use", e); + } catch (IOException e) { + logger.error("", e); + } + throw new ActivateException(); + } + + public void deactivate() throws DeactivateException { + System.out.println("Server: Deactivate!"); + try { + selector.close(); + serverSocket.close(); + } catch (IOException e) { + throw new DeactivateException(); + } + } + + public void exit() { + super.exit(); + if (selector != null) { + selector.wakeup(); } } public void work() { try { + System.out.println("Server: Waiting for select... "); System.out.println("Server: Number of selected keys: " + selector.select()); Set selectionKeySet = selector.selectedKeys(); @@ -77,12 +105,12 @@ public class TcpServer extends Work implements Sender { // Accept the new client connection SocketChannel socketChannel = serverSocket.accept(); socketChannel.configureBlocking(false); - + // Add the new connection to the selector - TcpServerClient serverClient = (TcpServerClient) clientConstructor.newInstance(this, socketChannel, bufferSize); - clientList.add(serverClient); - socketChannel.register(selector, SelectionKey.OP_READ, serverClient); - //initClient(serverClient); + TcpServerClient client = (TcpServerClient) clientConstructor.newInstance(this, socketChannel, bufferSize); + clientList.add(client); + socketChannel.register(selector, SelectionKey.OP_READ, client); + //initClient(client); System.out.println("Accepted new connection from client: " + socketChannel); } else if (selectionKey.isReadable()) { // Read the data from client @@ -93,7 +121,7 @@ public class TcpServer extends Work implements Sender { } selectionKeyIterator.remove(); } - } catch (IOException e) {} catch (InstantiationException e) { + }/* catch (IOException e) {} catch (InstantiationException e) { logger.error("", e); } catch (IllegalAccessException e) { logger.error("", e); @@ -101,21 +129,26 @@ public class TcpServer extends Work implements Sender { logger.error("", e); } catch (InvocationTargetException e) { logger.error("", e); + } */catch (Exception e) { + e.printStackTrace(); } } - protected void initClient(TcpServerClient serverClient) { - /*try { - serverClient.write(ByteBuffer.wrap(new String("Hi there!").getBytes())); + protected void initClient(TcpServerClient client) { + try { + client.write(ByteBuffer.wrap(new String("Hi there!").getBytes())); } catch (IOException e) { logger.error("", e); - }*/ + } } public void send(byte[] buffer) throws IOException { + logger.debug("Number of clients = " + clientList.size()); for (TcpServerClient client : clientList) { // Should be dealt with in clients own thread client.send(buffer); } } + + public void input(TcpServerClient client, byte[] buffer) {} } \ No newline at end of file diff --git a/java/base/src/main/java/base/server/channel/TcpServerClient.java b/java/base/src/main/java/base/server/channel/TcpServerClient.java index 04ffa46..b9e7d97 100644 --- a/java/base/src/main/java/base/server/channel/TcpServerClient.java +++ b/java/base/src/main/java/base/server/channel/TcpServerClient.java @@ -4,11 +4,12 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import base.receiver.Receiver; +import base.sender.Sender; +import base.work.Listen; -public class TcpServerClient implements Receiver { +public class TcpServerClient extends Listen implements Sender { protected static final int BUFFER_SIZE = 1024; - + protected TcpServer server; protected SocketChannel socketChannel; protected int bufferSize; @@ -19,6 +20,7 @@ public class TcpServerClient implements Receiver { } public TcpServerClient(TcpServer server, SocketChannel socketChannel, Integer bufferSize) { + super(); this.server = server; this.socketChannel = socketChannel; this.bufferSize = bufferSize; @@ -26,7 +28,7 @@ public class TcpServerClient implements Receiver { } public void write(ByteBuffer byteBuffer) throws IOException { - socketChannel.write(byteBuffer); + socketChannel.write(byteBuffer); } public void readable() throws IOException { @@ -34,7 +36,7 @@ public class TcpServerClient implements Receiver { while (( read = socketChannel.read(byteBuffer)) > 0) { //byteBuffer.flip(); byte[] buffer = byteBuffer.array(); - receive(buffer); + input(buffer); System.out.println("readable() " + new String(buffer).trim()); byteBuffer.clear(); byteBuffer.put(new byte[bufferSize]); @@ -45,17 +47,8 @@ public class TcpServerClient implements Receiver { } } - public void receive(byte[] buffer) { - // Should be forwarded somewhere? - String output = new String(buffer).trim(); - System.err.println("Message read from client: " + output); - if (output.equals("Bye.")) { - try { - socketChannel.close(); - } catch (IOException e) {} - System.out.println("Client messages are complete; close."); - } - + public void input(byte[] buffer) { + server.input(this, buffer); } public void send(byte[] buffer) throws IOException { diff --git a/java/base/src/main/java/base/server/datagram/UdpMulticastClient.java b/java/base/src/main/java/base/server/datagram/UdpMulticastClient.java index 9e1c379..3fcaf26 100644 --- a/java/base/src/main/java/base/server/datagram/UdpMulticastClient.java +++ b/java/base/src/main/java/base/server/datagram/UdpMulticastClient.java @@ -1,30 +1,73 @@ package base.server.datagram; +import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; +import java.util.ArrayList; -public class UdpMulticastClient implements Runnable { - public void run() { +import base.exception.worker.ActivateException; +import base.exception.worker.DeactivateException; +import base.work.Listen; +import base.work.Work; + +public class UdpMulticastClient extends Work { + protected static final int BUFFER_SIZE = 2048; + + protected String host; + protected int port; + protected int bufferSize; + protected MulticastSocket socket; + protected InetAddress group; + protected ArrayList> listenList; + + public UdpMulticastClient(String host, int port) { + this(host, port, BUFFER_SIZE); + } + + public UdpMulticastClient(String host, int port, int bufferSize) { + this.host = host; + this.port = port; + this.bufferSize = BUFFER_SIZE; + listenList = new ArrayList>(); + } + + public void work() { + try { + byte[] buffer = new byte[bufferSize]; + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + socket.receive(packet); + buffer = packet.getData(); + for (Listen listen : listenList) { + listen.add(buffer); + } + } catch (IOException e) {} + } + + public void activate() throws ActivateException { try { - MulticastSocket socket = new MulticastSocket(4446); - InetAddress group = InetAddress.getByName("239.255.255.255"); + socket = new MulticastSocket(port); + group = InetAddress.getByName(host); socket.joinGroup(group); - - DatagramPacket packet; - while (true) { - byte[] buf = new byte[256]; - packet = new DatagramPacket(buf, buf.length); - socket.receive(packet); - - String received = new String(packet.getData()).trim(); - System.out.println("Quote of the Moment: " + received); - } - - //socket.leaveGroup(group); - //socket.close(); - } catch (Exception e) { - e.printStackTrace(); + } catch (IOException e) { + throw new ActivateException(); } } + + public void deactivate() throws DeactivateException { + try { + socket.leaveGroup(group); + } catch (IOException e) { + throw new DeactivateException(); + } + socket.close(); + } + + public void register(Listen listen) { + listenList.add(listen); + } + + public void remove(Listen listen) { + listenList.remove(listen); + } } diff --git a/java/base/src/main/java/base/server/datagram/UdpMulticastServer.java b/java/base/src/main/java/base/server/datagram/UdpMulticastServer.java index 612cac1..0962870 100644 --- a/java/base/src/main/java/base/server/datagram/UdpMulticastServer.java +++ b/java/base/src/main/java/base/server/datagram/UdpMulticastServer.java @@ -5,35 +5,66 @@ import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; -public class UdpMulticastServer implements Runnable { - private MulticastSocket socket; +import base.exception.worker.ActivateException; +import base.exception.worker.DeactivateException; +import base.sender.Sender; +import base.work.Listen; - public void run() { +public class UdpMulticastServer extends Listen implements Sender { + protected static final int BUFFER_SIZE = 2048; + + protected String host; + protected int port; + protected MulticastSocket socket; + + public UdpMulticastServer(String host, int port) { + this.host = host; + this.port = port; + } + + public void activate() throws ActivateException { try { - socket = new MulticastSocket(4445); + socket = new MulticastSocket(); // optional, add port and receive as well!! + // pass socket directly to Server to establish bidirectional + // couple together capabilities + // listen to datagrams and deal with writing using nio? + new XX(socket).start(); } catch (IOException e) { - e.printStackTrace(); + throw new ActivateException(); } - while (true) { - try { - byte[] buf = new byte[256]; - String dString = String.valueOf(Math.random()); - buf = dString.getBytes(); + super.activate(); + } - InetAddress group = InetAddress.getByName("239.255.255.255"); - DatagramPacket packet; - packet = new DatagramPacket(buf, buf.length, group, 4446); - socket.send(packet); + public void deactivate() throws DeactivateException { + super.deactivate(); + socket.close(); + } - try { - Thread.sleep(1000); - } - catch (InterruptedException e) { } - } - catch (IOException e) { - e.printStackTrace(); - } - } - //socket.close(); + public boolean active() { + return socket != null; + /* Should handle connection state + if (socket == null) { + return false; + } else { + return socket.isConnected() && !socket.isClosed(); + }*/ + } + + public void input(byte[] buffer) { + if (socket == null) { + return; + } + try { + InetAddress group = InetAddress.getByName(host); + DatagramPacket packet = new DatagramPacket(buffer, buffer.length, group, port); + socket.send(packet); + } + catch (IOException e) { + logger.error("", e); + } + } + + public void send(byte[] buffer) throws IOException { + add(buffer); } } diff --git a/java/base/src/main/java/base/server/datagram/UdpServer.java b/java/base/src/main/java/base/server/datagram/UdpServer.java index 7d058dd..02f097c 100644 --- a/java/base/src/main/java/base/server/datagram/UdpServer.java +++ b/java/base/src/main/java/base/server/datagram/UdpServer.java @@ -8,7 +8,7 @@ import java.net.SocketTimeoutException; import java.util.ArrayList; import base.exception.worker.ActivateException; -import base.receiver.Receiver; +import base.work.Listen; import base.work.Work; public abstract class UdpServer extends Work { @@ -17,7 +17,7 @@ public abstract class UdpServer extends Work { protected int port; protected int bufferSize; protected DatagramSocket diagramSocket; - protected ArrayList receiverList = new ArrayList(); + protected ArrayList> listenList = new ArrayList>(); public UdpServer(int port) { this(port, BUFFER_SIZE); @@ -53,16 +53,19 @@ public abstract class UdpServer extends Work { stop(); return; } - for (Receiver receiver : receiverList) { - receiver.receive(buffer); - } - } - - public void addReceiver(Receiver receiver) { - receiverList.add(receiver); + listen(buffer); + /*for (Listen listen : listenList) { + listen.add(buffer); + }*/ } - public void removeReceiver(Receiver receiver) { - receiverList.remove(receiver); + protected abstract void listen(byte[] buffer); + + /*public void register(Listen listen) { + listenList.add(listen); } + + public void remove(Listen listen) { + listenList.remove(listen); + }*/ } diff --git a/java/base/src/main/java/base/server/datagram/XX.java b/java/base/src/main/java/base/server/datagram/XX.java new file mode 100644 index 0000000..a868995 --- /dev/null +++ b/java/base/src/main/java/base/server/datagram/XX.java @@ -0,0 +1,34 @@ +package base.server.datagram; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.MulticastSocket; + +public class XX extends Thread { + + private MulticastSocket socket; + + public XX(MulticastSocket socket) { + this.socket = socket; + } + + public void run() { + while (true) { + byte[] b = new byte[1024]; + DatagramPacket dgram = new DatagramPacket(b, b.length); + + + while(true) { + try { + socket.receive(dgram); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } // blocks until a datagram is received + System.err.println("Received " + dgram.getLength() + + " bytes from " + dgram.getAddress()); + dgram.setLength(b.length); // must reset length field! + } + } + } +} diff --git a/java/base/src/main/java/base/server/socket/AbstractClient.java b/java/base/src/main/java/base/server/socket/AbstractClient.java deleted file mode 100644 index 72802b3..0000000 --- a/java/base/src/main/java/base/server/socket/AbstractClient.java +++ /dev/null @@ -1,24 +0,0 @@ -package base.server.socket; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; - -import base.sender.Sender; -import base.work.Work; - -// Should be Listen, process writes in own thread -public abstract class AbstractClient extends Work implements Sender { - protected Socket socket; - protected InputStream inputStream; - protected OutputStream outputStream; - - public AbstractClient(Socket socket) { - this.socket = socket; - } - - public void send(byte[] buffer) throws IOException { - outputStream.write(buffer); - } -} diff --git a/java/base/src/main/java/base/server/socket/AbstractTcpClient.java b/java/base/src/main/java/base/server/socket/AbstractTcpClient.java new file mode 100644 index 0000000..a7cc936 --- /dev/null +++ b/java/base/src/main/java/base/server/socket/AbstractTcpClient.java @@ -0,0 +1,75 @@ +package base.server.socket; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import base.exception.worker.DeactivateException; +import base.sender.Sender; +import base.work.Work; + +public abstract class AbstractTcpClient extends Work implements Sender { + protected static final int BUFFER_SIZE = 1024; + + protected Object object = new Object(); + protected int bufferSize; + protected Socket socket; + protected InputStream inputStream; + protected OutputStream outputStream; + + public AbstractTcpClient(Integer bufferSize) { + this.bufferSize = bufferSize; + } + + public boolean active() { + return super.active() && socket.isConnected(); + } + + public void deactivate() throws DeactivateException { + super.deactivate(); + try { + inputStream.close(); + outputStream.close(); + socket.close(); + } catch (IOException e) { + logger.error("", e); + } + } + + public void exit() { + super.exit(); + try { + socket.close(); + } catch (IOException e) { + logger.error("", e); + } + } + + public void work() { + byte[] buffer = new byte[bufferSize]; + try { + while (inputStream.read(buffer) > 0) { + input(buffer); + } + } catch (IOException e) { + stop(); + } + } + + protected abstract void input(byte[] buffer); + + public void send(byte[] buffer) throws IOException { + if (outputStream == null) { + try { + synchronized (object) { + object.wait(); + } + } catch (InterruptedException e) {} + } + System.out.println("Client writing: " + Charset.defaultCharset().decode(ByteBuffer.wrap(buffer)).toString()); + outputStream.write(buffer); + } +} diff --git a/java/base/src/main/java/base/server/socket/TcpClient.java b/java/base/src/main/java/base/server/socket/TcpClient.java index 7631519..6eea5c1 100644 --- a/java/base/src/main/java/base/server/socket/TcpClient.java +++ b/java/base/src/main/java/base/server/socket/TcpClient.java @@ -3,38 +3,37 @@ package base.server.socket; import java.io.IOException; import java.net.Socket; import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.ArrayList; import base.exception.worker.ActivateException; -import base.exception.worker.DeactivateException; -import base.receiver.Receiver; -public abstract class TcpClient extends AbstractClient { - protected static final int BUFFER = 2048; +public abstract class TcpClient extends AbstractTcpClient { + protected static final String HOST = "localhost"; + protected String host; protected int port; - protected int bufferSize; - protected ArrayList receiverList = new ArrayList(); + + public TcpClient(int port) { + this(HOST, port); + } public TcpClient(String host, int port) { - this(host, port, BUFFER); + this(host, port, BUFFER_SIZE); } public TcpClient(String host, int port, int bufferSize) { - super(null); + super(bufferSize); this.host = host; this.port = port; - this.bufferSize = bufferSize; } - public void activate() throws ActivateException { + public void activate() throws ActivateException { try { socket = new Socket(host, port); inputStream = socket.getInputStream(); outputStream = socket.getOutputStream(); - //send("Incoming client!".getBytes()); + synchronized (object) { + object.notifyAll(); + } } catch (UnknownHostException e) { logger.error("", e); throw new ActivateException(); @@ -44,45 +43,4 @@ public abstract class TcpClient extends AbstractClient { } super.activate(); } - - public synchronized boolean active() { - return super.active() && socket.isConnected(); - } - - public void deactivate() throws DeactivateException { - super.deactivate(); - try { - inputStream.close(); - outputStream.close(); - socket.close(); - } catch (IOException e) { - logger.error("", e); - } - } - - public final void work() { - byte[] buffer = new byte[bufferSize]; - try { - while (inputStream.read(buffer) > 0) { - for (Receiver receiver : receiverList) { - receiver.receive(buffer); - } - } - } catch (IOException e) { - stop(); - } - } - - public void send(byte[] buffer) throws IOException { - System.out.println("Client writing: " + Charset.defaultCharset().decode(ByteBuffer.wrap(buffer)).toString()); - outputStream.write(buffer); - } - - public void register(Receiver receiver) { - receiverList.add(receiver); - } - - public void remove(Receiver receiver) { - receiverList.remove(receiver); - } } \ No newline at end of file diff --git a/java/base/src/main/java/base/server/socket/TcpServer.java b/java/base/src/main/java/base/server/socket/TcpServer.java index d750ca4..d678497 100644 --- a/java/base/src/main/java/base/server/socket/TcpServer.java +++ b/java/base/src/main/java/base/server/socket/TcpServer.java @@ -7,26 +7,29 @@ import java.net.Socket; import java.util.ArrayList; import base.exception.worker.ActivateException; -import base.receiver.Receiver; import base.sender.Sender; import base.work.Work; public class TcpServer extends Work implements Sender { + protected static final Class CLIENT_CLASS = TcpServerClient.class; + protected int port; - protected Socket socket; - protected Constructor clientConstructor; - protected ArrayList clientList; protected ServerSocket serverSocket; - protected ArrayList receiverList = new ArrayList(); + protected Constructor clientConstructor; + protected ArrayList clientList; + + public TcpServer(int port) { + this(port, CLIENT_CLASS); + } public TcpServer(int port, Class clientClass) { this.port = port; try { - clientConstructor = Class.forName(clientClass.getName()).getConstructor(Socket.class); + clientConstructor = Class.forName(clientClass.getName()).getConstructor(TcpServer.class, Socket.class); } catch (NoSuchMethodException | SecurityException | ClassNotFoundException e) { logger.error("Failed to initialise client constructor"); } - clientList = new ArrayList(); + clientList = new ArrayList(); } public void activate() throws ActivateException { @@ -39,49 +42,36 @@ public class TcpServer extends Work implements Sender { super.activate(); } - public void work() { + public void exit() { + super.exit(); try { - socket = serverSocket.accept(); + serverSocket.close(); } catch (IOException e) { logger.error("", e); - return; } + } + + public void work() { try { - Client client = (Client) clientConstructor.newInstance(socket); + Socket socket = serverSocket.accept(); + TcpServerClient client = (TcpServerClient) clientConstructor.newInstance(this, socket); clientList.add(client); client.start(); + System.out.println("Accepted new connection from client: " + socket); + } catch (IOException e) { + stop(); } catch (Exception e) { logger.error("", e); } } - public static abstract class Client extends AbstractClient { - - public Client(Socket socket) { - super(socket); - } - - public void setSocket(Socket socket) { - } - - public void work() { - } - - //public send(byte[] ) - } - - public void addReceiver(Receiver receiver) { - receiverList.add(receiver); - } - - public void removeReceiver(Receiver receiver) { - receiverList.remove(receiver); - } - public void send(byte[] buffer) throws IOException { - for (Client client : clientList) { + logger.debug("Number of clients = " + clientList.size()); + for (TcpServerClient client : clientList) { // Should be dealt with in clients own thread client.send(buffer); } } + + public void input(TcpServerClient client, byte[] buffer) {} } diff --git a/java/base/src/main/java/base/server/socket/TcpServerClient.java b/java/base/src/main/java/base/server/socket/TcpServerClient.java new file mode 100644 index 0000000..9b0534b --- /dev/null +++ b/java/base/src/main/java/base/server/socket/TcpServerClient.java @@ -0,0 +1,38 @@ +package base.server.socket; + +import java.io.IOException; +import java.net.Socket; + +import base.exception.worker.ActivateException; + +public class TcpServerClient extends AbstractTcpClient { + private TcpServer server; + + public TcpServerClient(TcpServer server, Socket socket) { + this(server, socket, BUFFER_SIZE); + } + + public TcpServerClient(TcpServer server, Socket socket, Integer bufferSize) { + super(bufferSize); + this.server = server; + this.socket = socket; + } + + public void activate() throws ActivateException { + try { + inputStream = socket.getInputStream(); + outputStream = socket.getOutputStream(); + synchronized (object) { + object.notifyAll(); + } + } catch (IOException e) { + logger.error("", e); + throw new ActivateException(); + } + } + + public void input(byte[] buffer) { +System.out.println(1234); + server.input(this, buffer); + } +}