From 9a349206791795ebc65fa663a467be0b208b5b9a Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Thu, 11 Jun 2015 22:42:14 +0100 Subject: [PATCH] Snapshot of implementation of general tcp/udp server/client/sender/receiver framework --- .../src/main/java/base/receiver/Receiver.java | 5 + .../src/main/java/base/sender/Sender.java | 7 + .../src/main/java/base/sender/TcpSender.java | 19 +++ .../src/main/java/base/sender/UdpSender.java | 53 ++++++++ .../java/base/server/channel/TcpClient.java | 103 +++++++++++++++ .../java/base/server/channel/TcpServer.java | 121 ++++++++++++++++++ .../base/server/channel/TcpServerClient.java | 64 +++++++++ .../server/datagram/UdpMulticastClient.java | 30 +++++ .../server/datagram/UdpMulticastServer.java | 39 ++++++ .../base/server/socket/AbstractClient.java | 24 ++++ 10 files changed, 465 insertions(+) create mode 100644 java/base/src/main/java/base/receiver/Receiver.java create mode 100644 java/base/src/main/java/base/sender/Sender.java create mode 100644 java/base/src/main/java/base/sender/TcpSender.java create mode 100644 java/base/src/main/java/base/sender/UdpSender.java create mode 100644 java/base/src/main/java/base/server/channel/TcpClient.java create mode 100644 java/base/src/main/java/base/server/channel/TcpServer.java create mode 100644 java/base/src/main/java/base/server/channel/TcpServerClient.java create mode 100644 java/base/src/main/java/base/server/datagram/UdpMulticastClient.java create mode 100644 java/base/src/main/java/base/server/datagram/UdpMulticastServer.java create mode 100644 java/base/src/main/java/base/server/socket/AbstractClient.java diff --git a/java/base/src/main/java/base/receiver/Receiver.java b/java/base/src/main/java/base/receiver/Receiver.java new file mode 100644 index 0000000..b6b4be0 --- /dev/null +++ b/java/base/src/main/java/base/receiver/Receiver.java @@ -0,0 +1,5 @@ +package base.receiver; + +public interface Receiver { + public void receive(byte[] buffer); +} diff --git a/java/base/src/main/java/base/sender/Sender.java b/java/base/src/main/java/base/sender/Sender.java new file mode 100644 index 0000000..a92a541 --- /dev/null +++ b/java/base/src/main/java/base/sender/Sender.java @@ -0,0 +1,7 @@ +package base.sender; + +import java.io.IOException; + +public interface Sender { + public void send(byte[] buffer) throws IOException; +} diff --git a/java/base/src/main/java/base/sender/TcpSender.java b/java/base/src/main/java/base/sender/TcpSender.java new file mode 100644 index 0000000..9338f98 --- /dev/null +++ b/java/base/src/main/java/base/sender/TcpSender.java @@ -0,0 +1,19 @@ +package base.sender; + +import java.io.IOException; + +import base.server.socket.TcpClient; + +public abstract class TcpSender extends TcpClient implements Sender { + public TcpSender(String host, int port) { + super(host, port); + } + + public void send(byte[] buffer) throws IOException { + if (!active()) { + start(); + // Control over threads here? + } + outputStream.write(buffer); + } +} diff --git a/java/base/src/main/java/base/sender/UdpSender.java b/java/base/src/main/java/base/sender/UdpSender.java new file mode 100644 index 0000000..a0b34ed --- /dev/null +++ b/java/base/src/main/java/base/sender/UdpSender.java @@ -0,0 +1,53 @@ +package base.sender; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketException; +import java.net.UnknownHostException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UdpSender implements Sender { + protected Logger logger = LoggerFactory.getLogger(getClass()); + + protected DatagramSocket datagramSocket; + protected InetAddress inetAddress; + protected int port; + + public UdpSender(String host, int port) throws UnknownHostException{ + inetAddress = InetAddress.getByName(host); + logger.debug(host); + logger.debug(String.valueOf(port)); + this.port = port; + } + + protected boolean setup() { + if (datagramSocket == null) { + try { + datagramSocket = new DatagramSocket(); + } catch (SocketException e) { + logger.error("Failed to create socket", e); + return false; + } + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + datagramSocket.close(); + } + }); + } + return true; + } + + public void send(byte[] buffer) { + try { + setup(); + DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length, inetAddress, port); + datagramSocket.send(datagramPacket); + } catch (IOException e) { + logger.error("Failed to send buffer", e); + } + } +} diff --git a/java/base/src/main/java/base/server/channel/TcpClient.java b/java/base/src/main/java/base/server/channel/TcpClient.java new file mode 100644 index 0000000..2851e8e --- /dev/null +++ b/java/base/src/main/java/base/server/channel/TcpClient.java @@ -0,0 +1,103 @@ +package base.server.channel; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; + +import base.exception.worker.ActivateException; +import base.receiver.Receiver; +import base.sender.Sender; +import base.work.Work; +import base.worker.Worker; + +public class TcpClient extends Work implements Sender { + protected static final int BUFFER_SIZE = 1024; + + protected String host; + protected int port; + protected int bufferSize; + protected SocketChannel socketChannel; + protected Selector selector; + protected ArrayList receiverList = new ArrayList(); + + public TcpClient(String host, int port) { + this(host, port, BUFFER_SIZE); + } + + public TcpClient(String host, int port, int bufferSize) { + this.host = host; + this.port = port; + this.bufferSize = bufferSize; + } + + public void activate() throws ActivateException { + try { + InetSocketAddress hostAddress = new InetSocketAddress(host, port); + socketChannel = SocketChannel.open(hostAddress); + socketChannel.configureBlocking(false); + while (!socketChannel.finishConnect()) { + sleep(Worker.SLEEP); + } + selector = Selector.open(); + socketChannel.register(selector, SelectionKey.OP_READ); + } catch (Exception e) { + logger.error("", e); + throw new ActivateException(); + } + super.activate(); + } + + public final void work() { + try { + //System.out.println("Client: Waiting for select..."); + //System.out.println("Client: Number of selected keys: " + selector.select()); + selector.select(); + Set selectionKeySet = selector.selectedKeys(); + Iterator selectionKeyIterator = selectionKeySet.iterator(); + while (selectionKeyIterator.hasNext()) { + SelectionKey selectionKey = selectionKeyIterator.next(); + if (selectionKey.isReadable()) { + ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize); + socketChannel.read(byteBuffer); + byte[] buffer = byteBuffer.array(); + for (Receiver receiver : receiverList) { + receiver.receive(buffer); + } + } else if (selectionKey.isWritable()) { + byte[] buffer; + buffer = (byte[]) selectionKey.attachment(); + System.out.println("poll() " + new String(buffer).trim()); + ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); + socketChannel.write(byteBuffer); + selectionKey.cancel(); + } + selectionKeyIterator.remove(); + } + } catch (Exception e) { + logger.error("", e); + } + } + + public void send(byte[] buffer) throws IOException { + selector.wakeup(); + socketChannel.register(selector, SelectionKey.OP_WRITE, buffer); + } + + public void close() throws IOException { + socketChannel.close(); + } + + public void register(Receiver receiver) { + receiverList.add(receiver); + } + + public void remove(Receiver receiver) { + receiverList.remove(receiver); + } +} \ No newline at end of file diff --git a/java/base/src/main/java/base/server/channel/TcpServer.java b/java/base/src/main/java/base/server/channel/TcpServer.java new file mode 100644 index 0000000..290b80a --- /dev/null +++ b/java/base/src/main/java/base/server/channel/TcpServer.java @@ -0,0 +1,121 @@ +package base.server.channel; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; + +import base.exception.worker.ActivateException; +import base.sender.Sender; +import base.work.Work; + +public class TcpServer extends Work implements Sender { + protected static final int BUFFER_SIZE = 1024; + + protected int port; + protected int bufferSize; + protected Constructor clientConstructor; + protected Selector selector; + protected ServerSocketChannel serverSocket; + protected ArrayList clientList; + + public TcpServer(int port) { + this(port, TcpServerClient.class); + } + + public TcpServer(int port, Class clientClass) { + this(port, clientClass, BUFFER_SIZE); + } + + public TcpServer(int port, Class clientClass, int bufferSize) { + this.port = port; + this.bufferSize = bufferSize; + try { + logger.error(clientClass.getName()); + clientConstructor = Class.forName(clientClass.getName()).getConstructor(getClass(), SocketChannel.class, Integer.class); + } catch (NoSuchMethodException | SecurityException | ClassNotFoundException e) { + logger.error("Failed to initialise client constructor"); + e.printStackTrace(); + } + clientList = new ArrayList(); + } + + public void activate() throws ActivateException { + try { + // Get selector + selector = Selector.open(); + + // Get server socket channel and register with selector + serverSocket = ServerSocketChannel.open(); + InetSocketAddress hostAddress = new InetSocketAddress(port); + serverSocket.bind(hostAddress); + serverSocket.configureBlocking(false); + serverSocket.register(selector, SelectionKey.OP_ACCEPT); + } catch (Exception e) { + throw new ActivateException(); + } + } + + public void work() { + try { + System.out.println("Server: Number of selected keys: " + selector.select()); + + Set selectionKeySet = selector.selectedKeys(); + Iterator selectionKeyIterator = selectionKeySet.iterator(); + + while (selectionKeyIterator.hasNext()) { + SelectionKey selectionKey = selectionKeyIterator.next(); + if (selectionKey.isAcceptable()) { + // Accept the new client connection + SocketChannel socketChannel = serverSocket.accept(); + socketChannel.configureBlocking(false); + + // Add the new connection to the selector + TcpServerClient serverClient = (TcpServerClient) clientConstructor.newInstance(this, socketChannel, bufferSize); + clientList.add(serverClient); + socketChannel.register(selector, SelectionKey.OP_READ, serverClient); + //initClient(serverClient); + System.out.println("Accepted new connection from client: " + socketChannel); + } else if (selectionKey.isReadable()) { + // Read the data from client + TcpServerClient serverClient = (TcpServerClient) selectionKey.attachment(); + serverClient.readable(); + } else if (selectionKey.isWritable()) { + // Write to client? + } + selectionKeyIterator.remove(); + } + } catch (IOException e) {} catch (InstantiationException e) { + logger.error("", e); + } catch (IllegalAccessException e) { + logger.error("", e); + } catch (IllegalArgumentException e) { + logger.error("", e); + } catch (InvocationTargetException e) { + logger.error("", e); + } + } + + protected void initClient(TcpServerClient serverClient) { + /*try { + serverClient.write(ByteBuffer.wrap(new String("Hi there!").getBytes())); + } catch (IOException e) { + logger.error("", e); + }*/ + } + + public void send(byte[] buffer) throws IOException { + for (TcpServerClient client : clientList) { + // Should be dealt with in clients own thread + client.send(buffer); + } + } +} \ No newline at end of file diff --git a/java/base/src/main/java/base/server/channel/TcpServerClient.java b/java/base/src/main/java/base/server/channel/TcpServerClient.java new file mode 100644 index 0000000..04ffa46 --- /dev/null +++ b/java/base/src/main/java/base/server/channel/TcpServerClient.java @@ -0,0 +1,64 @@ +package base.server.channel; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import base.receiver.Receiver; + +public class TcpServerClient implements Receiver { + protected static final int BUFFER_SIZE = 1024; + + protected TcpServer server; + protected SocketChannel socketChannel; + protected int bufferSize; + protected ByteBuffer byteBuffer; + + public TcpServerClient(TcpServer server, SocketChannel socketChannel) { + this(server, socketChannel, BUFFER_SIZE); + } + + public TcpServerClient(TcpServer server, SocketChannel socketChannel, Integer bufferSize) { + this.server = server; + this.socketChannel = socketChannel; + this.bufferSize = bufferSize; + byteBuffer = ByteBuffer.allocate(bufferSize); + } + + public void write(ByteBuffer byteBuffer) throws IOException { + socketChannel.write(byteBuffer); + } + + public void readable() throws IOException { + int read; + while (( read = socketChannel.read(byteBuffer)) > 0) { + //byteBuffer.flip(); + byte[] buffer = byteBuffer.array(); + receive(buffer); + System.out.println("readable() " + new String(buffer).trim()); + byteBuffer.clear(); + byteBuffer.put(new byte[bufferSize]); + byteBuffer.clear(); + } + if (read < 0) { + socketChannel.close(); + } + } + + public void receive(byte[] buffer) { + // Should be forwarded somewhere? + String output = new String(buffer).trim(); + System.err.println("Message read from client: " + output); + if (output.equals("Bye.")) { + try { + socketChannel.close(); + } catch (IOException e) {} + System.out.println("Client messages are complete; close."); + } + + } + + public void send(byte[] buffer) throws IOException { + write(ByteBuffer.wrap(buffer)); + } +} diff --git a/java/base/src/main/java/base/server/datagram/UdpMulticastClient.java b/java/base/src/main/java/base/server/datagram/UdpMulticastClient.java new file mode 100644 index 0000000..9e1c379 --- /dev/null +++ b/java/base/src/main/java/base/server/datagram/UdpMulticastClient.java @@ -0,0 +1,30 @@ +package base.server.datagram; + +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.MulticastSocket; + +public class UdpMulticastClient implements Runnable { + public void run() { + try { + MulticastSocket socket = new MulticastSocket(4446); + InetAddress group = InetAddress.getByName("239.255.255.255"); + socket.joinGroup(group); + + DatagramPacket packet; + while (true) { + byte[] buf = new byte[256]; + packet = new DatagramPacket(buf, buf.length); + socket.receive(packet); + + String received = new String(packet.getData()).trim(); + System.out.println("Quote of the Moment: " + received); + } + + //socket.leaveGroup(group); + //socket.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/java/base/src/main/java/base/server/datagram/UdpMulticastServer.java b/java/base/src/main/java/base/server/datagram/UdpMulticastServer.java new file mode 100644 index 0000000..612cac1 --- /dev/null +++ b/java/base/src/main/java/base/server/datagram/UdpMulticastServer.java @@ -0,0 +1,39 @@ +package base.server.datagram; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.MulticastSocket; + +public class UdpMulticastServer implements Runnable { + private MulticastSocket socket; + + public void run() { + try { + socket = new MulticastSocket(4445); + } catch (IOException e) { + e.printStackTrace(); + } + while (true) { + try { + byte[] buf = new byte[256]; + String dString = String.valueOf(Math.random()); + buf = dString.getBytes(); + + InetAddress group = InetAddress.getByName("239.255.255.255"); + DatagramPacket packet; + packet = new DatagramPacket(buf, buf.length, group, 4446); + socket.send(packet); + + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { } + } + catch (IOException e) { + e.printStackTrace(); + } + } + //socket.close(); + } +} diff --git a/java/base/src/main/java/base/server/socket/AbstractClient.java b/java/base/src/main/java/base/server/socket/AbstractClient.java new file mode 100644 index 0000000..72802b3 --- /dev/null +++ b/java/base/src/main/java/base/server/socket/AbstractClient.java @@ -0,0 +1,24 @@ +package base.server.socket; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +import base.sender.Sender; +import base.work.Work; + +// Should be Listen, process writes in own thread +public abstract class AbstractClient extends Work implements Sender { + protected Socket socket; + protected InputStream inputStream; + protected OutputStream outputStream; + + public AbstractClient(Socket socket) { + this.socket = socket; + } + + public void send(byte[] buffer) throws IOException { + outputStream.write(buffer); + } +}