From df0200e4c684d6a8f6ed93a1f2e8b2f47eb77f3f Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Sun, 26 Apr 2015 11:35:03 +0100 Subject: [PATCH] Initial implementation of Worker and Listener using Executor framework --- .../main/java/base/worker/AbstractWorker.java | 9 +++ .../java/base/worker/pooled/ListenerPool.java | 38 +++++++++ .../worker/pooled/ListenerPoolRunnable.java | 24 ++++++ .../base/worker/pooled/PooledListener.java | 18 +++++ .../java/base/worker/pooled/PooledWorker.java | 40 ++++++++++ .../worker/pooled/TestPooledListener.java | 36 +++++++++ .../base/worker/pooled/TestPooledWorker.java | 80 +++++++++++++++++++ .../java/base/worker/pooled/WorkerPool.java | 42 ++++++++++ .../worker/pooled/WorkerPoolRunnable.java | 41 ++++++++++ .../main/java/base/worker/pooled/Wrapper.java | 16 ++++ 10 files changed, 344 insertions(+) create mode 100644 java/base/src/main/java/base/worker/AbstractWorker.java create mode 100644 java/base/src/main/java/base/worker/pooled/ListenerPool.java create mode 100644 java/base/src/main/java/base/worker/pooled/ListenerPoolRunnable.java create mode 100644 java/base/src/main/java/base/worker/pooled/PooledListener.java create mode 100644 java/base/src/main/java/base/worker/pooled/PooledWorker.java create mode 100644 java/base/src/main/java/base/worker/pooled/TestPooledListener.java create mode 100644 java/base/src/main/java/base/worker/pooled/TestPooledWorker.java create mode 100644 java/base/src/main/java/base/worker/pooled/WorkerPool.java create mode 100644 java/base/src/main/java/base/worker/pooled/WorkerPoolRunnable.java create mode 100644 java/base/src/main/java/base/worker/pooled/Wrapper.java diff --git a/java/base/src/main/java/base/worker/AbstractWorker.java b/java/base/src/main/java/base/worker/AbstractWorker.java new file mode 100644 index 0000000..0d8f101 --- /dev/null +++ b/java/base/src/main/java/base/worker/AbstractWorker.java @@ -0,0 +1,9 @@ +package base.worker; + +public abstract class AbstractWorker { + public abstract void start(); + + public abstract void stop(); + + protected abstract void work(); +} diff --git a/java/base/src/main/java/base/worker/pooled/ListenerPool.java b/java/base/src/main/java/base/worker/pooled/ListenerPool.java new file mode 100644 index 0000000..998764f --- /dev/null +++ b/java/base/src/main/java/base/worker/pooled/ListenerPool.java @@ -0,0 +1,38 @@ +package base.worker.pooled; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +public class ListenerPool { + protected int poolSize; + protected BlockingQueue> queue; + protected ExecutorService executorService; + + public ListenerPool(int poolSize) { + this.poolSize = poolSize; + queue = new SynchronousQueue>(); + executorService = Executors.newFixedThreadPool(poolSize); + } + + public PooledListener add(PooledListener listener) { + listener.setPoolQueue(queue); + return listener; + } + + public void start() { + for (int i = 0; i < poolSize; ++i) { + Runnable runnable = new ListenerPoolRunnable(queue, i); + executorService.execute(runnable); + } + } + + public void await() { + try { + executorService.awaitTermination(0, TimeUnit.SECONDS); + } catch (InterruptedException e) {} + } + +} diff --git a/java/base/src/main/java/base/worker/pooled/ListenerPoolRunnable.java b/java/base/src/main/java/base/worker/pooled/ListenerPoolRunnable.java new file mode 100644 index 0000000..b4227e3 --- /dev/null +++ b/java/base/src/main/java/base/worker/pooled/ListenerPoolRunnable.java @@ -0,0 +1,24 @@ +package base.worker.pooled; + +import java.util.concurrent.BlockingQueue; + +class ListenerPoolRunnable implements Runnable { + protected BlockingQueue> queue; + protected int id; + + public ListenerPoolRunnable(BlockingQueue> queue, int id) { + this.queue = queue; + this.id = id; + } + + public void run() { + try { + while (true) { + System.out.println("Thread #" + id + " waiting..."); + Wrapper wrapper = queue.take(); + wrapper.deliver(); + Thread.sleep((int) (Math.random() * 1000)); + } + } catch (InterruptedException e) {} + } +} \ No newline at end of file diff --git a/java/base/src/main/java/base/worker/pooled/PooledListener.java b/java/base/src/main/java/base/worker/pooled/PooledListener.java new file mode 100644 index 0000000..db7c379 --- /dev/null +++ b/java/base/src/main/java/base/worker/pooled/PooledListener.java @@ -0,0 +1,18 @@ +package base.worker.pooled; + +import java.util.concurrent.BlockingQueue; + +public abstract class PooledListener { + protected BlockingQueue> poolQueue; + + public void setPoolQueue(BlockingQueue> poolQueue) { + this.poolQueue = poolQueue; + } + + public synchronized void add(E element) { + Wrapper wrapper = new Wrapper(this, element); + poolQueue.add(wrapper); + } + + abstract void input(E element); +} \ No newline at end of file diff --git a/java/base/src/main/java/base/worker/pooled/PooledWorker.java b/java/base/src/main/java/base/worker/pooled/PooledWorker.java new file mode 100644 index 0000000..82a2a50 --- /dev/null +++ b/java/base/src/main/java/base/worker/pooled/PooledWorker.java @@ -0,0 +1,40 @@ +package base.worker.pooled; + +import java.util.concurrent.BlockingQueue; + +import base.worker.Worker; + +public abstract class PooledWorker extends Worker { + protected BlockingQueue activateQueue; + protected BlockingQueue deactivateQueue; + + public void setActivateQueue(BlockingQueue activateQueue) { + this.activateQueue = activateQueue; + } + + public void setDeactivateQueue(BlockingQueue deactivateQueue) { + this.deactivateQueue = deactivateQueue; + } + + public void start() { + if (!active) { + activate = true; + } + if (!run) { + run = true; + } + try { + deactivateQueue.remove(this); + activateQueue.put(this); + } catch (InterruptedException e) {} + } + + public void stop() { + System.out.println("stop!! " + active); + if (active) { + deactivate = true; + } + activateQueue.remove(this); + deactivateQueue.add(this); + } +} diff --git a/java/base/src/main/java/base/worker/pooled/TestPooledListener.java b/java/base/src/main/java/base/worker/pooled/TestPooledListener.java new file mode 100644 index 0000000..d8aa4b7 --- /dev/null +++ b/java/base/src/main/java/base/worker/pooled/TestPooledListener.java @@ -0,0 +1,36 @@ +package base.worker.pooled; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class TestPooledListener extends PooledListener { + protected int id; + + public TestPooledListener(int id) { + this.id = id; + } + + public void input(Integer element) { + System.out.println("#" + id + ": " + element); + } + + public static void main(String[] args) { + ListenerPool listenerPool = new ListenerPool(5); + List> listenerList = new ArrayList>(); + for (int i = 0; i < 20; ++i) { + PooledListener pooledListener = listenerPool.add(new TestPooledListener(i + 1)); + listenerList.add(pooledListener); + } + + listenerPool.start(); + + System.out.println("Starting to give out elements!"); + for (int i = 0; i < 100; ++i) { + PooledListener randomListener = listenerList.get((new Random()).nextInt(listenerList.size())); + randomListener.add(i); + } + + //listenerPool.await(); + } +} diff --git a/java/base/src/main/java/base/worker/pooled/TestPooledWorker.java b/java/base/src/main/java/base/worker/pooled/TestPooledWorker.java new file mode 100644 index 0000000..36079e4 --- /dev/null +++ b/java/base/src/main/java/base/worker/pooled/TestPooledWorker.java @@ -0,0 +1,80 @@ +package base.worker.pooled; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import base.exception.worker.ActivateException; +import base.exception.worker.DeactivateException; +import base.worker.Worker; + +public class TestPooledWorker extends PooledWorker { + protected int id; + protected volatile int work; + + public TestPooledWorker(int id) { + this.id = id; + } + + protected void setWork(int work) { + System.out.println("#" + id + ", set work @ " + work); + this.work = work; + } + + public void work() { + System.out.println("#" + id + ", work = " + work); + if (--work < 1) { + stop(); + } + sleep(300); + } + + protected void activate() throws ActivateException { + System.out.println("#" + id + ", activating..."); + super.activate(); + } + + protected void deactivate() throws DeactivateException { + super.deactivate(); + System.out.println("#" + id + ", deactivating..."); + } + + public static void main(String[] args) { + WorkerPool workerPool = new WorkerPool(3); + + List workerList = new ArrayList(); + for (int i = 0; i < 10; ++i) { + TestPooledWorker worker = new TestPooledWorker(i + 1); + workerPool.add(worker); + workerList.add(worker); + } + workerPool.start(); + + System.out.println("Starting work!"); + int work = 1000; + ArrayList activeWorkerList = new ArrayList(); + for (int i = 0; i < 8; ++i) { + TestPooledWorker worker = workerList.get((new Random()).nextInt(workerList.size())); + worker.setWork(work); + worker.start(); + activeWorkerList.add(worker); + } + int i = 0; + + + try { + Thread.sleep(2000); + } catch (InterruptedException e) {} + + for (Worker worker : activeWorkerList) { + if (++i > 5) { + break; + } + worker.stop(); + } + try { + Thread.sleep(100000); + } catch (InterruptedException e) {} + System.exit(0); + } +} diff --git a/java/base/src/main/java/base/worker/pooled/WorkerPool.java b/java/base/src/main/java/base/worker/pooled/WorkerPool.java new file mode 100644 index 0000000..dd342d7 --- /dev/null +++ b/java/base/src/main/java/base/worker/pooled/WorkerPool.java @@ -0,0 +1,42 @@ +package base.worker.pooled; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import base.util.ArrayCycle; +import base.worker.Worker; + +public class WorkerPool { + protected int poolSize; + protected BlockingQueue activateQueue; + protected BlockingQueue deactivateQueue; + protected ArrayCycle workerCycle; + protected ExecutorService executorService; + + public WorkerPool(int poolSize) { + this.poolSize = poolSize; + activateQueue = new LinkedBlockingQueue(); + deactivateQueue = new LinkedBlockingQueue(); + workerCycle = new ArrayCycle(); + executorService = Executors.newFixedThreadPool(poolSize); + } + + public void start() { + for (int i = 0; i < poolSize; ++i) { + Runnable runnable = new WorkerPoolRunnable(activateQueue, deactivateQueue, workerCycle, i + 1); + executorService.execute(runnable); + } + } + + public void stop() { + // Must be graceful + executorService.shutdownNow(); + } + + public void add(PooledWorker worker) { + worker.setActivateQueue(activateQueue); + worker.setDeactivateQueue(deactivateQueue); + } +} diff --git a/java/base/src/main/java/base/worker/pooled/WorkerPoolRunnable.java b/java/base/src/main/java/base/worker/pooled/WorkerPoolRunnable.java new file mode 100644 index 0000000..d442fc2 --- /dev/null +++ b/java/base/src/main/java/base/worker/pooled/WorkerPoolRunnable.java @@ -0,0 +1,41 @@ +package base.worker.pooled; + +import java.util.concurrent.BlockingQueue; + +import base.util.ArrayCycle; +import base.worker.Worker; + +public class WorkerPoolRunnable implements Runnable { + protected BlockingQueue activateQueue; + protected BlockingQueue deactivateQueue; + protected ArrayCycle workerCycle; + protected int id; + + public WorkerPoolRunnable(BlockingQueue activateQueue, BlockingQueue deactivateQueue, ArrayCycle workerCycle, int id) { + this.activateQueue = activateQueue; + this.deactivateQueue = deactivateQueue; + this.workerCycle = workerCycle; + this.id = id; + } + + public void run() { + while (true) { + if (!deactivateQueue.isEmpty()) { + try { + Worker worker = deactivateQueue.take(); + worker.runDeactivate(); + workerCycle.remove(worker); + } catch (InterruptedException e) {} + } else if (!activateQueue.isEmpty() || workerCycle.isEmpty()) { + try { + Worker worker = activateQueue.take(); + worker.runActivate(); + workerCycle.add(worker); + } catch (InterruptedException e) {} + } else { + Worker worker = workerCycle.next(); + worker.work(); + } + } + } +} diff --git a/java/base/src/main/java/base/worker/pooled/Wrapper.java b/java/base/src/main/java/base/worker/pooled/Wrapper.java new file mode 100644 index 0000000..f447bd6 --- /dev/null +++ b/java/base/src/main/java/base/worker/pooled/Wrapper.java @@ -0,0 +1,16 @@ +package base.worker.pooled; + + +class Wrapper { + protected PooledListener listener; + protected E element; + + public Wrapper(PooledListener listener, E element) { + this.listener = listener; + this.element = element; + } + + public void deliver() { + listener.input(element); + } +} \ No newline at end of file