Migrate to advanced worker model for worker/listener/timer using direct/thread/pool executor

This commit is contained in:
2015-04-26 18:15:55 +01:00
parent df0200e4c6
commit 0f7c701814
33 changed files with 634 additions and 333 deletions

View File

@@ -1,4 +1,10 @@
repositories {
mavenCentral();
}
dependencies {
compile 'org.slf4j:slf4j-api:1.7.7'
compile 'org.slf4j:slf4j-log4j12:1.7.7'
compile 'org.slf4j:slf4j-log4j12:1.7.7'
compile 'org.picocontainer:picocontainer:2.15'
compile 'junit:junit:4.12'
}

View File

@@ -4,9 +4,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import base.worker.Worker;
import base.worker.ThreadWorker;
public abstract class AbstractClient extends Worker {
public abstract class AbstractClient extends ThreadWorker {
protected Socket socket;
protected InputStream inputStream;
protected OutputStream outputStream;

View File

@@ -0,0 +1,61 @@
package base.work;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import base.worker.DirectListener;
import base.worker.ThreadListener;
import base.worker.Worker;
import base.worker.pool.Listener;
import base.worker.pool.ListenerPool;
import base.worker.pool.PooledListener;
public abstract class Listen<E> extends Work {
protected Listener<E> listener;
public Queue<E> queue;
public Listen() {
this(WORKER_TYPE);
}
protected Listen(Worker.Type workerType) {
switch (workerType) {
case DIRECT:
listener = new DirectListener<E>(this);
break;
default:
case THREAD:
listener = new ThreadListener<E>(this);
break;
}
queue = new ConcurrentLinkedQueue<E>();
}
protected Listen(Worker worker) {
this.work = worker;
queue = new ConcurrentLinkedQueue<E>();
}
protected Listen(ListenerPool<E> listenerPool) {
listener = new PooledListener<E>(this);
listenerPool.add((PooledListener<E>) listener);
queue = new ConcurrentLinkedQueue<E>();
}
public synchronized void add(E element) {
listener.add(element);
}
public void work() {
while (!queue.isEmpty()) {
input(queue.poll());
}
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {}
}
}
public void input(E element) {}
}

View File

@@ -0,0 +1,56 @@
package base.work;
import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException;
import base.worker.DirectWorker;
import base.worker.ThreadWorker;
import base.worker.Worker;
import base.worker.pool.PooledWorker;
import base.worker.pool.WorkerPool;
public abstract class Work {
protected static final Worker.Type WORKER_TYPE = Worker.Type.THREAD;
protected Worker work;
protected Work() {
this(WORKER_TYPE);
}
protected Work(Worker.Type workerType) {
switch (workerType) {
case DIRECT:
work = new DirectWorker(this);
break;
default:
case THREAD:
work = new ThreadWorker(this);
break;
}
}
protected Work(Worker worker) {
this.work = worker;
}
protected Work(WorkerPool workerPool) {
work = new PooledWorker(this);
workerPool.add((PooledWorker) work);
}
protected void sleep(int time) {
work.sleep(time);
}
public void start() {
work.start();
}
public void stop() {
work.stop();
}
public void activate() throws ActivateException {}
public void deactivate() throws DeactivateException {}
public abstract void work();
}

View File

@@ -1,9 +0,0 @@
package base.worker;
public abstract class AbstractWorker {
public abstract void start();
public abstract void stop();
protected abstract void work();
}

View File

@@ -0,0 +1,14 @@
package base.worker;
import base.work.Work;
public class DirectIntervalWorker extends ThreadIntervalWorker {
public DirectIntervalWorker(Work work, int interval) {
super(work, false);
this.interval = interval;
}
public DirectIntervalWorker(IntervalWork intervalWork) {
super(intervalWork);
}
}

View File

@@ -0,0 +1,16 @@
package base.worker;
import base.work.Listen;
import base.worker.pool.Listener;
public class DirectListener<E> extends ThreadListener<E> implements Listener<E> {
public DirectListener(Listen<E> listen) {
super(listen, false);
}
@Override
public void add(Object element) {
// TODO Auto-generated method stub
}
}

View File

@@ -0,0 +1,9 @@
package base.worker;
import base.work.Work;
public class DirectWorker extends ThreadWorker {
public DirectWorker(Work work) {
super(work, false);
}
}

View File

@@ -0,0 +1,37 @@
package base.worker;
import base.work.Work;
public abstract class IntervalWork extends Work {
protected IntervalWork() {
this(WORKER_TYPE);
}
protected IntervalWork(int interval) {
this(WORKER_TYPE, interval);
}
protected IntervalWork(Worker.Type workerType) {
switch (workerType) {
case DIRECT:
work = new DirectIntervalWorker(this);
break;
default:
case THREAD:
work = new ThreadIntervalWorker(this);
break;
}
}
protected IntervalWork(Worker.Type workerType, int interval) {
switch (workerType) {
case DIRECT:
work = new DirectIntervalWorker(this, interval);
break;
default:
case THREAD:
work = new ThreadIntervalWorker(this, interval);
break;
}
}
}

View File

@@ -1,81 +0,0 @@
package base.worker;
import java.util.Timer;
import java.util.TimerTask;
import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException;
public class IntervalWorker extends Worker {
protected static final boolean THREAD = true;
protected static final int INTERVAL = 500;
protected Timer timer;
public synchronized void start(boolean thread) {
if (!active) {
activate = true;
timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
IntervalWorker.this.run();
}}, 0, INTERVAL);
active = true;
}
if (!thread) {
try {
synchronized (this) {
wait();
}
} catch (InterruptedException e) {
logger.info("", e);
}
}
}
public synchronized void stop() {
if (active) {
timer.cancel();
deactivate = true;
run();
notifyAll();
}
}
public void run() {
if (activate && !active) {
try {
super.activate();
} catch (ActivateException e) {
logger.error("", e);
} finally {
activate = false;
}
} else if (deactivate && active) {
try {
super.deactivate();
} catch (DeactivateException e) {
logger.error("", e);
} finally {
deactivate = false;
}
}
if (active) {
work();
}
}
protected void work() {
System.out.println("(-:");
}
public static void main(String[] args) {
IntervalWorker intervalWorker = new IntervalWorker();
for (int i = 0; i < 3; ++i) {
intervalWorker.start(false);
System.out.println("--");
intervalWorker.sleep(200);
intervalWorker.stop();
}
}
}

