Snapshot of base part

This commit is contained in:
2016-06-20 11:11:57 +01:00
parent 84f3da2941
commit 1317047268
6 changed files with 365 additions and 377 deletions

View File

@@ -1,108 +1,96 @@
package base.work; package base.work;
import java.lang.invoke.MethodHandle; import java.util.Queue;
import java.lang.invoke.MethodHandles; import java.util.concurrent.ConcurrentLinkedQueue;
import java.lang.invoke.MethodType;
import java.util.Queue; import base.exception.worker.ActivateException;
import java.util.concurrent.ConcurrentLinkedQueue; import base.worker.BackgroundListener;
import base.worker.ForegroundListener;
import base.exception.worker.ActivateException; import base.worker.Worker;
import base.worker.ForegroundListener; import base.worker.pool.Listener;
import base.worker.BackgroundListener; import base.worker.pool.ListenerPool;
import base.worker.Worker; import base.worker.pool.PooledListener;
import base.worker.pool.Listener;
import base.worker.pool.ListenerPool; public abstract class Listen<E> extends Work implements Listener<E> {
import base.worker.pool.PooledListener; protected static final Worker.Type WORKER_TYPE = Worker.Type.DIRECT;
public abstract class Listen<E> extends Work implements Listener<E> { protected Listener<E> listener;
protected static final Worker.Type WORKER_TYPE = Worker.Type.DIRECT; protected Worker.Type workerType;
public Queue<E> queue;
protected Listener<E> listener;
protected Worker.Type workerType; public Listen() {
public Queue<E> queue; this(WORKER_TYPE);
}
public Listen() {
this(WORKER_TYPE); protected Listen(Worker.Type workerType) {
} queue = new ConcurrentLinkedQueue<E>();
this.workerType = workerType;
protected Listen(Worker.Type workerType) { switch (workerType) {
this.workerType = workerType; case DIRECT:
switch (workerType) { return;
case DIRECT: case FOREGROUND:
return; listener = new ForegroundListener<E>(this);
case FOREGROUND: break;
listener = new ForegroundListener<E>(this); default:
break; listener = new BackgroundListener<E>(this);
default: break;
listener = new BackgroundListener<E>(this); }
break; }
}
queue = new ConcurrentLinkedQueue<E>(); protected Listen(Worker worker) {
} this.worker = worker;
queue = new ConcurrentLinkedQueue<E>();
protected Listen(Worker worker) { }
this.worker = worker;
queue = new ConcurrentLinkedQueue<E>(); protected Listen(ListenerPool<E> listenerPool) {
} listener = new PooledListener<E>(this);
listenerPool.add((PooledListener<E>) listener);
protected Listen(ListenerPool<E> listenerPool) { queue = new ConcurrentLinkedQueue<E>();
listener = new PooledListener<E>(this); }
listenerPool.add((PooledListener<E>) listener);
queue = new ConcurrentLinkedQueue<E>(); public synchronized void add(E element) {
} if (workerType.equals(Worker.Type.DIRECT)) {
input(element);
public synchronized void add(E element) { } else {
if (workerType.equals(Worker.Type.DIRECT)) { queue.add(element);
input(element); listener.add(element);
} else { }
listener.add(element); }
}
} public void start() {
if (workerType.equals(Worker.Type.DIRECT)) {
public void start() { try {
if (workerType.equals(Worker.Type.DIRECT)) { activate();
try { } catch (ActivateException e) {
activate(); logger.error("Failed to start directly", e);
} catch (ActivateException e) { }
logger.error("Failed to start directly", e); } else {
} super.start();
} else { }
super.start(); }
}
} public void stop() {
super.stop();
public void stop() { synchronized (this) {
super.stop(); notifyAll();
synchronized (this) { }
notifyAll(); }
}
} public void work() {
while (!queue.isEmpty()) {
public void work() { logger.debug("Listen: work() > input");
while (!queue.isEmpty()) { input(queue.poll());
logger.debug("Listen: work() > input"); }
input(queue.poll()); synchronized (this) {
} logger.debug("Listen: work() > wait");
synchronized (this) { try {
logger.debug("Listen: work() > wait"); wait();
try { } catch (InterruptedException e) {}
wait(); logger.debug("Listen: work() > notified");
} catch (InterruptedException e) {} }
logger.debug("Listen: work() > notified"); }
}
} public void input(E element) {
System.err.println(element);
public void input(Object object) { }
MethodType methodType = MethodType.methodType(void.class, object.getClass()); }
MethodHandles.Lookup lookup = MethodHandles.lookup();
MethodHandle methodHandle;
try {
methodHandle = lookup.findVirtual(getClass(), "input", methodType);
methodHandle.invoke(this, object);
} catch (Exception e) {
logger.error("", e);
} catch (Throwable e) {
logger.error("", e);
}
}
}

