diff --git a/src/main/java/redis/clients/util/FixedResourcePool.java b/src/main/java/redis/clients/util/FixedResourcePool.java index b902fbd..703dcbc 100644 --- a/src/main/java/redis/clients/util/FixedResourcePool.java +++ b/src/main/java/redis/clients/util/FixedResourcePool.java @@ -9,16 +9,15 @@ import java.util.concurrent.TimeoutException; /** * Abstract resource pool of type T. - * + *

* Needs implementation for creation, validation and destruction of the * resources. - * + *

* Keeps a fixed amount of resources - * + * * @author Luis Dario Simonassi - * * @param - * The type of the resource to be managed. + * The type of the resource to be managed. */ public abstract class FixedResourcePool { @@ -31,130 +30,140 @@ public abstract class FixedResourcePool { * Generic Resource Wrapper */ private static class Wrapper { - long timestamp; - T wrapped; + long timestamp; + T wrapped; - public Wrapper(T wrapped) { - this.wrapped = wrapped; - mark(); - } + public Wrapper(T wrapped) { + this.wrapped = wrapped; + mark(); + } - public void mark() { - timestamp = System.currentTimeMillis(); - } + public void mark() { + timestamp = System.currentTimeMillis(); + } - public long getLastMark() { - return timestamp; - } + public long getLastMark() { + return timestamp; + } + } + + public abstract static class Printer { + public abstract void print(String str); + } + + public static class DefaultPrinter extends Printer { + public void print(String str) { + System.out.println(str); + } } /** * Generic Repair Thread */ protected class RepairThread extends Thread { - public void run() { + public void run() { - // Contribute to the repairing and validation effort until the pool - // is destroyed (finishig=true) - while (!finishing) { - Wrapper wrapper; - try { - // Remove the oldest element from the repair queue. - wrapper = repairQueue.poll(timeBetweenValidation, - TimeUnit.MILLISECONDS); - if (wrapper == null) { - // If I've been waiting too much, i'll check the idle - // pool if connections need - // validation and move them to the repair queue - checkIdles(); - continue; - } - } catch (InterruptedException e) { - continue; - } + // Contribute to the repairing and validation effort until the pool + // is destroyed (finishig=true) + while (!finishing) { + Wrapper wrapper; + try { + // Remove the oldest element from the repair queue. + wrapper = repairQueue.poll(timeBetweenValidation, + TimeUnit.MILLISECONDS); + if (wrapper == null) { + // If I've been waiting too much, i'll check the idle + // pool if connections need + // validation and move them to the repair queue + checkIdles(); + continue; + } + } catch (InterruptedException e) { + continue; + } - // Now, I have something to repair! - T resource = wrapper.wrapped; - boolean valid = false; + // Now, I have something to repair! + T resource = wrapper.wrapped; + boolean valid = false; - // Resources are null right after initialization, it means the - // same as being an invalid resource - if (resource != null) { - valid = isResourceValid(resource); // Validate the resource. - if (!valid) - fails++; - } + // Resources are null right after initialization, it means the + // same as being an invalid resource + if (resource != null) { + valid = isResourceValid(resource); // Validate the resource. + if (!valid) + fails++; + } - // If resource is invalid or null, create a new resource and - // destroy the invalid one. - if (!valid) { - T replace = createResource(); - resourcesCreated++; - wrapper.wrapped = replace; - if (resource != null) - destroyResource(resource); - } + // If resource is invalid or null, create a new resource and + // destroy the invalid one. + if (!valid) { + T replace = createResource(); + resourcesCreated++; + wrapper.wrapped = replace; + if (resource != null) + destroyResource(resource); + } - // Mark the resource as fresh! - wrapper.mark(); + // Mark the resource as fresh! + wrapper.mark(); - // Offer the resource to the available resources pool. - if (!availableQueue.offer(wrapper)) { - System.err - .println("This shouldn't happen, offering to available was rejected."); - } - } + // Offer the resource to the available resources pool. + if (!availableQueue.offer(wrapper)) { + System.err + .println("This shouldn't happen, offering to available was rejected."); + } + } - System.out.println("Ending thread [" - + Thread.currentThread().getName() + "]"); - } + println("Ending thread [" + + Thread.currentThread().getName() + "]"); + } - /** - * Check if resources in the idle queue needs to be repaired - */ - private void checkIdles() { - // Get a sample without removing it - Wrapper wrapper = availableQueue.peek(); + /** + * Check if resources in the idle queue needs to be repaired + */ + private void checkIdles() { + // Get a sample without removing it + Wrapper wrapper = availableQueue.peek(); - // If no available items, nothing to repair. - if (wrapper == null) - return; + // If no available items, nothing to repair. + if (wrapper == null) + return; - // Check if the sampled resource needs to be repaired - boolean repairNeeded = isValidationNeeded(wrapper); - if (!repairNeeded) - return; + // Check if the sampled resource needs to be repaired + boolean repairNeeded = isValidationNeeded(wrapper); + if (!repairNeeded) + return; - // Move available resources from the available queue to the repair - // queue until no repair is needed. - while (repairNeeded) { + // Move available resources from the available queue to the repair + // queue until no repair is needed. + while (repairNeeded) { - // Get the connection from the available queue and check again. - wrapper = availableQueue.poll(); + // Get the connection from the available queue and check again. + wrapper = availableQueue.poll(); - // No resources in the available queue, nothing to do - if (wrapper == null) { - repairNeeded = false; - return; - } + // No resources in the available queue, nothing to do + if (wrapper == null) { + repairNeeded = false; + return; + } - // Add the resource to the corresponding queue, depending on - // weather the resource needs to be repaired or not. - repairNeeded = isValidationNeeded(wrapper); + // Add the resource to the corresponding queue, depending on + // weather the resource needs to be repaired or not. + repairNeeded = isValidationNeeded(wrapper); - if (repairNeeded) { - if (!repairQueue.offer(wrapper)) { - System.err - .print("FATAL: This shouldn't happen, offering to repairing was rejected."); - } - } else { - if (!availableQueue.offer(wrapper)) { - System.err - .print("FATAL: This shouldn't happen, offering to available was rejected."); - } - } - } - } + if (repairNeeded) { + if (!repairQueue.offer(wrapper)) { + System.err + .print("FATAL: This shouldn't happen, offering to repairing was rejected."); + } + } else { + if (!availableQueue.offer(wrapper)) { + System.err + .print("FATAL: This shouldn't happen, offering to available was rejected."); + } + } + } + } } /* @@ -166,28 +175,30 @@ public abstract class FixedResourcePool { private volatile long resourcesProvided = 0; private volatile long resourcesReturned = 0; + private Printer printer = new DefaultPrinter(); + /* * Pool metrics accessing methods. */ public long getFailsReported() { - return failsReported; + return failsReported; } public long getFails() { - return fails; + return fails; } public long getResourcesCreated() { - return resourcesCreated; + return resourcesCreated; } public long getResourcesProvided() { - return resourcesProvided; + return resourcesProvided; } public long getResourcesReturned() { - return resourcesReturned; + return resourcesReturned; } /* @@ -215,127 +226,138 @@ public abstract class FixedResourcePool { */ public int getResourcesNumber() { - return resourcesNumber; + return resourcesNumber; } public void setResourcesNumber(int resourcesNumber) { - this.resourcesNumber = resourcesNumber; + this.resourcesNumber = resourcesNumber; } public int getRepairThreadsNumber() { - return repairThreadsNumber; + return repairThreadsNumber; } public void setRepairThreadsNumber(int repairThreadsNumber) { - if (initializated) - throw new IllegalStateException( - "Repair threads should be setted up before init()"); - this.repairThreadsNumber = repairThreadsNumber; + if (initializated) + throw new IllegalStateException( + "Repair threads should be setted up before init()"); + this.repairThreadsNumber = repairThreadsNumber; } public long getTimeBetweenValidation() { - return timeBetweenValidation; + return timeBetweenValidation; } public void setTimeBetweenValidation(long timeBetweenValidation) { - this.timeBetweenValidation = timeBetweenValidation; + this.timeBetweenValidation = timeBetweenValidation; } public void setName(String name) { - if (initializated) - throw new IllegalStateException( - "Name should be setted up before init()"); - this.name = name; + if (initializated) + throw new IllegalStateException( + "Name should be setted up before init()"); + this.name = name; } public String getName() { - if (name == null || name.isEmpty()) { - name = this.getClass().getName(); - } - return name; + if (name == null || name.isEmpty()) { + name = this.getClass().getName(); + } + return name; } public void setDefaultPoolWait(long defaultPoolWait) { - this.defaultPoolWait = defaultPoolWait; + this.defaultPoolWait = defaultPoolWait; } public long getDefaultPoolWait() { - return defaultPoolWait; + return defaultPoolWait; + } + + public void setPrinter(Printer printer) { + this.printer = printer; + } + + private void println(String str) { + if(printer != null) + printer.print(str); } /** * Pool initialization & destruction */ public void destroy() { - checkInit(); + checkInit(); - System.out.println("Destroying [" + getName() + "]..."); + println("Destroying [" + getName() + "]..."); - // Signal al threads to end - finishing = true; + // Signal al threads to end + finishing = true; - System.out.println("Destroying [" + getName() + "] threads"); - // Wait for the Repair Threas - for (int i = 0; i < repairThreads.length; i++) { - boolean joined = false; - do { - try { - repairThreads[i].interrupt(); - repairThreads[i].join(); - joined = true; - } catch (InterruptedException e) { - e.printStackTrace(); - } - } while (!joined); - } + println("Destroying [" + getName() + "] threads"); + // Wait for the Repair Threas + for (int i = 0; i < repairThreads.length; i++) { + boolean joined = false; + do { + try { + repairThreads[i].interrupt(); + repairThreads[i].join(); + joined = true; + } catch (InterruptedException e) { + e.printStackTrace(); + } + } while (!joined); + } - System.out.println("Waiting for [" + getName() - + "] resources to be returned."); - // Wait for all resources to be returned to the pool - synchronized (this) { - while (!inUse.isEmpty()) { - try { - this.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } + println("Waiting for [" + getName() + + "] resources to be returned."); + // Wait for all resources to be returned to the pool + synchronized (this) { + while (!inUse.isEmpty()) { + try { + this.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } - System.out.println("Destroying [" + getName() + "] resources."); - // Destroy resources - for (Wrapper resource : availableQueue) { - destroyResource(resource.wrapped); - } + printStatistics(); - availableQueue.clear(); - availableQueue = null; + println("Destroying [" + getName() + "] resources."); + // Destroy resources + for (Wrapper resource : availableQueue) { + destroyResource(resource.wrapped); + } - for (Wrapper resource : repairQueue) { - destroyResource(resource.wrapped); - } + availableQueue.clear(); + availableQueue = null; - repairQueue.clear(); - repairQueue = null; + for (Wrapper resource : repairQueue) { + destroyResource(resource.wrapped); + } - // Destroy metrics timer - System.out.println("Shuting metrics timer for [" + getName() - + "] down."); - t.cancel(); - t = null; + repairQueue.clear(); + repairQueue = null; - // Reset metrics - failsReported = 0; - fails = 0; - resourcesCreated = 0; - resourcesProvided = 0; - resourcesReturned = 0; + // Destroy metrics timer + println("Shuting metrics timer for [" + getName() + + "] down."); + t.cancel(); + t = null; - // Set states to initial values - initializated = false; - finishing = false; + // Reset metrics + failsReported = 0; + fails = 0; + resourcesCreated = 0; + resourcesProvided = 0; + resourcesReturned = 0; - System.out.println("Pool [" + getName() + "] successfully destroyed."); + // Set states to initial values + initializated = false; + finishing = false; + + println("Pool [" + getName() + "] successfully destroyed."); } /** @@ -343,203 +365,199 @@ public abstract class FixedResourcePool { */ @SuppressWarnings("unchecked") public void init() { - if (initializated == true) { - System.err.println("Warning, double initialization of [" + this - + "]"); - return; - } + if (initializated == true) { + println("Warning, double initialization of [" + this + + "]"); + return; + } - initializated = true; + initializated = true; - // Create queues with maximum possible capacity - availableQueue = new LinkedBlockingQueue>(resourcesNumber); - repairQueue = new LinkedBlockingQueue>(resourcesNumber); + // Create queues with maximum possible capacity + availableQueue = new LinkedBlockingQueue>(resourcesNumber); + repairQueue = new LinkedBlockingQueue>(resourcesNumber); - // Create and start the repair threads. - repairThreads = new FixedResourcePool.RepairThread[repairThreadsNumber]; - for (int i = 0; i < repairThreads.length; i++) { - repairThreads[i] = new RepairThread(); - repairThreads[i].setName("REPAIR[" + i + "]:" + getName()); - repairThreads[i].start(); - } + // Create and start the repair threads. + repairThreads = new FixedResourcePool.RepairThread[repairThreadsNumber]; + for (int i = 0; i < repairThreads.length; i++) { + repairThreads[i] = new RepairThread(); + repairThreads[i].setName("REPAIR[" + i + "]:" + getName()); + repairThreads[i].start(); + } - // Create resource wrappers with null content. - for (int i = 0; i < resourcesNumber; i++) { - if (!repairQueue.offer(new Wrapper(null))) - throw new IllegalStateException( - "What!? not enough space in the repairQueue to offer the element. This shouldn't happen!"); - } + // Create resource wrappers with null content. + for (int i = 0; i < resourcesNumber; i++) { + if (!repairQueue.offer(new Wrapper(null))) + throw new IllegalStateException( + "What!? not enough space in the repairQueue to offer the element. This shouldn't happen!"); + } - // Schedule a status report every 10 seconds. - t = new Timer(); - t.schedule(new TimerTask() { - @Override - public void run() { - System.out.println("**********************************"); - System.out.println("* Pool name:[" + name + "]"); - System.out.println("* resourcesCreated....:" - + getResourcesCreated()); - System.out.println("* failsReported.......:" - + getFailsReported()); - System.out.println("* fails...............:" + getFails()); - System.out.println("* resourcesCreated....:" - + getResourcesCreated()); - System.out.println("* resourcesProvided...:" - + getResourcesProvided()); - System.out.println("* resourcesReturned...:" - + getResourcesReturned()); - System.out.println("* available size......:" - + availableQueue.size()); - System.out.println("* repair size.........:" - + repairQueue.size()); - System.out.println("**********************************"); - } - }, 10000, 10000); + // Schedule a status report every 10 seconds. + t = new Timer(); + t.schedule(new TimerTask() { + @Override + public void run() { + printStatistics(); + } + }, 10000, 10000); - System.out.println("Initialized [" + name + "]"); + println("Initialized [" + name + "]"); + } + + private void printStatistics() { + println("**********************************" + + "\n* Pool name:[" + name + "]" + + "\n* resourcesCreated....:" + getResourcesCreated() + + "\n* failsReported.......:" + getFailsReported() + + "\n* fails...............:" + getFails() + + "\n* resourcesCreated....:" + getResourcesCreated() + + "\n* resourcesProvided...:" + getResourcesProvided() + + "\n* resourcesReturned...:" + getResourcesReturned() + + "\n* available size......:" + availableQueue.size() + + "\n* repair size.........:" + repairQueue.size() + + "\n**********************************"); } protected void checkInit() { - if (!initializated) - throw new IllegalStateException("Call the init() method first!"); + if (!initializated) + throw new IllegalStateException("Call the init() method first!"); } /** * Returns true if wrapped resource needs validation - * + * * @param wrapper * @return */ private boolean isValidationNeeded(Wrapper wrapper) { - // Add noise to the check times to avoid simultaneous resource checking. - long noisyTimeBetweenCheck = (timeBetweenValidation - (long) ((Math - .random() - 0.5) * (timeBetweenValidation / 10))); + // Add noise to the check times to avoid simultaneous resource checking. + long noisyTimeBetweenCheck = (timeBetweenValidation - (long) ((Math + .random() - 0.5) * (timeBetweenValidation / 10))); - // Check if the resource need to be checked. - return wrapper.getLastMark() + noisyTimeBetweenCheck < System - .currentTimeMillis(); + // Check if the resource need to be checked. + return wrapper.getLastMark() + noisyTimeBetweenCheck < System + .currentTimeMillis(); } /** * Return a resource to the pool. When no longer needed. - * + * * @param resource */ public void returnResource(T resource) { - checkInit(); + checkInit(); - Wrapper wrapper; + Wrapper wrapper; - if (resource == null) - throw new IllegalArgumentException( - "The resource shouldn't be null."); + if (resource == null) + throw new IllegalArgumentException( + "The resource shouldn't be null."); - // Delete the resource from the inUse list. - synchronized (inUse) { - wrapper = inUse.remove(resource); - } + // Delete the resource from the inUse list. + synchronized (inUse) { + wrapper = inUse.remove(resource); + } - if (wrapper == null) - throw new IllegalArgumentException("The resource [" + resource - + "] isn't in the busy resources list."); + if (wrapper == null) + throw new IllegalArgumentException("The resource [" + resource + + "] isn't in the busy resources list."); - if (isValidationNeeded(wrapper)) { - if (!repairQueue.offer(wrapper)) - throw new IllegalStateException( - "This shouldn't happen. Offering to repair queue rejected."); - } else { - if (!availableQueue.offer(wrapper)) - throw new IllegalStateException( - "This shouldn't happen. Offering to available queue rejected."); - } - resourcesReturned++; + if (isValidationNeeded(wrapper)) { + if (!repairQueue.offer(wrapper)) + throw new IllegalStateException( + "This shouldn't happen. Offering to repair queue rejected."); + } else { + if (!availableQueue.offer(wrapper)) + throw new IllegalStateException( + "This shouldn't happen. Offering to available queue rejected."); + } + resourcesReturned++; - if (finishing) { - synchronized (this) { - this.notify(); - } - } + if (finishing) { + synchronized (this) { + this.notify(); + } + } } /** * Return a broken resource to the pool. If the application detects a * malfunction of the resource. This resources will go directly to the * repair queue. - * + * * @param resource */ public void returnBrokenResource(T resource) { - checkInit(); - Wrapper wrapper; + checkInit(); + Wrapper wrapper; - // Delete the resource from the inUse list. - synchronized (inUse) { - wrapper = inUse.remove(resource); - } + // Delete the resource from the inUse list. + synchronized (inUse) { + wrapper = inUse.remove(resource); + } - if (wrapper == null) - throw new IllegalArgumentException("The resource [" + resource - + "] isn't in the busy resources list."); + if (wrapper == null) + throw new IllegalArgumentException("The resource [" + resource + + "] isn't in the busy resources list."); - if (!repairQueue.offer(wrapper)) - throw new IllegalStateException( - "This shouldn't happen. Offering to repair queue rejected."); - resourcesReturned++; + if (!repairQueue.offer(wrapper)) + throw new IllegalStateException( + "This shouldn't happen. Offering to repair queue rejected."); + resourcesReturned++; - if (finishing) { - synchronized (this) { - this.notify(); - } - } + if (finishing) { + synchronized (this) { + this.notify(); + } + } } /** * Get a resource from the pool waiting the default time. * {@link #setDefaultPoolWait(long)} - * + * * @return the resource of type T * @throws TimeoutException */ public T getResource() throws TimeoutException { - return getResource(defaultPoolWait); + return getResource(defaultPoolWait); } /** * Get a resource from the pool. - * - * @param maxTime - * Max time you would like to wait for the resource + * + * @param maxTime Max time you would like to wait for the resource * @return * @throws TimeoutException */ public T getResource(long maxTime) throws TimeoutException { - if (finishing) - throw new IllegalStateException("Pool [" + getName() - + "] is currently being destroyed."); - checkInit(); + if (finishing) + throw new IllegalStateException("Pool [" + getName() + + "] is currently being destroyed."); + checkInit(); - final long tInit = System.currentTimeMillis(); - do { - try { - long timeSpent = System.currentTimeMillis() - tInit; - long timeToSleep = maxTime - timeSpent; - timeToSleep = timeToSleep > 0 ? timeToSleep : 0; - if (timeToSleep == 0) - throw new TimeoutException("" + timeSpent + ">" + maxTime); - Wrapper ret = availableQueue.poll(timeToSleep, - TimeUnit.MILLISECONDS); - if (ret != null) { - synchronized (inUse) { - inUse.put(ret.wrapped, ret); - } - resourcesProvided++; - return ret.wrapped; - } - } catch (InterruptedException e1) { - e1.printStackTrace(); - } // If the wait gets interrupted, doesn't matter but print it (just - // in case). - } while (true); + final long tInit = System.currentTimeMillis(); + do { + try { + long timeSpent = System.currentTimeMillis() - tInit; + long timeToSleep = maxTime - timeSpent; + timeToSleep = timeToSleep > 0 ? timeToSleep : 0; + if (timeToSleep == 0) + throw new TimeoutException("" + timeSpent + ">" + maxTime); + Wrapper ret = availableQueue.poll(timeToSleep, + TimeUnit.MILLISECONDS); + if (ret != null) { + synchronized (inUse) { + inUse.put(ret.wrapped, ret); + } + resourcesProvided++; + return ret.wrapped; + } + } catch (InterruptedException e1) { + e1.printStackTrace(); + } // If the wait gets interrupted, doesn't matter but print it (just + // in case). + } while (true); } /* @@ -553,7 +571,7 @@ public abstract class FixedResourcePool { /** * Check if the resource is still valid. - * + * * @param resource * @return */ @@ -561,14 +579,14 @@ public abstract class FixedResourcePool { /** * Destroy a resource. - * + * * @param resource */ protected abstract void destroyResource(T resource); @Override public String toString() { - return getName() + "[" + super.toString() + "]"; + return getName() + "[" + super.toString() + "]"; } /** diff --git a/src/test/java/redis/clients/jedis/tests/benchmark/PoolBenchmark.java b/src/test/java/redis/clients/jedis/tests/benchmark/PoolBenchmark.java index cc665b2..c87d51a 100644 --- a/src/test/java/redis/clients/jedis/tests/benchmark/PoolBenchmark.java +++ b/src/test/java/redis/clients/jedis/tests/benchmark/PoolBenchmark.java @@ -5,6 +5,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; @@ -24,7 +25,7 @@ public class PoolBenchmark { // withoutPool(); withPool(); long elapsed = System.currentTimeMillis() - t; - System.out.println(((1000 * 3 * TOTAL_OPERATIONS) / elapsed) + " ops"); + System.out.println(((1000 * 2 * TOTAL_OPERATIONS) / elapsed) + " ops"); } private static void withoutPool() throws InterruptedException { @@ -60,30 +61,35 @@ public class PoolBenchmark { private static void withPool() throws InterruptedException { final JedisPool pool = new JedisPool("localhost"); - pool.setResourcesNumber(1000); - pool.setDefaultPoolWait(20); + pool.setResourcesNumber(50); + pool.setDefaultPoolWait(1000000); pool.init(); List tds = new ArrayList(); - for (int i = 0; i < TOTAL_OPERATIONS; i++) { - final String key = "foo" + i; + final AtomicInteger ind = new AtomicInteger(); + for (int i = 0; i < 50; i++) { Thread hj = new Thread(new Runnable() { - @Override public void run() { - try { - Jedis j = pool.getResource(); - j.auth("foobared"); - j.set(key, key); - j.get(key); - pool.returnResource(j); - } catch (Exception e) { - e.printStackTrace(); - } + for(int i = 0; (i = ind.getAndIncrement()) < TOTAL_OPERATIONS; ) { + try { + Jedis j = pool.getResource(); + final String key = "foo" + i; + j.set(key, key); + j.get(key); + pool.returnResource(j); + } catch (Exception e) { + e.printStackTrace(); + } + } } }); tds.add(hj); hj.start(); } + + for(Thread t : tds) + t.join(); + pool.destroy(); } } \ No newline at end of file