Initial implementation of Worker and Listener using Executor framework

This commit is contained in:
2015-04-26 11:35:03 +01:00
parent cf66d68c00
commit df0200e4c6
10 changed files with 344 additions and 0 deletions

View File

@@ -0,0 +1,9 @@
package base.worker;
public abstract class AbstractWorker {
public abstract void start();
public abstract void stop();
protected abstract void work();
}

View File

@@ -0,0 +1,38 @@
package base.worker.pooled;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class ListenerPool<E> {
protected int poolSize;
protected BlockingQueue<Wrapper<E>> queue;
protected ExecutorService executorService;
public ListenerPool(int poolSize) {
this.poolSize = poolSize;
queue = new SynchronousQueue<Wrapper<E>>();
executorService = Executors.newFixedThreadPool(poolSize);
}
public PooledListener<E> add(PooledListener<E> listener) {
listener.setPoolQueue(queue);
return listener;
}
public void start() {
for (int i = 0; i < poolSize; ++i) {
Runnable runnable = new ListenerPoolRunnable<E>(queue, i);
executorService.execute(runnable);
}
}
public void await() {
try {
executorService.awaitTermination(0, TimeUnit.SECONDS);
} catch (InterruptedException e) {}
}
}

View File

@@ -0,0 +1,24 @@
package base.worker.pooled;
import java.util.concurrent.BlockingQueue;
class ListenerPoolRunnable<E> implements Runnable {
protected BlockingQueue<Wrapper<E>> queue;
protected int id;
public ListenerPoolRunnable(BlockingQueue<Wrapper<E>> queue, int id) {
this.queue = queue;
this.id = id;
}
public void run() {
try {
while (true) {
System.out.println("Thread #" + id + " waiting...");
Wrapper<E> wrapper = queue.take();
wrapper.deliver();
Thread.sleep((int) (Math.random() * 1000));
}
} catch (InterruptedException e) {}
}
}

View File

@@ -0,0 +1,18 @@
package base.worker.pooled;
import java.util.concurrent.BlockingQueue;
public abstract class PooledListener<E> {
protected BlockingQueue<Wrapper<E>> poolQueue;
public void setPoolQueue(BlockingQueue<Wrapper<E>> poolQueue) {
this.poolQueue = poolQueue;
}
public synchronized void add(E element) {
Wrapper<E> wrapper = new Wrapper<E>(this, element);
poolQueue.add(wrapper);
}
abstract void input(E element);
}

View File

@@ -0,0 +1,40 @@
package base.worker.pooled;
import java.util.concurrent.BlockingQueue;
import base.worker.Worker;
public abstract class PooledWorker extends Worker {
protected BlockingQueue<Worker> activateQueue;
protected BlockingQueue<Worker> deactivateQueue;
public void setActivateQueue(BlockingQueue<Worker> activateQueue) {
this.activateQueue = activateQueue;
}
public void setDeactivateQueue(BlockingQueue<Worker> deactivateQueue) {
this.deactivateQueue = deactivateQueue;
}
public void start() {
if (!active) {
activate = true;
}
if (!run) {
run = true;
}
try {
deactivateQueue.remove(this);
activateQueue.put(this);
} catch (InterruptedException e) {}
}
public void stop() {
System.out.println("stop!! " + active);
if (active) {
deactivate = true;
}
activateQueue.remove(this);
deactivateQueue.add(this);
}
}

View File

@@ -0,0 +1,36 @@
package base.worker.pooled;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class TestPooledListener extends PooledListener<Integer> {
protected int id;
public TestPooledListener(int id) {
this.id = id;
}
public void input(Integer element) {
System.out.println("#" + id + ": " + element);
}
public static void main(String[] args) {
ListenerPool<Integer> listenerPool = new ListenerPool<Integer>(5);
List<PooledListener<Integer>> listenerList = new ArrayList<PooledListener<Integer>>();
for (int i = 0; i < 20; ++i) {
PooledListener<Integer> pooledListener = listenerPool.add(new TestPooledListener(i + 1));
listenerList.add(pooledListener);
}
listenerPool.start();
System.out.println("Starting to give out elements!");
for (int i = 0; i < 100; ++i) {
PooledListener<Integer> randomListener = listenerList.get((new Random()).nextInt(listenerList.size()));
randomListener.add(i);
}
//listenerPool.await();
}
}

View File