View File

@@ -1,73 +1,73 @@
package base.work; package base.work;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import base.Control; import base.Control;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException; import base.exception.worker.DeactivateException;
import base.worker.DirectWorker; import base.worker.DirectWorker;
import base.worker.ThreadWorker; import base.worker.ThreadWorker;
import base.worker.Worker; import base.worker.Worker;
import base.worker.pool.PooledWorker; import base.worker.pool.PooledWorker;
import base.worker.pool.WorkerPool; import base.worker.pool.WorkerPool;
public abstract class Work implements Control { public abstract class Work implements Control {
protected static final Worker.Type WORKER_TYPE = Worker.Type.BACKGROUND; protected static final Worker.Type WORKER_TYPE = Worker.Type.BACKGROUND;
protected Logger logger = LoggerFactory.getLogger(getClass()); protected Logger logger = LoggerFactory.getLogger(getClass());
protected Worker worker; protected Worker worker;
protected Work() { protected Work() {
this(WORKER_TYPE); this(WORKER_TYPE);
} }
protected Work(Worker.Type workerType) { protected Work(Worker.Type workerType) {
switch (workerType) { switch (workerType) {
case FOREGROUND: case FOREGROUND:
worker = new DirectWorker(this); worker = new DirectWorker(this);
break; break;
default: default:
worker = new ThreadWorker(this); worker = new ThreadWorker(this);
break; break;
} }
} }
protected Work(Worker worker) { protected Work(Worker worker) {
this.worker = worker; this.worker = worker;
} }
protected Work(WorkerPool workerPool) { protected Work(WorkerPool workerPool) {
worker = new PooledWorker(this); worker = new PooledWorker(this);
workerPool.add((PooledWorker) worker); workerPool.add((PooledWorker) worker);
} }
protected void sleep(int time) { protected void sleep(int time) {
worker.sleep(time); worker.sleep(time);
} }
public void start() { public void start() {
logger.debug("Work: start()"); logger.trace("Work: start()");
worker.start(); worker.start();
} }
public void stop() { public void stop() {
logger.debug("Work: stop()"); logger.trace("Work: stop()");
worker.stop(); worker.stop();
} }
public boolean active() { public boolean active() {
logger.debug("Work: active()"); logger.trace("Work: active()");
return worker.active(); return worker.active();
} }
public void exit() { public void exit() {
logger.debug("Work: exit()"); logger.debug("Work: exit()");
worker.exit(); worker.exit();
} }
public void activate() throws ActivateException {} public void activate() throws ActivateException {}
public void deactivate() throws DeactivateException {} public void deactivate() throws DeactivateException {}
public abstract void work(); public abstract void work();
} }

View File

@@ -1,22 +1,21 @@
package base.worker; package base.worker;
import base.work.Listen; import base.work.Listen;
import base.worker.pool.Listener; import base.worker.pool.Listener;
public class BackgroundListener<E> extends ThreadWorker implements Listener<E> { public class BackgroundListener<E> extends ThreadWorker implements Listener<E> {
protected Listen<E> listen; protected Listen<E> listen;
public BackgroundListener(Listen<E> listen) { public BackgroundListener(Listen<E> listen) {
super(listen); this(listen, true);
this.listen = listen; }
}
public BackgroundListener(Listen<E> listen, boolean thread) {
public BackgroundListener(Listen<E> listen, boolean start) { super(listen, thread);
super(listen); this.listen = listen;
} }
public void add(E element) { public void add(E element) {
listen.queue.add(element); listen.notify();
listen.notify(); }
} }
}

View File

@@ -1,45 +1,45 @@
package base.worker; package base.worker;
import base.work.Work; import base.work.Work;
public class ThreadWorker extends Worker implements Runnable { public class ThreadWorker extends Worker implements Runnable {
protected static final boolean THREAD = true; protected static final boolean THREAD = true;
protected boolean thread = true; protected boolean thread = true;
public ThreadWorker(Work work, boolean thread) { public ThreadWorker(Work work, boolean thread) {
this(work); this(work);
this.thread = thread; this.thread = thread;
} }
public ThreadWorker(Work work) { public ThreadWorker(Work work) {
super(work); super(work);
} }
public synchronized void start(boolean thread) { public synchronized void start(boolean thread) {
if (!active) { if (!active) {
activate = true; activate = true;
} }
if (!run) { if (!run) {
run = true; run = true;
if (thread) { if (thread) {
logger.debug("Start thread"); logger.debug("Start thread");
new Thread(this, work.getClass().getName()).start(); new Thread(this, work.getClass().getName()).start();
} else { } else {
logger.debug("Run directly"); logger.debug("Run directly");
run(); run();
} }
} else { } else {
notifyAll(); notifyAll();
} }
} }
public synchronized void start() { public synchronized void start() {
start(thread); start(thread);
} }
public void exit() { public void exit() {
work.stop(); run = false;
run = false; work.stop();
} }
} }

