Refactor some code in worker model

This commit is contained in:
2015-06-15 22:51:03 +01:00
parent eeab850a95
commit fadde43e64
8 changed files with 63 additions and 44 deletions

View File

@@ -6,15 +6,19 @@ import java.lang.invoke.MethodType;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import base.worker.DirectListener; import base.exception.worker.ActivateException;
import base.worker.ThreadListener; import base.worker.ForegroundListener;
import base.worker.BackgroundListener;
import base.worker.Worker; import base.worker.Worker;
import base.worker.pool.Listener; import base.worker.pool.Listener;
import base.worker.pool.ListenerPool; import base.worker.pool.ListenerPool;
import base.worker.pool.PooledListener; import base.worker.pool.PooledListener;
public abstract class Listen<E> extends Work { public abstract class Listen<E> extends Work implements Listener<E> {
protected static final Worker.Type WORKER_TYPE = Worker.Type.DIRECT;
protected Listener<E> listener; protected Listener<E> listener;
protected Worker.Type workerType;
public Queue<E> queue; public Queue<E> queue;
public Listen() { public Listen() {
@@ -22,13 +26,15 @@ public abstract class Listen<E> extends Work {
} }
protected Listen(Worker.Type workerType) { protected Listen(Worker.Type workerType) {
this.workerType = workerType;
switch (workerType) { switch (workerType) {
case DIRECT: case DIRECT:
listener = new DirectListener<E>(this); return;
case FOREGROUND:
listener = new ForegroundListener<E>(this);
break; break;
default: default:
case THREAD: listener = new BackgroundListener<E>(this);
listener = new ThreadListener<E>(this);
break; break;
} }
queue = new ConcurrentLinkedQueue<E>(); queue = new ConcurrentLinkedQueue<E>();
@@ -46,10 +52,27 @@ public abstract class Listen<E> extends Work {
} }
public synchronized void add(E element) { public synchronized void add(E element) {
listener.add(element); if (workerType.equals(Worker.Type.DIRECT)) {
input(element);
} else {
listener.add(element);
}
}
public void start() {
if (workerType.equals(Worker.Type.DIRECT)) {
try {
activate();
} catch (ActivateException e) {
logger.error("Failed to start directly", e);
}
} else {
super.start();
}
} }
public void work() { public void work() {
System.err.println(this.getClass().getName());
while (!queue.isEmpty()) { while (!queue.isEmpty()) {
input(queue.poll()); input(queue.poll());
} }
@@ -61,12 +84,15 @@ public abstract class Listen<E> extends Work {
} }
public void input(Object object) { public void input(Object object) {
// This lookup should be cached
MethodType methodType = MethodType.methodType(void.class, object.getClass()); MethodType methodType = MethodType.methodType(void.class, object.getClass());
MethodHandles.Lookup lookup = MethodHandles.lookup(); MethodHandles.Lookup lookup = MethodHandles.lookup();
MethodHandle methodHandle; MethodHandle methodHandle;
try { try {
methodHandle = lookup.findVirtual(getClass(), "input", methodType); methodHandle = lookup.findVirtual(getClass(), "input", methodType);
methodHandle.invoke(this, object); methodHandle.invoke(this, object);
} catch (Exception e) {
logger.error("", e);
} catch (Throwable e) { } catch (Throwable e) {
logger.error("", e); logger.error("", e);
} }

View File

@@ -12,8 +12,8 @@ import base.worker.pool.PooledWorker;
import base.worker.pool.WorkerPool; import base.worker.pool.WorkerPool;
public abstract class Work { public abstract class Work {
protected static final Worker.Type WORKER_TYPE = Worker.Type.THREAD; protected static final Worker.Type WORKER_TYPE = Worker.Type.BACKGROUND;
protected Logger logger = LoggerFactory.getLogger(getClass()); protected Logger logger = LoggerFactory.getLogger(getClass());
protected Worker worker; protected Worker worker;
@@ -24,11 +24,10 @@ public abstract class Work {
protected Work(Worker.Type workerType) { protected Work(Worker.Type workerType) {
switch (workerType) { switch (workerType) {
case DIRECT: case FOREGROUND:
worker = new DirectWorker(this); worker = new DirectWorker(this);
break; break;
default: default:
case THREAD:
worker = new ThreadWorker(this); worker = new ThreadWorker(this);
break; break;
} }
@@ -48,6 +47,7 @@ public abstract class Work {
} }
public void start() { public void start() {
logger.debug("Start work");
worker.start(); worker.start();
} }
@@ -61,7 +61,7 @@ public abstract class Work {
} }
public void exit() { public void exit() {
worker.exit(); worker.exit();
} }
public void activate() throws ActivateException {} public void activate() throws ActivateException {}

View File

@@ -3,15 +3,15 @@ package base.worker;
import base.work.Listen; import base.work.Listen;
import base.worker.pool.Listener; import base.worker.pool.Listener;
public class ThreadListener<E> extends ThreadWorker implements Listener<E> { public class BackgroundListener<E> extends ThreadWorker implements Listener<E> {
protected Listen<E> listen; protected Listen<E> listen;
public ThreadListener(Listen<E> listen) { public BackgroundListener(Listen<E> listen) {
super(listen); super(listen);
this.listen = listen; this.listen = listen;
} }
public ThreadListener(Listen<E> listen, boolean start) { public BackgroundListener(Listen<E> listen, boolean start) {
super(listen); super(listen);
} }

View File

@@ -1,16 +0,0 @@
package base.worker;
import base.work.Listen;
import base.worker.pool.Listener;
public class DirectListener<E> extends ThreadListener<E> implements Listener<E> {
public DirectListener(Listen<E> listen) {
super(listen, false);
}
@Override
public void add(Object element) {
// TODO Auto-generated method stub
}
}

View File

@@ -0,0 +1,10 @@
package base.worker;
import base.work.Listen;
import base.worker.pool.Listener;
public class ForegroundListener<E> extends BackgroundListener<E> implements Listener<E> {
public ForegroundListener(Listen<E> listen) {
super(listen, false);
}
}

View File

@@ -13,11 +13,11 @@ public abstract class IntervalWork extends Work {
protected IntervalWork(Worker.Type workerType) { protected IntervalWork(Worker.Type workerType) {
switch (workerType) { switch (workerType) {
case DIRECT: case FOREGROUND:
worker = new DirectIntervalWorker(this); worker = new DirectIntervalWorker(this);
break; break;
default: default:
case THREAD: case BACKGROUND:
worker = new ThreadIntervalWorker(this); worker = new ThreadIntervalWorker(this);
break; break;
} }
@@ -25,11 +25,11 @@ public abstract class IntervalWork extends Work {
protected IntervalWork(Worker.Type workerType, int interval) { protected IntervalWork(Worker.Type workerType, int interval) {
switch (workerType) { switch (workerType) {
case DIRECT: case FOREGROUND:
worker = new DirectIntervalWorker(this, interval); worker = new DirectIntervalWorker(this, interval);
break; break;
default: default:
case THREAD: case BACKGROUND:
worker = new ThreadIntervalWorker(this, interval); worker = new ThreadIntervalWorker(this, interval);
break; break;
} }

View File

@@ -40,9 +40,6 @@ public class ThreadWorker extends Worker implements Runnable {
public synchronized void stop() { public synchronized void stop() {
super.stop(); super.stop();
if (active) {
deactivate = true;
}
notifyAll(); notifyAll();
} }

View File

@@ -9,7 +9,7 @@ import base.work.Work;
public abstract class Worker { public abstract class Worker {
public enum Type { public enum Type {
DIRECT, THREAD, POOLED DIRECT, FOREGROUND, BACKGROUND, POOLED
} }
public static final int SLEEP = 100; public static final int SLEEP = 100;
@@ -28,10 +28,10 @@ public abstract class Worker {
} }
public boolean active() { public boolean active() {
return active; return deactivate || active;
} }
public void run() { public final void run() {
while (run || deactivate) { while (run || deactivate) {
runActivate(); runActivate();
runDeactivate(); runDeactivate();
@@ -51,7 +51,7 @@ public abstract class Worker {
} }
} }
} }
public void runDeactivate() { public void runDeactivate() {
if (deactivate && active) { if (deactivate && active) {
try { try {
@@ -96,8 +96,10 @@ public abstract class Worker {
public abstract void start(); public abstract void start();
public void stop() { public void stop() {
logger.debug("Stop worker"); if (active && !activate) {
activate = false; deactivate = true;
}
activate = false;
} }
abstract public void exit(); abstract public void exit();