This commit is contained in:
Jonathan Leibiusky
2010-09-14 14:11:57 -03:00
2 changed files with 363 additions and 339 deletions

View File

@@ -9,14 +9,13 @@ import java.util.concurrent.TimeoutException;
/** /**
* Abstract resource pool of type T. * Abstract resource pool of type T.
* * <p/>
* Needs implementation for creation, validation and destruction of the * Needs implementation for creation, validation and destruction of the
* resources. * resources.
* * <p/>
* Keeps a fixed amount of resources * Keeps a fixed amount of resources
* *
* @author Luis Dario Simonassi * @author Luis Dario Simonassi
*
* @param <T> * @param <T>
* The type of the resource to be managed. * The type of the resource to be managed.
*/ */
@@ -48,6 +47,16 @@ public abstract class FixedResourcePool<T> {
} }
} }
public abstract static class Printer {
public abstract void print(String str);
}
public static class DefaultPrinter extends Printer {
public void print(String str) {
System.out.println(str);
}
}
/** /**
* Generic Repair Thread * Generic Repair Thread
*/ */
@@ -105,7 +114,7 @@ public abstract class FixedResourcePool<T> {
} }
} }
System.out.println("Ending thread [" println("Ending thread ["
+ Thread.currentThread().getName() + "]"); + Thread.currentThread().getName() + "]");
} }
@@ -166,6 +175,8 @@ public abstract class FixedResourcePool<T> {
private volatile long resourcesProvided = 0; private volatile long resourcesProvided = 0;
private volatile long resourcesReturned = 0; private volatile long resourcesReturned = 0;
private Printer printer = new DefaultPrinter();
/* /*
* Pool metrics accessing methods. * Pool metrics accessing methods.
*/ */
@@ -263,18 +274,27 @@ public abstract class FixedResourcePool<T> {
return defaultPoolWait; return defaultPoolWait;
} }
public void setPrinter(Printer printer) {
this.printer = printer;
}
private void println(String str) {
if(printer != null)
printer.print(str);
}
/** /**
* Pool initialization & destruction * Pool initialization & destruction
*/ */
public void destroy() { public void destroy() {
checkInit(); checkInit();
System.out.println("Destroying [" + getName() + "]..."); println("Destroying [" + getName() + "]...");
// Signal al threads to end // Signal al threads to end
finishing = true; finishing = true;
System.out.println("Destroying [" + getName() + "] threads"); println("Destroying [" + getName() + "] threads");
// Wait for the Repair Threas // Wait for the Repair Threas
for (int i = 0; i < repairThreads.length; i++) { for (int i = 0; i < repairThreads.length; i++) {
boolean joined = false; boolean joined = false;
@@ -289,7 +309,7 @@ public abstract class FixedResourcePool<T> {
} while (!joined); } while (!joined);
} }
System.out.println("Waiting for [" + getName() println("Waiting for [" + getName()
+ "] resources to be returned."); + "] resources to be returned.");
// Wait for all resources to be returned to the pool // Wait for all resources to be returned to the pool
synchronized (this) { synchronized (this) {
@@ -302,7 +322,9 @@ public abstract class FixedResourcePool<T> {
} }
} }
System.out.println("Destroying [" + getName() + "] resources."); printStatistics();
println("Destroying [" + getName() + "] resources.");
// Destroy resources // Destroy resources
for (Wrapper<T> resource : availableQueue) { for (Wrapper<T> resource : availableQueue) {
destroyResource(resource.wrapped); destroyResource(resource.wrapped);
@@ -319,7 +341,7 @@ public abstract class FixedResourcePool<T> {
repairQueue = null; repairQueue = null;
// Destroy metrics timer // Destroy metrics timer
System.out.println("Shuting metrics timer for [" + getName() println("Shuting metrics timer for [" + getName()
+ "] down."); + "] down.");
t.cancel(); t.cancel();
t = null; t = null;
@@ -335,7 +357,7 @@ public abstract class FixedResourcePool<T> {
initializated = false; initializated = false;
finishing = false; finishing = false;
System.out.println("Pool [" + getName() + "] successfully destroyed."); println("Pool [" + getName() + "] successfully destroyed.");
} }
/** /**
@@ -344,7 +366,7 @@ public abstract class FixedResourcePool<T> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void init() { public void init() {
if (initializated == true) { if (initializated == true) {
System.err.println("Warning, double initialization of [" + this println("Warning, double initialization of [" + this
+ "]"); + "]");
return; return;
} }
@@ -375,28 +397,25 @@ public abstract class FixedResourcePool<T> {
t.schedule(new TimerTask() { t.schedule(new TimerTask() {
@Override @Override
public void run() { public void run() {
System.out.println("**********************************"); printStatistics();
System.out.println("* Pool name:[" + name + "]");
System.out.println("* resourcesCreated....:"
+ getResourcesCreated());
System.out.println("* failsReported.......:"
+ getFailsReported());
System.out.println("* fails...............:" + getFails());
System.out.println("* resourcesCreated....:"
+ getResourcesCreated());
System.out.println("* resourcesProvided...:"
+ getResourcesProvided());
System.out.println("* resourcesReturned...:"
+ getResourcesReturned());
System.out.println("* available size......:"
+ availableQueue.size());
System.out.println("* repair size.........:"
+ repairQueue.size());
System.out.println("**********************************");
} }
}, 10000, 10000); }, 10000, 10000);
System.out.println("Initialized [" + name + "]"); println("Initialized [" + name + "]");
}
private void printStatistics() {
println("**********************************" +
"\n* Pool name:[" + name + "]" +
"\n* resourcesCreated....:" + getResourcesCreated() +
"\n* failsReported.......:" + getFailsReported() +
"\n* fails...............:" + getFails() +
"\n* resourcesCreated....:" + getResourcesCreated() +
"\n* resourcesProvided...:" + getResourcesProvided() +
"\n* resourcesReturned...:" + getResourcesReturned() +
"\n* available size......:" + availableQueue.size() +
"\n* repair size.........:" + repairQueue.size() +
"\n**********************************");
} }
protected void checkInit() { protected void checkInit() {
@@ -507,8 +526,7 @@ public abstract class FixedResourcePool<T> {
/** /**
* Get a resource from the pool. * Get a resource from the pool.
* *
* @param maxTime * @param maxTime Max time you would like to wait for the resource
* Max time you would like to wait for the resource
* @return * @return
* @throws TimeoutException * @throws TimeoutException
*/ */

View File

@@ -5,6 +5,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPool;
@@ -24,7 +25,7 @@ public class PoolBenchmark {
// withoutPool(); // withoutPool();
withPool(); withPool();
long elapsed = System.currentTimeMillis() - t; long elapsed = System.currentTimeMillis() - t;
System.out.println(((1000 * 3 * TOTAL_OPERATIONS) / elapsed) + " ops"); System.out.println(((1000 * 2 * TOTAL_OPERATIONS) / elapsed) + " ops");
} }
private static void withoutPool() throws InterruptedException { private static void withoutPool() throws InterruptedException {
@@ -60,19 +61,19 @@ public class PoolBenchmark {
private static void withPool() throws InterruptedException { private static void withPool() throws InterruptedException {
final JedisPool pool = new JedisPool("localhost"); final JedisPool pool = new JedisPool("localhost");
pool.setResourcesNumber(1000); pool.setResourcesNumber(50);
pool.setDefaultPoolWait(20); pool.setDefaultPoolWait(1000000);
pool.init(); pool.init();
List<Thread> tds = new ArrayList<Thread>(); List<Thread> tds = new ArrayList<Thread>();
for (int i = 0; i < TOTAL_OPERATIONS; i++) { final AtomicInteger ind = new AtomicInteger();
final String key = "foo" + i; for (int i = 0; i < 50; i++) {
Thread hj = new Thread(new Runnable() { Thread hj = new Thread(new Runnable() {
@Override
public void run() { public void run() {
for(int i = 0; (i = ind.getAndIncrement()) < TOTAL_OPERATIONS; ) {
try { try {
Jedis j = pool.getResource(); Jedis j = pool.getResource();
j.auth("foobared"); final String key = "foo" + i;
j.set(key, key); j.set(key, key);
j.get(key); j.get(key);
pool.returnResource(j); pool.returnResource(j);
@@ -80,10 +81,15 @@ public class PoolBenchmark {
e.printStackTrace(); e.printStackTrace();
} }
} }
}
}); });
tds.add(hj); tds.add(hj);
hj.start(); hj.start();
} }
for(Thread t : tds)
t.join();
pool.destroy(); pool.destroy();
} }
} }