View File

@@ -1,35 +0,0 @@
package base.worker;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public abstract class Listener<E> extends Worker {
protected Queue<E> queue;
public Listener() {
queue = new ConcurrentLinkedQueue<E>();
}
public synchronized void add(E element) {
queue.add(element);
notifyAll();
}
public final void work() {
while (!queue.isEmpty()) {
input(queue.poll());
}
if (!deactivate) {
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
logger.info("", e);
}
}
}
}
public abstract void input(E element);
}

View File

@@ -0,0 +1,61 @@
package base.worker;
import java.util.Timer;
import java.util.TimerTask;
import base.work.Work;
public class ThreadIntervalWorker extends ThreadWorker {
protected static final int INTERVAL = 500;
protected int interval;
public ThreadIntervalWorker(Work work) {
super(work);
interval = INTERVAL;
}
public ThreadIntervalWorker(Work work, boolean thread) {
super(work, thread);
interval = INTERVAL;
}
public ThreadIntervalWorker(Work work, int interval) {
super(work);
this.interval = interval;
}
protected Timer timer;
public synchronized void start(boolean thread) {
if (!active) {
activate = true;
timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
Worker worker = ThreadIntervalWorker.this;
worker.runActivate();
worker.runDeactivate();
worker.runWork();
}}, 0, interval);
active = true;
}
if (!thread) {
try {
synchronized (this) {
wait();
}
} catch (InterruptedException e) {
logger.info("", e);
}
}
}
public synchronized void stop() {
if (active) {
timer.cancel();
deactivate = true;
run();
notifyAll();
}
}
}

View File

@@ -0,0 +1,22 @@
package base.worker;
import base.work.Listen;
import base.worker.pool.Listener;
public class ThreadListener<E> extends ThreadWorker implements Listener<E> {
protected Listen<E> listen;
public ThreadListener(Listen<E> listen) {
super(listen);
this.listen = listen;
}
public ThreadListener(Listen<E> listen, boolean start) {
super(listen);
}
public void add(E element) {
System.out.println("ok! " + element);
listen.queue.add(element);
}
}

View File