View File

@@ -1,114 +1,114 @@
package base.worker; package base.worker;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import base.exception.worker.ActivateException; import base.exception.worker.ActivateException;
import base.exception.worker.DeactivateException; import base.exception.worker.DeactivateException;
import base.work.Work; import base.work.Work;
public abstract class Worker { public abstract class Worker {
public enum Type { public enum Type {
DIRECT, FOREGROUND, BACKGROUND, POOLED DIRECT, FOREGROUND, BACKGROUND, POOLED
} }
public static final int SLEEP = 100; public static final int SLEEP = 100;
protected Logger logger; protected Logger logger;
protected boolean run = false; protected boolean run = false;
protected boolean active = false; protected boolean active = false;
protected boolean activate = false; protected boolean activate = false;
protected boolean deactivate = false; protected boolean deactivate = false;
protected Work work; protected Work work;
public Worker(Work work) { public Worker(Work work) {
this.work = work; this.work = work;
logger = LoggerFactory.getLogger(work.getClass()); logger = LoggerFactory.getLogger(work.getClass());
} }
public boolean active() { public boolean active() {
logger.debug("Worker: active()"); logger.trace("Worker: active()");
return deactivate || active; return deactivate || active;
} }
public final void run() { public final void run() {
logger.debug("Worker: run()"); logger.debug("Worker: run()");
while (run || deactivate) { while (run || deactivate) {
runActivate(); runActivate();
runDeactivate(); runDeactivate();
runWork(); runWork();
} }
} }
public void runActivate() { public void runActivate() {
if (activate && !active) { if (activate && !active) {
logger.debug("Worker: runActivate()"); logger.trace("Worker: runActivate()");
try { try {
work.activate(); work.activate();
active = true; active = true;
} catch (ActivateException e) { } catch (ActivateException e) {
logger.error("", e); logger.error("", e);
} finally { } finally {
activate = false; activate = false;
} }
} }
} }
public void runDeactivate() { public void runDeactivate() {
if (deactivate && active) { if (deactivate && active) {
logger.debug("Worker: runDeactivate()"); logger.trace("Worker: runDeactivate()");
try { try {
work.deactivate(); work.deactivate();
} catch (DeactivateException e) { } catch (DeactivateException e) {
logger.error("", e); logger.error("", e);
} finally { } finally {
deactivate = false; deactivate = false;
active = false; active = false;
} }
} }
} }
public void runWork() { public void runWork() {
if (active) { if (active) {
logger.debug("Worker: runWork() > work"); logger.trace("Worker: runWork() > work");
work.work(); work.work();
} else if (run) { } else if (run) {
try { try {
logger.debug("Worker: runWork() > wait"); logger.trace("Worker: runWork() > wait");
synchronized (this) { synchronized (this) {
wait(); wait();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.info("", e); logger.info("", e);
} }
} }
} }
public void sleep() { public void sleep() {
sleep(SLEEP); sleep(SLEEP);
} }
public void sleep(int time) { public void sleep(int time) {
try { try {
if (time > 0) { if (time > 0) {
Thread.sleep(time); Thread.sleep(time);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.info("", e); logger.info("", e);
} }
} }
public abstract void start(); public abstract void start();
public void stop() { public void stop() {
logger.debug("Worker: stop()"); logger.trace("Worker: stop()");
if (active && !activate) { if (active && !activate) {
deactivate = true; deactivate = true;
} }
activate = false; activate = false;
} }
abstract public void exit(); abstract public void exit();
} }

View File

@@ -1,16 +1,17 @@
package junit; package junit;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Suite; import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses; import org.junit.runners.Suite.SuiteClasses;
@RunWith(Suite.class) @RunWith(Suite.class)
@SuiteClasses({ @SuiteClasses({
TestTcpSocketCommunication.class, TestTcpSocketCommunication.class,
TestTcpChannelCommunication.class, TestTcpChannelCommunication.class,
TestUdpUnicastCommunication.class, TestUdpUnicastCommunication.class,
TestUdpMulticastCommunication.class TestUdpMulticastCommunication.class,
}) TestUdpDuplexCommunication.class
})
public class AllTests {}
public class AllTests {}
// Should test start()/stop() of components, check implementation // Should test start()/stop() of components, check implementation