@@ -0,0 +1,80 @@
package base.worker.pooled;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException;
import base.worker.Worker;
public class TestPooledWorker extends PooledWorker {
protected int id;
protected volatile int work;
public TestPooledWorker(int id) {
this.id = id;
}
protected void setWork(int work) {
System.out.println("#" + id + ", set work @ " + work);
this.work = work;
}
public void work() {
System.out.println("#" + id + ", work = " + work);
if (--work < 1) {
stop();
}
sleep(300);
}
protected void activate() throws ActivateException {
System.out.println("#" + id + ", activating...");
super.activate();
}
protected void deactivate() throws DeactivateException {
super.deactivate();
System.out.println("#" + id + ", deactivating...");
}
public static void main(String[] args) {
WorkerPool workerPool = new WorkerPool(3);
List<TestPooledWorker> workerList = new ArrayList<TestPooledWorker>();
for (int i = 0; i < 10; ++i) {
TestPooledWorker worker = new TestPooledWorker(i + 1);
workerPool.add(worker);
workerList.add(worker);
}
workerPool.start();
System.out.println("Starting work!");
int work = 1000;
ArrayList<Worker> activeWorkerList = new ArrayList<Worker>();
for (int i = 0; i < 8; ++i) {
TestPooledWorker worker = workerList.get((new Random()).nextInt(workerList.size()));
worker.setWork(work);
worker.start();
activeWorkerList.add(worker);
}
int i = 0;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {}
for (Worker worker : activeWorkerList) {
if (++i > 5) {
break;
}
worker.stop();
}
try {
Thread.sleep(100000);
} catch (InterruptedException e) {}
System.exit(0);
}
}

View File

@@ -0,0 +1,42 @@
package base.worker.pooled;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import base.util.ArrayCycle;
import base.worker.Worker;
public class WorkerPool {
protected int poolSize;
protected BlockingQueue<Worker> activateQueue;
protected BlockingQueue<Worker> deactivateQueue;
protected ArrayCycle<Worker> workerCycle;
protected ExecutorService executorService;
public WorkerPool(int poolSize) {
this.poolSize = poolSize;
activateQueue = new LinkedBlockingQueue<Worker>();
deactivateQueue = new LinkedBlockingQueue<Worker>();
workerCycle = new ArrayCycle<Worker>();
executorService = Executors.newFixedThreadPool(poolSize);
}
public void start() {
for (int i = 0; i < poolSize; ++i) {
Runnable runnable = new WorkerPoolRunnable(activateQueue, deactivateQueue, workerCycle, i + 1);
executorService.execute(runnable);
}
}
public void stop() {
// Must be graceful
executorService.shutdownNow();
}
public void add(PooledWorker worker) {
worker.setActivateQueue(activateQueue);
worker.setDeactivateQueue(deactivateQueue);
}
}

View File

@@ -0,0 +1,41 @@
package base.worker.pooled;
import java.util.concurrent.BlockingQueue;
import base.util.ArrayCycle;
import base.worker.Worker;
public class WorkerPoolRunnable implements Runnable {
protected BlockingQueue<Worker> activateQueue;
protected BlockingQueue<Worker> deactivateQueue;
protected ArrayCycle<Worker> workerCycle;
protected int id;
public WorkerPoolRunnable(BlockingQueue<Worker> activateQueue, BlockingQueue<Worker> deactivateQueue, ArrayCycle<Worker> workerCycle, int id) {
this.activateQueue = activateQueue;
this.deactivateQueue = deactivateQueue;
this.workerCycle = workerCycle;
this.id = id;
}
public void run() {
while (true) {
if (!deactivateQueue.isEmpty()) {
try {
Worker worker = deactivateQueue.take();
worker.runDeactivate();
workerCycle.remove(worker);
} catch (InterruptedException e) {}
} else if (!activateQueue.isEmpty() || workerCycle.isEmpty()) {
try {
Worker worker = activateQueue.take();
worker.runActivate();
workerCycle.add(worker);
} catch (InterruptedException e) {}
} else {
Worker worker = workerCycle.next();
worker.work();
}
}
}
}

View File

@@ -0,0 +1,16 @@
package base.worker.pooled;
class Wrapper<E> {
protected PooledListener<E> listener;
protected E element;
public Wrapper(PooledListener<E> listener, E element) {
this.listener = listener;
this.element = element;
}
public void deliver() {
listener.input(element);
}
}