@@ -0,0 +1,52 @@
package base.worker;
import base.work.Work;
public class ThreadWorker extends Worker implements Runnable {
protected static final boolean THREAD = true;
protected boolean thread = true;
public ThreadWorker(Work work, boolean thread) {
this(work);
this.thread = thread;
}
public ThreadWorker(Work work) {
super(work);
}
public synchronized void start(boolean thread) {
if (!active) {
activate = true;
}
if (!run) {
run = true;
if (thread) {
logger.debug("Start thread");
new Thread(this, getClass().getName()).start();
} else {
logger.debug("Run directly");
run();
}
} else {
notifyAll();
}
}
public synchronized void start() {
start(thread);
}
public synchronized void stop() {
if (active) {
deactivate = true;
}
notifyAll();
}
public void exit() {
stop();
run = false;
}
}

View File

@@ -5,62 +5,89 @@ import org.slf4j.LoggerFactory;
import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException;
import base.work.Work;
public abstract class Worker implements Runnable {
public abstract class Worker {
public enum Type {
DIRECT, THREAD, POOLED
}
protected static final int SLEEP = 100;
protected Logger logger = LoggerFactory.getLogger(getClass());
protected static final boolean THREAD = true;
protected static final int SLEEP = 100;
protected boolean thread = true;
protected boolean run = false;
protected boolean active = false;
protected boolean activate = false;
protected boolean deactivate = false;
public Worker(boolean thread) {
this.thread = thread;
protected Work work;
public Worker(Work work) {
this.work = work;
}
public abstract void start();
public abstract void stop();
public boolean active() {
return active;
}
public Worker() {
this(THREAD);
}
public synchronized void start(boolean thread) {
if (!active) {
activate = true;
public void run() {
while (run || deactivate) {
runActivate();
runDeactivate();
runWork();
}
if (!run) {
run = true;
if (thread) {
logger.debug("Start thread");
new Thread(this, getClass().getName()).start();
} else {
logger.debug("Run directly");
run();
}
public void runActivate() {
if (activate && !active) {
try {
work.activate();
} catch (ActivateException e) {
logger.error("", e);
} finally {
activate = false;
active = true;
}
}
}
public void runDeactivate() {
if (deactivate && active) {
try {
work.deactivate();
} catch (DeactivateException e) {
logger.error("", e);
} finally {
deactivate = false;
active = false;
}
} else {
notifyAll();
}
}
public synchronized void start() {
start(thread);
}
public synchronized void stop() {
public void runWork() {
if (active) {
deactivate = true;
work.work();
} else if (run) {
try {
synchronized (this) {
wait();
}
} catch (InterruptedException e) {
logger.info("", e);
}
}
notifyAll();
}
public void exit() {
stop();
run = false;
protected void sleep() {
sleep(SLEEP);
}
protected void sleep(int time) {
public void sleep(int time) {
try {
if (time > 0) {
Thread.sleep(time);
@@ -69,55 +96,4 @@ public abstract class Worker implements Runnable {
logger.info("", e);
}
}
protected void sleep() {
sleep(SLEEP);
}
public boolean active() {
return active;
}
protected void activate() throws ActivateException {
active = true;
}
protected void deactivate() throws DeactivateException {
active = false;
}
public void run() {
while (run || deactivate) {
if (activate && !active) {
try {
activate();
} catch (ActivateException e) {
logger.error("", e);
} finally {
activate = false;
}
} else if (deactivate && active) {
try {
deactivate();
} catch (DeactivateException e) {
logger.error("", e);
} finally {
deactivate = false;
}
}
if (active) {
work();
} else if (run) {
try {
synchronized (this) {
wait();
}
} catch (InterruptedException e) {
logger.info("", e);
}
}
}
}
protected abstract void work();
}

View File

@@ -0,0 +1,5 @@
package base.worker.pool;
public interface Listener<E> {
public void add(E element);
}

View File

@@ -1,9 +1,9 @@
package base.worker.pooled;
package base.worker.pool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ListenerPool<E> {
@@ -13,7 +13,7 @@ public class ListenerPool<E> {
public ListenerPool(int poolSize) {
this.poolSize = poolSize;
queue = new SynchronousQueue<Wrapper<E>>();
queue = new LinkedBlockingQueue<Wrapper<E>>();
executorService = Executors.newFixedThreadPool(poolSize);
}

View File

@@ -1,4 +1,4 @@
package base.worker.pooled;
package base.worker.pool;
import java.util.concurrent.BlockingQueue;

View File

@@ -0,0 +1,28 @@
package base.worker.pool;
import java.util.concurrent.BlockingQueue;
import base.work.Listen;
public class PooledListener<E> extends PooledWorker implements Listener<E> {
protected BlockingQueue<Wrapper<E>> poolQueue;
protected Listen<E> listen;
public PooledListener(Listen<E> listen) {
super(listen);
this.listen = listen;
}
public void setPoolQueue(BlockingQueue<Wrapper<E>> poolQueue) {
this.poolQueue = poolQueue;
}
public synchronized void add(E element) {
Wrapper<E> wrapper = new Wrapper<E>(this, element);
poolQueue.add(wrapper);
}
void input(E element) {
listen.input(element);
}
}

View File

@@ -1,13 +1,18 @@
package base.worker.pooled;
package base.worker.pool;
import java.util.concurrent.BlockingQueue;
import base.work.Work;
import base.worker.Worker;
public abstract class PooledWorker extends Worker {
public class PooledWorker extends Worker {
protected BlockingQueue<Worker> activateQueue;
protected BlockingQueue<Worker> deactivateQueue;
public PooledWorker(Work work) {
super(work);
}
public void setActivateQueue(BlockingQueue<Worker> activateQueue) {
this.activateQueue = activateQueue;
}

View File

@@ -1,4 +1,4 @@
package base.worker.pooled;
package base.worker.pool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

View File

@@ -1,4 +1,4 @@
package base.worker.pooled;
package base.worker.pool;
import java.util.concurrent.BlockingQueue;
@@ -34,7 +34,7 @@ public class WorkerPoolRunnable implements Runnable {
} catch (InterruptedException e) {}
} else {
Worker worker = workerCycle.next();
worker.work();
worker.runWork();
}
}
}

View File

@@ -1,4 +1,4 @@
package base.worker.pooled;
package base.worker.pool;
class Wrapper<E> {

View File

@@ -1,18 +0,0 @@
package base.worker.pooled;
import java.util.concurrent.BlockingQueue;
public abstract class PooledListener<E> {
protected BlockingQueue<Wrapper<E>> poolQueue;
public void setPoolQueue(BlockingQueue<Wrapper<E>> poolQueue) {
this.poolQueue = poolQueue;
}
public synchronized void add(E element) {
Wrapper<E> wrapper = new Wrapper<E>(this, element);
poolQueue.add(wrapper);
}
abstract void input(E element);
}

View File

@@ -1,80 +0,0 @@
package base.worker.pooled;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException;
import base.worker.Worker;
public class TestPooledWorker extends PooledWorker {
protected int id;
protected volatile int work;
public TestPooledWorker(int id) {
this.id = id;
}
protected void setWork(int work) {
System.out.println("#" + id + ", set work @ " + work);
this.work = work;
}
public void work() {
System.out.println("#" + id + ", work = " + work);
if (--work < 1) {
stop();
}
sleep(300);
}
protected void activate() throws ActivateException {
System.out.println("#" + id + ", activating...");
super.activate();
}
protected void deactivate() throws DeactivateException {
super.deactivate();
System.out.println("#" + id + ", deactivating...");
}
public static void main(String[] args) {
WorkerPool workerPool = new WorkerPool(3);
List<TestPooledWorker> workerList = new ArrayList<TestPooledWorker>();
for (int i = 0; i < 10; ++i) {
TestPooledWorker worker = new TestPooledWorker(i + 1);
workerPool.add(worker);
workerList.add(worker);
}
workerPool.start();
System.out.println("Starting work!");
int work = 1000;
ArrayList<Worker> activeWorkerList = new ArrayList<Worker>();
for (int i = 0; i < 8; ++i) {
TestPooledWorker worker = workerList.get((new Random()).nextInt(workerList.size()));
worker.setWork(work);
worker.start();
activeWorkerList.add(worker);
}
int i = 0;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {}
for (Worker worker : activeWorkerList) {
if (++i > 5) {
break;
}
worker.stop();
}
try {
Thread.sleep(100000);
} catch (InterruptedException e) {}
System.exit(0);
}
}

View File

@@ -0,0 +1,15 @@
package worker;
import worker.dummy.DummyWork;
public class TestDirectWork {
public static void main(String[] args) {
DummyWork work = new DummyWork(1);
work.setWork(100);
work.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {}
}
}

View File

@@ -0,0 +1,18 @@
package worker;
import worker.dummy.DummyIntervalWork;
import base.work.Work;
public class TestIntervalWork {
public static void main(String[] args) {
Work work = new DummyIntervalWork(500);
for (int i = 0; i < 10; ++i) {
work.start();
System.out.println("--");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
work.stop();
}
}
}

View File

@@ -0,0 +1,19 @@
package worker;
import worker.dummy.DummyListen;
public class TestListen {
public static void main(String[] args) {
DummyListen listen = new DummyListen(0);
listen.start();
for (int i = 0; i < 10; ++i) {
listen.add(i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {}
}
}

View File

@@ -1,13 +1,16 @@
package base.worker.pooled;
package worker;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class TestPooledListener extends PooledListener<Integer> {
import worker.dummy.DummyListen;
import base.worker.pool.ListenerPool;
public class TestPooledListen {
protected int id;
public TestPooledListener(int id) {
public TestPooledListen(int id) {
this.id = id;
}
@@ -16,21 +19,20 @@ public class TestPooledListener extends PooledListener<Integer> {
}
public static void main(String[] args) {
ListenerPool<Integer> listenerPool = new ListenerPool<Integer>(5);
List<PooledListener<Integer>> listenerList = new ArrayList<PooledListener<Integer>>();
ListenerPool<Integer> listenerPool = new ListenerPool<Integer>(5);
List<DummyListen> listenList = new ArrayList<DummyListen>();
for (int i = 0; i < 20; ++i) {
PooledListener<Integer> pooledListener = listenerPool.add(new TestPooledListener(i + 1));
listenerList.add(pooledListener);
DummyListen listen = new DummyListen(listenerPool, i + 1);
listenList.add(listen);
}
listenerPool.start();
System.out.println("Starting to give out elements!");
for (int i = 0; i < 100; ++i) {
PooledListener<Integer> randomListener = listenerList.get((new Random()).nextInt(listenerList.size()));
randomListener.add(i);
DummyListen randomListen = listenList.get((new Random()).nextInt(listenList.size()));
randomListen.add(i);
}
//listenerPool.await();
}
}

View File

@@ -0,0 +1,45 @@
package worker;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import worker.dummy.DummyWork;
import base.work.Work;
import base.worker.pool.WorkerPool;
public class TestPooledWork {
public static void main(String[] args) {
WorkerPool workerPool = new WorkerPool(3);
List<DummyWork> workList = new ArrayList<DummyWork>();
for (int i = 0; i < 10; ++i) {
DummyWork work = new DummyWork(workerPool, i + 1);
workList.add(work);
}
workerPool.start();
System.out.println("Starting work!");
ArrayList<Work> activeWorkList = new ArrayList<Work>();
for (int i = 0; i < 8; ++i) {
DummyWork work = workList.get((new Random()).nextInt(workList.size()));
work.setWork(1000);
work.start();
activeWorkList.add(work);
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {}
int i = 0;
for (Work work : activeWorkList) {
if (++i > 5) {
break;
}
work.stop();
}
try {
Thread.sleep(100000);
} catch (InterruptedException e) {}
System.exit(0);
}
}

View File

@@ -0,0 +1,13 @@
package worker.dummy;
import base.worker.IntervalWork;
public class DummyIntervalWork extends IntervalWork {
public DummyIntervalWork(int interval) {
super(interval);
}
public void work() {
System.out.println(":-)");
}
}

View File

@@ -0,0 +1,22 @@
package worker.dummy;
import base.work.Listen;
import base.worker.pool.ListenerPool;
public class DummyListen extends Listen<Integer> {
protected int id;
public DummyListen(ListenerPool<Integer> listenerPool, int id) {
super(listenerPool);
this.id = id;
}
public DummyListen(int id) {
super();
this.id = id;
}
public void input(Integer input) {
System.out.println("#" + id + ", input = " + input);
}
}

View File

@@ -0,0 +1,42 @@
package worker.dummy;
import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException;
import base.work.Work;
import base.worker.pool.WorkerPool;
public class DummyWork extends Work {
protected int id;
protected volatile int work;
public DummyWork(int id) {
super();
this.id = id;
}
public DummyWork(WorkerPool workerPool, int id) {
super(workerPool);
this.id = id;
}
public void setWork(int work) {
System.out.println("#" + id + ", set work @ " + work);
this.work = work;
}
public void work() {
System.out.println("#" + id + ", work = " + work);
if (--work < 1) {
stop();
}
sleep(300);
}
public void activate() throws ActivateException {
System.out.println("#" + id + ", activating...");
}
public void deactivate() throws DeactivateException {
System.out.println("#" + id + ", deactivating...");
}
}