diff --git a/java/sound/src/main/java/old/List.java b/java/sound/src/main/java/old/List.java index 1372db1..0cc0733 100644 --- a/java/sound/src/main/java/old/List.java +++ b/java/sound/src/main/java/old/List.java @@ -8,7 +8,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Scanner; -import sound.consumer.Shoutcast; +import sound.consumer._Shoutcast; import base.exception.worker.ActivateException; import base.worker.ThreadWorker; @@ -185,7 +185,7 @@ public class List extends ThreadWorker { public static void main(String[] args) { int rate = 192; List list = new List(new File("mp3"), rate); - Shoutcast shoutcast = new Shoutcast(rate, 9876); + _Shoutcast shoutcast = new _Shoutcast(rate, 9876); shoutcast.start(); shoutcast.setInputStream(list.getInputStream()); shoutcast.setMetaBuffer(list.getMetaBuffer()); diff --git a/java/sound/src/main/java/sound/consumer/Shoutcast.java b/java/sound/src/main/java/sound/consumer/Shoutcast.java index f5ed14a..0708bac 100644 --- a/java/sound/src/main/java/sound/consumer/Shoutcast.java +++ b/java/sound/src/main/java/sound/consumer/Shoutcast.java @@ -1,72 +1,296 @@ package sound.consumer; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketException; +import java.util.HashMap; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentLinkedQueue; -import base.server.socket.TcpServer; +import sound.Consumer; +import sound.Producer; +import sound.data.Data; +import sound.util.Buffer; +import base.exception.worker.ActivateException; +import base.exception.worker.DeactivateException; +import base.worker.Listener; +import base.worker.Worker; import com.Ostermiller.util.CircularObjectBuffer; -public class Shoutcast extends TcpServer { - protected int metadataInterval = 8192; +public class Shoutcast extends Worker implements Consumer { + public static final int PORT = 9876; + public static final int RATE = 192; // in kbps + public static final int META = 8192; // in bytes + public static final int STEP = 80; // in milliseconds + public static final int BUFFER = 2000; // in bytes + public static final int BUFFERING = 500; // in milliseconds + public static final String DATA = "StreamTitle='%s';StreamUrl='%s';"; protected int rate; - protected CircularObjectBuffer metaBuffer; - protected InputStream inputStream; - + protected int port; + protected Server server; + protected HashMap headerMap; + protected ConcurrentLinkedQueue clientList; + protected InputStream producerInputStream; + protected int chunk; + protected Buffer buffer; + protected byte[] bytes; + protected Data data; + protected String metaData; + private CircularObjectBuffer circularStringBuffer; + + public Shoutcast() { + this(RATE, PORT); + } + + public Shoutcast(int rate) { + this(rate, PORT); + } + public Shoutcast(int rate, int port) { - super(port, ShoutcastClient.class); this.rate = rate; + this.port = port; + clientList = new ConcurrentLinkedQueue(); + metaData = ""; + + chunk = STEP * rate / 8; + bytes = new byte[chunk]; + buffer = new Buffer(BUFFER * rate / 8); + + headerMap = new HashMap(); + headerMap.put("icy-notice1", "This stream requires Winamp"); + headerMap.put("icy-notice2", "Java SHOUTcast Server"); + headerMap.put("icy-name", "Java Radio"); + headerMap.put("icy-genre", "Java"); + headerMap.put("icy-url", "http://localhost"); + headerMap.put("content-type:", "audio/mpeg"); + headerMap.put("icy-pub", "0"); + headerMap.put("icy-metaint", String.valueOf(META)); + headerMap.put("icy-br", String.valueOf(rate)); } - public void x() { - // Accept new clients - // Transfer buffer - - - StringBuilder response = new StringBuilder(); - response.append("HTTP/1.1 200 OK\r\nContent-Type: audio/mpeg\r\n"); - - // add the stream name - response.append("icy-name: " + "hallo" + "\r\n"); - - // add metadata information - response.append("icy-metadata:1\r\n"); - response.append("icy-metaint:"); - response.append(metadataInterval ); - response.append("\r\n"); - - response.append("\r\n"); - - //out.write(response.toString().getBytes()); + public void activate() throws ActivateException { + logger.trace("Activate Server"); + server = new Server(port); + server.start(); + super.activate(); } - public void setInputStream(InputStream inputStream) { - this.inputStream = inputStream; + public boolean active() { + return active = server.active(); } - public void setMetaBuffer(CircularObjectBuffer metaBuffer) { - this.metaBuffer = metaBuffer; + public void deactivate() throws DeactivateException { + super.deactivate(); + server.stop(); } - public class ShoutcastClient extends TcpServer.Client { - int untilMeta = 0; + public void work() { + int progress; + try { + int read = 0; + if (producerInputStream != null) { + while (producerInputStream.available() < buffer.capacity) { + progress = (int) (producerInputStream.available() / (buffer.capacity / 100.0F)); + logger.debug("Filling buffer: " + progress + "%"); + sleep(BUFFERING); + } + read = producerInputStream.read(bytes); + } + data = new Data(bytes, read); + buffer.write(bytes, 0, read); + } catch (IOException e) { + logger.error(e.getMessage()); + } - public ShoutcastClient(Socket socket) { - super(socket); + for (Client client : clientList) { + if (client.active) { + client.add(data); + } + } + sleep(STEP); + } + + public void setMetaBuffer(CircularObjectBuffer circularStringBuffer) { + logger.debug("Set meta input stream"); + this.circularStringBuffer = circularStringBuffer; + } + + public void setMeta(String meta) { + logger.debug("Set meta string: " + meta); + metaData = meta; + } + + protected class Client extends Listener { + protected Socket socket; + protected InputStream inputStream; + protected OutputStream outputStream; + protected boolean writeMeta; + protected int untilMeta; + protected boolean active; + + public Client(Socket socket) throws IOException { + this.socket = socket; + inputStream = socket.getInputStream(); + outputStream = socket.getOutputStream(); + active = false; + clientList.add(this); + } + + public void activate() throws ActivateException { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + try { + String line; + while ((line = bufferedReader.readLine()) != null) { + if (line.startsWith("Icy-MetaData")) { + writeMeta = Integer.valueOf(line.substring(line.indexOf(":") + 1).trim()).intValue() == 1; + untilMeta = META; + } else if (line.equals("")) { + break; + } + } + logger.debug(String.format("Client accept meta: %s", Boolean.valueOf(writeMeta))); + + OutputStreamWriter outputStreamWriter = new OutputStreamWriter(outputStream); + outputStreamWriter.write("ICY 200 OK\r\n"); + for (Entry header : headerMap.entrySet()) { + outputStreamWriter.write(String.format("%s: %s\r\n", header.getKey(), header.getValue())); + } + outputStreamWriter.write("\r\n"); + outputStreamWriter.flush(); + + add(new Data(buffer.get())); + active = true; + } catch (IOException e) { + logger.error(e.getMessage()); + throw new ActivateException(); + } + super.activate(); + } + + public void exit() { + logger.debug("Client exit"); + super.exit(); + clientList.remove(this); + try { + inputStream.close(); + outputStream.close(); + socket.close(); + } catch (IOException e) { + logger.error(e.getMessage()); + } + } + + public void input(Data data) { + try { + byte[] bytes = data.get(); + if (writeMeta) { + int offset = 0; + while (data.length() - offset >= untilMeta) { + outputStream.write(bytes, offset, untilMeta); + writeMeta(); + offset += untilMeta; + untilMeta = META; + } + int length = data.length() - offset; + outputStream.write(bytes, offset, length); + untilMeta -= length; + } else { + outputStream.write(bytes); + } + } catch (SocketException e) { + exit(); + } catch (IOException e) { + exit(); + } + } + + protected void writeMeta() throws IOException { + if ((circularStringBuffer != null) + && (circularStringBuffer.getAvailable() > 0)) { + try { + String newMetaData = circularStringBuffer.read(); + if (!newMetaData.isEmpty() && !newMetaData.equals(metaData)) { + metaData = newMetaData; + } + } catch (InterruptedException e) { + logger.error(e.getMessage()); + } + } + + String meta = String.format("StreamTitle='%s';StreamUrl='%s';", metaData, "???"); + byte[] metaBytes = meta.getBytes(); + + int length = (int) Math.ceil(metaBytes.length / 16.0F); + outputStream.write(length); + outputStream.write(metaBytes); + + int padding = 16 * length - metaBytes.length; + outputStream.write(new byte[padding], 0, padding); + } + } + + protected class Server extends Worker { + protected int port; + protected ServerSocket serverSocket; + + public Server(int port) { + this.port = port; + } + + public boolean active() { + return active = serverSocket.isClosed() ? false : true; + } + + public void activate() throws ActivateException { + try { + serverSocket = new ServerSocket(port); + logger.debug("Server listening at port " + port); + } catch (IOException e) { + logger.error(e.getMessage()); + throw new ActivateException(); + } + super.activate(); } public void work() { - // - byte[] buffer = new byte[123]; try { - outputStream.write(buffer); - // Write some meta - + Socket socket = serverSocket.accept(); + logger.trace("Client connected: " + socket.getInetAddress().toString()); + Shoutcast.Client client = new Shoutcast.Client(socket); + client.start(); } catch (IOException e) { - logger.error("", e); + logger.error(e.getMessage()); + } + } + + public void deactivate() throws DeactivateException { + logger.debug("Server deactivate"); + super.deactivate(); + try { + serverSocket.close(); + } catch (IOException e) { + logger.error(e.getMessage()); + } + for (Shoutcast.Client client : clientList) { + client.stop(); } } } + public void start(Producer producer) { + start(producer, THREAD); + } + + public void start(Producer producer, boolean thread) { + producerInputStream = producer.getInputStream(); + producer.start(); + start(thread); + } } diff --git a/java/sound/src/main/java/sound/consumer/_Shoutcast.java b/java/sound/src/main/java/sound/consumer/_Shoutcast.java new file mode 100644 index 0000000..95d5760 --- /dev/null +++ b/java/sound/src/main/java/sound/consumer/_Shoutcast.java @@ -0,0 +1,72 @@ +package sound.consumer; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; + +import base.server.socket.TcpServer; + +import com.Ostermiller.util.CircularObjectBuffer; + +public class _Shoutcast extends TcpServer { + protected int metadataInterval = 8192; + protected int rate; + protected CircularObjectBuffer metaBuffer; + protected InputStream inputStream; + + public _Shoutcast(int rate, int port) { + super(port, ShoutcastClient.class); + this.rate = rate; + } + + public void x() { + // Accept new clients + // Transfer buffer + + + StringBuilder response = new StringBuilder(); + response.append("HTTP/1.1 200 OK\r\nContent-Type: audio/mpeg\r\n"); + + // add the stream name + response.append("icy-name: " + "hallo" + "\r\n"); + + // add metadata information + response.append("icy-metadata:1\r\n"); + response.append("icy-metaint:"); + response.append(metadataInterval ); + response.append("\r\n"); + + response.append("\r\n"); + + //out.write(response.toString().getBytes()); + } + + public void setInputStream(InputStream inputStream) { + this.inputStream = inputStream; + } + + public void setMetaBuffer(CircularObjectBuffer metaBuffer) { + this.metaBuffer = metaBuffer; + } + + public class ShoutcastClient extends TcpServer.Client { + int untilMeta = 0; + + public ShoutcastClient(Socket socket) { + super(socket); + } + + public void work() { + // + byte[] buffer = new byte[123]; + try { + outputStream.write(buffer); + // Write some meta + + } catch (IOException e) { + logger.error("", e); + } + } + } + +} diff --git a/java/sound/src/main/java/sound/data/Data.java b/java/sound/src/main/java/sound/data/Data.java new file mode 100644 index 0000000..d66cb3b --- /dev/null +++ b/java/sound/src/main/java/sound/data/Data.java @@ -0,0 +1,23 @@ +package sound.data; + +public class Data { + protected byte[] bytes; + protected int length; + + public Data(byte[] bytes) { + this(bytes, bytes.length); + } + + public Data(byte[] bytes, int length) { + this.bytes = bytes; + this.length = length; + } + + public byte[] get() { + return this.bytes; + } + + public int length() { + return this.length; + } +} \ No newline at end of file diff --git a/java/sound/src/main/java/sound/util/Buffer.java b/java/sound/src/main/java/sound/util/Buffer.java new file mode 100644 index 0000000..419ceaa --- /dev/null +++ b/java/sound/src/main/java/sound/util/Buffer.java @@ -0,0 +1,41 @@ +package sound.util; + +public class Buffer { + protected byte[] elements; + public int capacity; + protected int index; + protected int size; + + public Buffer(int capacity) { + elements = new byte[capacity]; + this.capacity = capacity; + index = 0; + size = 0; + } + + public synchronized void add(byte[] elements) { + for (byte element : elements) { + elements[index++ % capacity] = element; + if (size < capacity) { + ++size; + } + } + } + + public synchronized void write(byte[] elements, int offset, int length) { + for (int i = offset; i < length; ++i) { + this.elements[(index++ % capacity)] = elements[i]; + if (size < capacity) { + ++size; + } + } + } + + public synchronized byte[] get() { + byte[] elements = new byte[size]; + for (int i = 0; i < size; i++) { + elements[i] = elements[((index + i) % size)]; + } + return elements; + } +} \ No newline at end of file