Migrate from worker to work model

This commit is contained in:
2015-06-21 18:02:11 +01:00
parent ec2d83601f
commit ef689c9c3a
9 changed files with 68 additions and 166 deletions

View File

@@ -6,16 +6,16 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import sound.util.Utils;
import javazoom.jl.decoder.Bitstream; import javazoom.jl.decoder.Bitstream;
import javazoom.jl.decoder.BitstreamException; import javazoom.jl.decoder.BitstreamException;
import sound.util.Utils;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException; import base.exception.worker.DeactivateException;
import base.worker.ThreadWorker; import base.work.Work;
import com.Ostermiller.util.CircularByteBuffer; import com.Ostermiller.util.CircularByteBuffer;
public class Converter extends ThreadWorker { public class Converter extends Work {
public static final String COMMAND = "lame --mp3input --cbr %s - - --quiet"; public static final String COMMAND = "lame --mp3input --cbr %s - - --quiet";
public static final int BYTES = 4096; // bytes public static final int BYTES = 4096; // bytes
public static final int BUFFER = 30000; // milliseconds public static final int BUFFER = 30000; // milliseconds
@@ -37,6 +37,7 @@ public class Converter extends ThreadWorker {
} }
public Converter(InputStream inputStream, int targetRate) { public Converter(InputStream inputStream, int targetRate) {
super();
this.sourceInputStream = inputStream; this.sourceInputStream = inputStream;
this.targetRate = targetRate; this.targetRate = targetRate;
bufferWorker = new BufferWorker(); bufferWorker = new BufferWorker();
@@ -87,7 +88,7 @@ public class Converter extends ThreadWorker {
notifyAll(); notifyAll();
} }
protected void deactivate() throws DeactivateException { public void deactivate() throws DeactivateException {
super.deactivate(); super.deactivate();
try { try {
sourceInputStream.close(); sourceInputStream.close();
@@ -103,7 +104,7 @@ public class Converter extends ThreadWorker {
} }
} }
protected void work() { public void work() {
if (!convert) { if (!convert) {
try { try {
synchronized (this) { synchronized (this) {
@@ -118,7 +119,8 @@ public class Converter extends ThreadWorker {
int read = 0; int read = 0;
try { try {
logger.debug("Writing input to process"); logger.debug("Writing input to process");
while ((read = sourceInputStream.read(bytes)) > 0 && !deactivate) { // Should be interrupted by stop()/exit()
while ((read = sourceInputStream.read(bytes)) > 0) {
/* Limit buffer size */ /* Limit buffer size */
while (inputStream.available() > buffer) { while (inputStream.available() > buffer) {
int progress = (int) ((1 - (inputStream.available() - buffer) / (float) buffer) * 100); int progress = (int) ((1 - (inputStream.available() - buffer) / (float) buffer) * 100);
@@ -140,9 +142,7 @@ public class Converter extends ThreadWorker {
public synchronized InputStream getInputStream() { public synchronized InputStream getInputStream() {
if (!active()) { if (!active()) {
if (!activate) {
start(); start();
}
try { try {
wait(); wait();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@@ -156,8 +156,8 @@ public class Converter extends ThreadWorker {
this.inputStream = inputStream; this.inputStream = inputStream;
} }
class BufferWorker extends ThreadWorker { class BufferWorker extends Work {
protected void work() { public void work() {
byte[] bytes = new byte[BYTES]; byte[] bytes = new byte[BYTES];
int read = 0; int read = 0;
try { try {

View File

@@ -8,15 +8,15 @@ import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Scanner; import java.util.Scanner;
import sound.consumer._Shoutcast; import sound.consumer.Shoutcast;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.worker.ThreadWorker; import base.work.Work;
import com.Ostermiller.util.BufferOverflowException; import com.Ostermiller.util.BufferOverflowException;
import com.Ostermiller.util.CircularByteBuffer; import com.Ostermiller.util.CircularByteBuffer;
import com.Ostermiller.util.CircularObjectBuffer; import com.Ostermiller.util.CircularObjectBuffer;
public class List extends ThreadWorker { public class List extends Work {
public static final int STEP = 80; // milliseconds public static final int STEP = 80; // milliseconds
public static final int RATE = 192; // kbps public static final int RATE = 192; // kbps
public static final int OVERLAP = 20000; // milliseconds public static final int OVERLAP = 20000; // milliseconds
@@ -59,13 +59,14 @@ public class List extends ThreadWorker {
} }
} }
protected synchronized void activate() throws ActivateException { public synchronized void activate() throws ActivateException {
try { try {
Scanner scanner = new Scanner(file); Scanner scanner = new Scanner(file);
ArrayList<String> fileList = new ArrayList<String>(); ArrayList<String> fileList = new ArrayList<String>();
while (scanner.hasNextLine()) { while (scanner.hasNextLine()) {
fileList.add(scanner.nextLine()); fileList.add(scanner.nextLine());
} }
scanner.close();
if (fileList.size() > 0) { if (fileList.size() > 0) {
fileArray = fileList.toArray(new String[0]); fileArray = fileList.toArray(new String[0]);
@@ -84,7 +85,7 @@ public class List extends ThreadWorker {
throw new ActivateException(); throw new ActivateException();
} }
protected synchronized void work() { public synchronized void work() {
try { try {
int left = chunk; int left = chunk;
while (left > 0) { while (left > 0) {
@@ -182,13 +183,14 @@ public class List extends ThreadWorker {
return circularStringBuffer; return circularStringBuffer;
} }
public static void main(String[] args) { public static void main(String[] args) throws Exception {
int rate = 192; int rate = 192;
List list = new List(new File("mp3"), rate); List list = new List(new File(List.class.getClassLoader().getResource("txt/mp3").toURI()), rate);
_Shoutcast shoutcast = new _Shoutcast(rate, 9876); Shoutcast shoutcast = new Shoutcast(rate, 9876);
shoutcast.start();
shoutcast.setInputStream(list.getInputStream()); shoutcast.setInputStream(list.getInputStream());
shoutcast.setMetaBuffer(list.getMetaBuffer()); shoutcast.setMetaBuffer(list.getMetaBuffer());
shoutcast.start();
while (true) { while (true) {
try { try {
Thread.sleep(15000); Thread.sleep(15000);

View File

@@ -1,11 +1,9 @@
package old; package old;
import java.io.InputStream;
import sound.Consumer; import sound.Consumer;
import sound.Producer; import sound.Producer;
public class Transducer implements Consumer, Producer { public abstract class Transducer implements Consumer, Producer {
public int rate; public int rate;
public Transducer(Producer producer) { public Transducer(Producer producer) {
@@ -15,32 +13,4 @@ public class Transducer implements Consumer, Producer {
public int getRate() { public int getRate() {
return rate; return rate;
} }
public InputStream getInputStream() {
// TODO Auto-generated method stub
return null;
}
public void start(Producer producer) {
// TODO Auto-generated method stub
}
@Override
public void start() {
// TODO Auto-generated method stub
}
@Override
public void stop() {
// TODO Auto-generated method stub
}
@Override
public void exit() {
// TODO Auto-generated method stub
}
} }

View File

@@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory;
import sound.util.Tool; import sound.util.Tool;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException; import base.exception.worker.DeactivateException;
import base.worker.ThreadWorker; import base.work.Work;
public class Source implements Consumer { public class Source implements Consumer {
protected Logger logger = LoggerFactory.getLogger(getClass()); protected Logger logger = LoggerFactory.getLogger(getClass());
@@ -27,47 +27,47 @@ public class Source implements Consumer {
protected String name; protected String name;
protected Producer producer; protected Producer producer;
protected InputStream producerInputStream; protected InputStream producerInputStream;
protected ThreadWorker worker; protected Work work;
public Source(String name) throws LineUnavailableException { public Source(String name) throws LineUnavailableException {
this.name = name; this.name = name;
} }
public void start() { public void start() {
if (worker != null) { if (work != null) {
worker.start(true); work.start();
} }
} }
public void start(Producer producer) { public void start(Producer producer) {
this.producer = producer; this.producer = producer;
producerInputStream = producer.getInputStream(); producerInputStream = producer.getInputStream();
if (worker != null) { if (work != null) {
worker.exit(); work.exit();
} }
if (producer instanceof Format.Standard) { if (producer instanceof Format.Standard) {
logger.debug("Format.Standard"); logger.debug("Format.Standard");
worker = new DefaultWorker((Format.Standard) producer); work = new DefaultWorker((Format.Standard) producer);
} else if (producer instanceof Format.Mp3) { } else if (producer instanceof Format.Mp3) {
logger.debug("Format.Mp3"); logger.debug("Format.Mp3");
worker = new Mp3Worker((Format.Mp3) producer); work = new Mp3Worker((Format.Mp3) producer);
} }
start(); start();
} }
public void stop() { public void stop() {
if (worker != null) { if (work != null) {
worker.stop(); work.stop();
} }
} }
public void exit() { public void exit() {
if (worker != null) { if (work != null) {
worker.exit(); work.exit();
} }
} }
protected class DefaultWorker extends ThreadWorker { protected class DefaultWorker extends Work {
protected Format.Standard format; protected Format.Standard format;
protected SourceDataLine line; protected SourceDataLine line;
@@ -104,7 +104,7 @@ public class Source implements Consumer {
line.close(); line.close();
} }
protected void work() { public void work() {
try { try {
byte[] buffer = new byte[BUFFER_SIZE]; byte[] buffer = new byte[BUFFER_SIZE];
int read = producerInputStream.read(buffer, 0, buffer.length); int read = producerInputStream.read(buffer, 0, buffer.length);
@@ -120,7 +120,7 @@ public class Source implements Consumer {
} }
} }
protected class Mp3Worker extends ThreadWorker { protected class Mp3Worker extends Work {
protected Format.Mp3 format; protected Format.Mp3 format;
protected Player player; protected Player player;
@@ -143,11 +143,11 @@ public class Source implements Consumer {
player.close(); player.close();
} }
protected void work() { public void work() {
try { try {
if (player == null) { if (player == null) {
player = new Player(producerInputStream); player = new Player(producerInputStream);
sleep(SLEEP); sleep(100);
} }
player.play(FRAMES); player.play(FRAMES);
} catch (JavaLayerException e) { } catch (JavaLayerException e) {

View File

@@ -16,9 +16,9 @@ import sound.util.SoxBuilder.File.Type;
import sound.util.SoxBuilder.Option; import sound.util.SoxBuilder.Option;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException; import base.exception.worker.DeactivateException;
import base.worker.ThreadWorker; import base.work.Work;
public class Port extends ThreadWorker implements Consumer { public class Port extends Work implements Consumer {
protected static final int BUFFER_SIZE = 1024 * 4; // in bytes protected static final int BUFFER_SIZE = 1024 * 4; // in bytes
protected String device; protected String device;
@@ -37,7 +37,7 @@ public class Port extends ThreadWorker implements Consumer {
} }
public void start(Producer producer) { public void start(Producer producer) {
start(producer, THREAD); start(producer);
} }
@SuppressWarnings("static-access") @SuppressWarnings("static-access")
@@ -61,10 +61,10 @@ public class Port extends ThreadWorker implements Consumer {
processBuilder = new ProcessBuilder(command.split(" ")); processBuilder = new ProcessBuilder(command.split(" "));
processBuilder.environment().put("AUDIODEV", device); processBuilder.environment().put("AUDIODEV", device);
start(thread); start();
} }
protected void activate() throws ActivateException { public void activate() throws ActivateException {
producer.start(); producer.start();
if (process == null) { if (process == null) {
try { try {
@@ -78,7 +78,7 @@ public class Port extends ThreadWorker implements Consumer {
super.activate(); super.activate();
} }
protected void deactivate() throws DeactivateException { public void deactivate() throws DeactivateException {
super.deactivate(); super.deactivate();
try { try {
processOutputStream.flush(); processOutputStream.flush();
@@ -104,7 +104,7 @@ public class Port extends ThreadWorker implements Consumer {
} }
} }
protected void work() { public void work() {
try { try {
byte[] buffer = new byte[BUFFER_SIZE]; byte[] buffer = new byte[BUFFER_SIZE];
int read = producerInputStream.read(buffer, 0, buffer.length); int read = producerInputStream.read(buffer, 0, buffer.length);

View File

@@ -19,12 +19,12 @@ import sound.data.Data;
import sound.util.Buffer; import sound.util.Buffer;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException; import base.exception.worker.DeactivateException;
import base.worker.Listener; import base.work.Listen;
import base.worker.Worker; import base.work.Work;
import com.Ostermiller.util.CircularObjectBuffer; import com.Ostermiller.util.CircularObjectBuffer;
public class Shoutcast extends Worker implements Consumer { public class Shoutcast extends Work implements Consumer {
public static final int PORT = 9876; public static final int PORT = 9876;
public static final int RATE = 192; // in kbps public static final int RATE = 192; // in kbps
public static final int META = 8192; // in bytes public static final int META = 8192; // in bytes
@@ -83,7 +83,7 @@ public class Shoutcast extends Worker implements Consumer {
} }
public boolean active() { public boolean active() {
return active = server.active(); return server.active();
} }
public void deactivate() throws DeactivateException { public void deactivate() throws DeactivateException {
@@ -127,7 +127,7 @@ public class Shoutcast extends Worker implements Consumer {
metaData = meta; metaData = meta;
} }
protected class Client extends Listener<Data> { protected class Client extends Listen<Data> {
protected Socket socket; protected Socket socket;
protected InputStream inputStream; protected InputStream inputStream;
protected OutputStream outputStream; protected OutputStream outputStream;
@@ -236,7 +236,7 @@ public class Shoutcast extends Worker implements Consumer {
} }
} }
protected class Server extends Worker { protected class Server extends Work {
protected int port; protected int port;
protected ServerSocket serverSocket; protected ServerSocket serverSocket;
@@ -245,7 +245,7 @@ public class Shoutcast extends Worker implements Consumer {
} }
public boolean active() { public boolean active() {
return active = serverSocket.isClosed() ? false : true; return serverSocket.isClosed() ? false : true;
} }
public void activate() throws ActivateException { public void activate() throws ActivateException {
@@ -253,7 +253,7 @@ public class Shoutcast extends Worker implements Consumer {
serverSocket = new ServerSocket(port); serverSocket = new ServerSocket(port);
logger.debug("Server listening at port " + port); logger.debug("Server listening at port " + port);
} catch (IOException e) { } catch (IOException e) {
logger.error(e.getMessage()); logger.error("", e);
throw new ActivateException(); throw new ActivateException();
} }
super.activate(); super.activate();
@@ -285,12 +285,12 @@ public class Shoutcast extends Worker implements Consumer {
} }
public void start(Producer producer) { public void start(Producer producer) {
start(producer, THREAD);
}
public void start(Producer producer, boolean thread) {
producerInputStream = producer.getInputStream(); producerInputStream = producer.getInputStream();
producer.start(); producer.start();
start(thread); super.start();
}
public void setInputStream(InputStream inputStream) {
producerInputStream = inputStream;
} }
} }

View File

@@ -1,72 +0,0 @@
package sound.consumer;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import base.server.socket.TcpServer;
import com.Ostermiller.util.CircularObjectBuffer;
public class _Shoutcast extends TcpServer {
protected int metadataInterval = 8192;
protected int rate;
protected CircularObjectBuffer<String> metaBuffer;
protected InputStream inputStream;
public _Shoutcast(int rate, int port) {
super(port, ShoutcastClient.class);
this.rate = rate;
}
public void x() {
// Accept new clients
// Transfer buffer
StringBuilder response = new StringBuilder();
response.append("HTTP/1.1 200 OK\r\nContent-Type: audio/mpeg\r\n");
// add the stream name
response.append("icy-name: " + "hallo" + "\r\n");
// add metadata information
response.append("icy-metadata:1\r\n");
response.append("icy-metaint:");
response.append(metadataInterval );
response.append("\r\n");
response.append("\r\n");
//out.write(response.toString().getBytes());
}
public void setInputStream(InputStream inputStream) {
this.inputStream = inputStream;
}
public void setMetaBuffer(CircularObjectBuffer<String> metaBuffer) {
this.metaBuffer = metaBuffer;
}
public class ShoutcastClient extends TcpServer.Client {
int untilMeta = 0;
public ShoutcastClient(Socket socket) {
super(socket);
}
public void work() {
//
byte[] buffer = new byte[123];
try {
outputStream.write(buffer);
// Write some meta
} catch (IOException e) {
logger.error("", e);
}
}
}
}

View File

@@ -12,12 +12,12 @@ import sound.Producer;
import sound.stream.HoardedInputStream; import sound.stream.HoardedInputStream;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException; import base.exception.worker.DeactivateException;
import base.worker.ThreadWorker; import base.work.Work;
import com.Ostermiller.util.CircularByteBuffer; import com.Ostermiller.util.CircularByteBuffer;
import com.Ostermiller.util.CircularObjectBuffer; import com.Ostermiller.util.CircularObjectBuffer;
public class Stream extends ThreadWorker implements Producer, Format.Mp3 { public class Stream extends Work implements Producer, Format.Mp3 {
public static final int STEP = 80; // in milliseconds public static final int STEP = 80; // in milliseconds
protected String http; protected String http;
@@ -34,6 +34,7 @@ public class Stream extends ThreadWorker implements Producer, Format.Mp3 {
protected String metaData; protected String metaData;
public Stream(String http) { public Stream(String http) {
super();
this.http = http; this.http = http;
meta = -1; meta = -1;
rate = -1; rate = -1;
@@ -64,7 +65,7 @@ public class Stream extends ThreadWorker implements Producer, Format.Mp3 {
} }
} }
protected void activate() throws ActivateException { public void activate() throws ActivateException {
try { try {
/* Initialize connection */ /* Initialize connection */
URL url = new URL(http); URL url = new URL(http);
@@ -122,7 +123,7 @@ public class Stream extends ThreadWorker implements Producer, Format.Mp3 {
} }
} }
protected void work() { public void work() {
int left = chunk; int left = chunk;
/* Handle media at appropriate times * /* Handle media at appropriate times *

View File

@@ -6,6 +6,7 @@ import sound.Consumer;
import sound.Producer; import sound.Producer;
import sound.Source; import sound.Source;
import sound.consumer.Port; import sound.consumer.Port;
import sound.consumer.Shoutcast;
import sound.producer.Stream; import sound.producer.Stream;
public class Test { public class Test {
@@ -20,13 +21,13 @@ public class Test {
Consumer c1 = new Source("Java Sound Audio Engine"); Consumer c1 = new Source("Java Sound Audio Engine");
Consumer c2 = new Port("Speakers (Creative SB X-Fi)"); Consumer c2 = new Port("Speakers (Creative SB X-Fi)");
//Consumer c3 = new Shoutcast(); Consumer c3 = new Shoutcast();
//Consumer c4 = new Player(); //Consumer c4 = new Player();
//Consumer c5 = new Writer(new File("stream.out")); //Consumer c5 = new Writer(new File("stream.out"));
//Utils.write(p3.getInputStream(), new File("stream.out")); //Utils.write(p3.getInputStream(), new File("stream.out"));
//Utils.play(p3.getInputStream()); //Utils.play(p3.getInputStream());
c1.start(p3); c3.start(p4);
//while (true) { //while (true) {
//Thread.sleep(300000); //Thread.sleep(300000);