From e6105efd69d097b5901b67c7e0889af1f0908977 Mon Sep 17 00:00:00 2001 From: Jonathan Leibiusky Date: Tue, 7 Sep 2010 16:31:32 -0300 Subject: [PATCH] Removed not useful information from the pool --- .../redis/clients/util/FixedResourcePool.java | 1030 +++++++++-------- .../clients/jedis/tests/JedisPoolTest.java | 12 +- 2 files changed, 547 insertions(+), 495 deletions(-) diff --git a/src/main/java/redis/clients/util/FixedResourcePool.java b/src/main/java/redis/clients/util/FixedResourcePool.java index 3114afe..6a9ee8f 100644 --- a/src/main/java/redis/clients/util/FixedResourcePool.java +++ b/src/main/java/redis/clients/util/FixedResourcePool.java @@ -10,524 +10,570 @@ import java.util.concurrent.TimeoutException; /** * Abstract resource pool of type T. * - * Needs implementation for creation, validation and destruction of the resources. + * Needs implementation for creation, validation and destruction of the + * resources. * * Keeps a fixed amount of resources * * @author Luis Dario Simonassi - * - * @param The type of the resource to be managed. + * + * @param + * The type of the resource to be managed. */ -public abstract class FixedResourcePool { +public abstract class FixedResourcePool { - /* - * Generic Inner Control Classes - * ------- ----- ------- ------- - * * Wrapper - * * RepairThread - */ - - /** - * Generic Resource Wrapper - */ - private static class Wrapper{ - long timestamp; - T wrapped; + /* + * Generic Inner Control Classes ------- ----- ------- ------- * Wrapper * + * RepairThread + */ - public Wrapper(T wrapped){ - this.wrapped=wrapped; - mark(); - } - - public void mark(){ - timestamp= System.currentTimeMillis(); - } - - public long getLastMark(){ - return timestamp; - } - } - - /** - * Generic Repair Thread - */ - protected class RepairThread extends Thread{ - public void run(){ - - // Contribute to the repairing and validation effort until the pool is destroyed (finishig=true) - while(!finishing){ - Wrapper wrapper; - try { - // Remove the oldest element from the repair queue. - wrapper = repairQueue.poll(timeBetweenValidation, TimeUnit.MILLISECONDS); - if(wrapper==null){ - // If I've been waiting too much, i'll check the idle pool if connections need - // validation and move them to the repair queue - checkIdles(); - continue; - } - } catch (InterruptedException e) { - e.printStackTrace(); - continue; - } - - // Now, I have something to repair! - T resource= wrapper.wrapped; - boolean valid= false; - - // Resources are null right after initialization, it means the same as being an invalid resource - if(resource!=null){ - valid= isResourceValid(resource); // Validate the resource. - if(!valid) fails ++; - } + /** + * Generic Resource Wrapper + */ + private static class Wrapper { + long timestamp; + T wrapped; - // If resource is invalid or null, create a new resource and destroy the invalid one. - if(!valid){ - T replace= createResource(); - resourcesCreated++; - wrapper.wrapped= replace; - if(resource!=null) - destroyResource(resource); - } - - // Mark the resource as fresh! - wrapper.mark(); - - // Offer the resource to the available resources pool. - if(!availableQueue.offer(wrapper)){ - System.err.println("This shouldn't happen, offering to available was rejected."); - } - } - - System.out.println("Ending thread ["+Thread.currentThread().getName()+"]"); - } - - /** - * Check if resources in the idle queue needs to be repaired - */ - private void checkIdles() { - // Get a sample without removing it - Wrapper wrapper= availableQueue.peek(); - - // If no available items, nothing to repair. - if(wrapper==null) - return; - - // Check if the sampled resource needs to be repaired - boolean repairNeeded= isValidationNeeded(wrapper); - if(!repairNeeded) - return; - - // Move available resources from the available queue to the repair queue until no repair is needed. - while(repairNeeded){ - - // Get the connection from the available queue and check again. - wrapper= availableQueue.poll(); - - // No resources in the available queue, nothing to do - if(wrapper==null){ - repairNeeded= false; - return ; - } - - // Add the resource to the corresponding queue, depending on weather the resource needs to be repaired or not. - repairNeeded= isValidationNeeded(wrapper); - - if(repairNeeded) { - if(!repairQueue.offer(wrapper)){ - System.err.print("FATAL: This shouldn't happen, offering to repairing was rejected."); - } - }else{ - if(!availableQueue.offer(wrapper)){ - System.err.print("FATAL: This shouldn't happen, offering to available was rejected."); - } - } - } - } + public Wrapper(T wrapped) { + this.wrapped = wrapped; + mark(); } - /* - * Pool metrics - */ - private volatile long failsReported = 0; - private volatile long fails = 0; - private volatile long resourcesCreated = 0; - private volatile long resourcesProvided= 0; - private volatile long resourcesReturned= 0; - - - /* - * Pool metrics accessing methods. - */ - - public long getFailsReported() { - return failsReported; + public void mark() { + timestamp = System.currentTimeMillis(); } - public long getFails() { - return fails; + public long getLastMark() { + return timestamp; } + } - public long getResourcesCreated() { - return resourcesCreated; - } + /** + * Generic Repair Thread + */ + protected class RepairThread extends Thread { + public void run() { - public long getResourcesProvided() { - return resourcesProvided; - } - - public long getResourcesReturned() { - return resourcesReturned; - } - - /* - * Pool status structures - */ - private LinkedBlockingQueue> availableQueue; - private LinkedBlockingQueue> repairQueue; - private HashMap> inUse= new HashMap>(); - private RepairThread[] repairThreads; - private Timer t; - private boolean initializated = false; - private boolean finishing = false; - - - /* - * Pool configuration parameters - */ - private String name; - private long defaultPoolWait=50; - private int resourcesNumber = 10; - private int repairThreadsNumber = 3; - private long timeBetweenValidation = 150000; - - /* - * Bean pool configuration - */ - - public int getResourcesNumber() { - return resourcesNumber; - } - - public void setResourcesNumber(int resourcesNumber) { - this.resourcesNumber = resourcesNumber; - } - - public int getRepairThreadsNumber() { - return repairThreadsNumber; - } - - public void setRepairThreadsNumber(int repairThreadsNumber) { - if(initializated) throw new IllegalStateException("Repair threads should be setted up before init()"); - this.repairThreadsNumber = repairThreadsNumber; - } - - public long getTimeBetweenValidation() { - return timeBetweenValidation; - } - - public void setTimeBetweenValidation(long timeBetweenValidation) { - this.timeBetweenValidation = timeBetweenValidation; - } - - public void setName(String name) { - if(initializated) throw new IllegalStateException("Name should be setted up before init()"); - this.name = name; - } - - public String getName() { - return name; - } - - public void setDefaultPoolWait(long defaultPoolWait) { - this.defaultPoolWait = defaultPoolWait; - } - - public long getDefaultPoolWait() { - return defaultPoolWait; - } - - - - /** - * Pool initialization & destruction - */ - public void destroy() { - checkInit(); - - System.out.println("Destroying ["+getName()+"]..."); - - // Signal al threads to end - finishing=true; - - System.out.println("Destroying ["+getName()+"] threads"); - // Wait for the Repair Threas - for(int i=0; i < repairThreads.length; i++){ - boolean joined= false; - do { - try { - repairThreads[i].interrupt(); - repairThreads[i].join(); - joined= true; - } catch (InterruptedException e) { - e.printStackTrace(); - } - } while(!joined); - } - - System.out.println("Waiting for ["+getName()+"] resources to be returned."); - // Wait for all resources to be returned to the pool - synchronized (this) { - while(!inUse.isEmpty()){ - try { - this.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - System.out.println("Destroying ["+getName()+"] resources."); - // Destroy resources - for (Wrapper resource : availableQueue) { - destroyResource(resource.wrapped); - } - - availableQueue.clear(); - availableQueue= null; - - for (Wrapper resource : repairQueue) { - destroyResource(resource.wrapped); - } - - repairQueue.clear(); - repairQueue= null; - - // Destroy metrics timer - System.out.println("Shuting metrics timer for ["+getName()+"] down."); - t.cancel(); - t=null; - - // Reset metrics - failsReported= 0; - fails= 0; - resourcesCreated= 0; - resourcesProvided= 0; - resourcesReturned= 0; - - // Set states to initial values - initializated= false; - finishing=false; - - System.out.println("Pool ["+getName()+"] successfully destroyed."); - } - - /** - * Initialize the pool - */ - @SuppressWarnings("unchecked") - public void init(){ - if(initializated==true){ - System.err.println("Warning, double initialization of ["+this+"]"); - return; - } - - initializated=true; - - // Create queues with maximum possible capacity - availableQueue= new LinkedBlockingQueue>(resourcesNumber); - repairQueue= new LinkedBlockingQueue>(resourcesNumber); - - // Create and start the repair threads. - repairThreads= new FixedResourcePool.RepairThread[repairThreadsNumber]; - for(int i=0; i < repairThreads.length; i++){ - repairThreads[i]= new RepairThread(); - repairThreads[i].setName("REPAIR["+i+"]:"+name); - repairThreads[i].start(); - } - - // Create resource wrappers with null content. - for(int i=0; i < resourcesNumber; i++){ - if(!repairQueue.offer(new Wrapper(null))) - throw new IllegalStateException("What!? not enough space in the repairQueue to offer the element. This shouldn't happen!"); - } - - // Schedule a status report every 10 seconds. - t= new Timer(); - t.schedule(new TimerTask() { - @Override - public void run() { - System.out.println("**********************************"); - System.out.println("* Pool name:["+name+"]"); - System.out.println("* resourcesCreated....:"+getResourcesCreated()); - System.out.println("* failsReported.......:"+getFailsReported()); - System.out.println("* fails...............:"+getFails()); - System.out.println("* resourcesCreated....:"+getResourcesCreated()); - System.out.println("* resourcesProvided...:"+getResourcesProvided()); - System.out.println("* resourcesReturned...:"+getResourcesReturned()); - System.out.println("* available size......:"+availableQueue.size()); - System.out.println("* repair size.........:"+repairQueue.size()); - System.out.println("**********************************"); - } - }, 10000, 10000); - - System.out.println("Initialized ["+name+"]"); - } - - - protected void checkInit(){ - if(!initializated) throw new IllegalStateException("Call the init() method first!"); - } - - /** - * Returns true if wrapped resource needs validation - * @param wrapper - * @return - */ - private boolean isValidationNeeded(Wrapper wrapper){ - //Add noise to the check times to avoid simultaneous resource checking. - long noisyTimeBetweenCheck= (timeBetweenValidation - (long)((Math.random()-0.5)*(timeBetweenValidation/10))); - - //Check if the resource need to be checked. - return wrapper.getLastMark()+noisyTimeBetweenCheck < System.currentTimeMillis(); - } - - - /** - * Return a resource to the pool. When no longer needed. - * @param resource - */ - public void returnResource(T resource){ - checkInit(); - + // Contribute to the repairing and validation effort until the pool + // is destroyed (finishig=true) + while (!finishing) { Wrapper wrapper; - - if(resource==null) throw new IllegalArgumentException("The resource shouldn't be null."); - - //Delete the resource from the inUse list. - synchronized (inUse) { - wrapper= inUse.remove(resource); + try { + // Remove the oldest element from the repair queue. + wrapper = repairQueue.poll(timeBetweenValidation, + TimeUnit.MILLISECONDS); + if (wrapper == null) { + // If I've been waiting too much, i'll check the idle + // pool if connections need + // validation and move them to the repair queue + checkIdles(); + continue; + } + } catch (InterruptedException e) { + continue; } - - if(wrapper==null) throw new IllegalArgumentException("The resource ["+resource+"] isn't in the busy resources list."); - if(isValidationNeeded(wrapper)){ - if(!repairQueue.offer(wrapper)) throw new IllegalStateException("This shouldn't happen. Offering to repair queue rejected."); - }else{ - if(!availableQueue.offer(wrapper)) throw new IllegalStateException("This shouldn't happen. Offering to available queue rejected."); + // Now, I have something to repair! + T resource = wrapper.wrapped; + boolean valid = false; + + // Resources are null right after initialization, it means the + // same as being an invalid resource + if (resource != null) { + valid = isResourceValid(resource); // Validate the resource. + if (!valid) + fails++; } - resourcesReturned++; - - if(finishing){ - synchronized (this) { - this.notify(); - } + + // If resource is invalid or null, create a new resource and + // destroy the invalid one. + if (!valid) { + T replace = createResource(); + resourcesCreated++; + wrapper.wrapped = replace; + if (resource != null) + destroyResource(resource); } - } - - - /** - * Return a broken resource to the pool. If the application detects a malfunction of the resource. - * This resources will go directly to the repair queue. - * @param resource - */ - public void returnBrokenResource(T resource){ - checkInit(); - Wrapper wrapper; - - //Delete the resource from the inUse list. - synchronized (inUse) { - wrapper= inUse.remove(resource); + // Mark the resource as fresh! + wrapper.mark(); + + // Offer the resource to the available resources pool. + if (!availableQueue.offer(wrapper)) { + System.err + .println("This shouldn't happen, offering to available was rejected."); } - - - if(wrapper==null) throw new IllegalArgumentException("The resource ["+resource+"] isn't in the busy resources list."); - - if(!repairQueue.offer(wrapper)) throw new IllegalStateException("This shouldn't happen. Offering to repair queue rejected."); - resourcesReturned++; + } - if(finishing){ - synchronized (this) { - this.notify(); - } - } - } - - - /** - * Get a resource from the pool waiting the default time. - * {@link #setDefaultPoolWait(long)} - * @return the resource of type T - * @throws TimeoutException - */ - public T getResource() throws TimeoutException{ - return getResource(defaultPoolWait); - } - - /** - * Get a resource from the pool. - * @param maxTime Max time you would like to wait for the resource - * @return - * @throws TimeoutException - */ - public T getResource(long maxTime) throws TimeoutException{ - if(finishing) - throw new IllegalStateException("Pool ["+getName()+"] is currently being destroyed."); - checkInit(); - - final long tInit= System.currentTimeMillis(); - do{ - try { - long timeSpent= System.currentTimeMillis()-tInit; - long timeToSleep= maxTime-timeSpent; - timeToSleep= timeToSleep>0?timeToSleep:0; - if(timeToSleep == 0) throw new TimeoutException(""+timeSpent+">"+maxTime); - Wrapper ret= availableQueue.poll(timeToSleep, TimeUnit.MILLISECONDS); - if(ret!=null){ - synchronized (inUse) { - inUse.put(ret.wrapped, ret); - } - resourcesProvided++; - return ret.wrapped; - } - } catch (InterruptedException e1) { e1.printStackTrace(); } //If the wait gets interrupted, doesn't matter but print it (just in case). - } while(true); - } - - - /* - * Implementation dependent methods. To be implemented. - */ - - - /** - * Create a resource for the pool - */ - protected abstract T createResource(); - - /** - * Check if the resource is still valid. - * @param resource - * @return - */ - protected abstract boolean isResourceValid(T resource); - - /** - * Destroy a resource. - * @param resource - */ - protected abstract void destroyResource(T resource); - - - @Override - public String toString() { - return getName()+"["+super.toString()+"]"; + System.out.println("Ending thread [" + + Thread.currentThread().getName() + "]"); } /** - * Coming features: - * TODO Busy time check. Cron to check when a resource is being taken for a long time. + * Check if resources in the idle queue needs to be repaired */ + private void checkIdles() { + // Get a sample without removing it + Wrapper wrapper = availableQueue.peek(); + + // If no available items, nothing to repair. + if (wrapper == null) + return; + + // Check if the sampled resource needs to be repaired + boolean repairNeeded = isValidationNeeded(wrapper); + if (!repairNeeded) + return; + + // Move available resources from the available queue to the repair + // queue until no repair is needed. + while (repairNeeded) { + + // Get the connection from the available queue and check again. + wrapper = availableQueue.poll(); + + // No resources in the available queue, nothing to do + if (wrapper == null) { + repairNeeded = false; + return; + } + + // Add the resource to the corresponding queue, depending on + // weather the resource needs to be repaired or not. + repairNeeded = isValidationNeeded(wrapper); + + if (repairNeeded) { + if (!repairQueue.offer(wrapper)) { + System.err + .print("FATAL: This shouldn't happen, offering to repairing was rejected."); + } + } else { + if (!availableQueue.offer(wrapper)) { + System.err + .print("FATAL: This shouldn't happen, offering to available was rejected."); + } + } + } + } + } + + /* + * Pool metrics + */ + private volatile long failsReported = 0; + private volatile long fails = 0; + private volatile long resourcesCreated = 0; + private volatile long resourcesProvided = 0; + private volatile long resourcesReturned = 0; + + /* + * Pool metrics accessing methods. + */ + + public long getFailsReported() { + return failsReported; + } + + public long getFails() { + return fails; + } + + public long getResourcesCreated() { + return resourcesCreated; + } + + public long getResourcesProvided() { + return resourcesProvided; + } + + public long getResourcesReturned() { + return resourcesReturned; + } + + /* + * Pool status structures + */ + private LinkedBlockingQueue> availableQueue; + private LinkedBlockingQueue> repairQueue; + private HashMap> inUse = new HashMap>(); + private RepairThread[] repairThreads; + private Timer t; + private boolean initializated = false; + private boolean finishing = false; + + /* + * Pool configuration parameters + */ + private String name; + private long defaultPoolWait = 50; + private int resourcesNumber = 10; + private int repairThreadsNumber = 3; + private long timeBetweenValidation = 150000; + + /* + * Bean pool configuration + */ + + public int getResourcesNumber() { + return resourcesNumber; + } + + public void setResourcesNumber(int resourcesNumber) { + this.resourcesNumber = resourcesNumber; + } + + public int getRepairThreadsNumber() { + return repairThreadsNumber; + } + + public void setRepairThreadsNumber(int repairThreadsNumber) { + if (initializated) + throw new IllegalStateException( + "Repair threads should be setted up before init()"); + this.repairThreadsNumber = repairThreadsNumber; + } + + public long getTimeBetweenValidation() { + return timeBetweenValidation; + } + + public void setTimeBetweenValidation(long timeBetweenValidation) { + this.timeBetweenValidation = timeBetweenValidation; + } + + public void setName(String name) { + if (initializated) + throw new IllegalStateException( + "Name should be setted up before init()"); + this.name = name; + } + + public String getName() { + if (name == null || name.isEmpty()) { + name = this.getClass().getName(); + } + return name; + } + + public void setDefaultPoolWait(long defaultPoolWait) { + this.defaultPoolWait = defaultPoolWait; + } + + public long getDefaultPoolWait() { + return defaultPoolWait; + } + + /** + * Pool initialization & destruction + */ + public void destroy() { + checkInit(); + + System.out.println("Destroying [" + getName() + "]..."); + + // Signal al threads to end + finishing = true; + + System.out.println("Destroying [" + getName() + "] threads"); + // Wait for the Repair Threas + for (int i = 0; i < repairThreads.length; i++) { + boolean joined = false; + do { + try { + repairThreads[i].interrupt(); + repairThreads[i].join(); + joined = true; + } catch (InterruptedException e) { + e.printStackTrace(); + } + } while (!joined); + } + + System.out.println("Waiting for [" + getName() + + "] resources to be returned."); + // Wait for all resources to be returned to the pool + synchronized (this) { + while (!inUse.isEmpty()) { + try { + this.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + System.out.println("Destroying [" + getName() + "] resources."); + // Destroy resources + for (Wrapper resource : availableQueue) { + destroyResource(resource.wrapped); + } + + availableQueue.clear(); + availableQueue = null; + + for (Wrapper resource : repairQueue) { + destroyResource(resource.wrapped); + } + + repairQueue.clear(); + repairQueue = null; + + // Destroy metrics timer + System.out.println("Shuting metrics timer for [" + getName() + + "] down."); + t.cancel(); + t = null; + + // Reset metrics + failsReported = 0; + fails = 0; + resourcesCreated = 0; + resourcesProvided = 0; + resourcesReturned = 0; + + // Set states to initial values + initializated = false; + finishing = false; + + System.out.println("Pool [" + getName() + "] successfully destroyed."); + } + + /** + * Initialize the pool + */ + @SuppressWarnings("unchecked") + public void init() { + if (initializated == true) { + System.err.println("Warning, double initialization of [" + this + + "]"); + return; + } + + initializated = true; + + // Create queues with maximum possible capacity + availableQueue = new LinkedBlockingQueue>(resourcesNumber); + repairQueue = new LinkedBlockingQueue>(resourcesNumber); + + // Create and start the repair threads. + repairThreads = new FixedResourcePool.RepairThread[repairThreadsNumber]; + for (int i = 0; i < repairThreads.length; i++) { + repairThreads[i] = new RepairThread(); + repairThreads[i].setName("REPAIR[" + i + "]:" + getName()); + repairThreads[i].start(); + } + + // Create resource wrappers with null content. + for (int i = 0; i < resourcesNumber; i++) { + if (!repairQueue.offer(new Wrapper(null))) + throw new IllegalStateException( + "What!? not enough space in the repairQueue to offer the element. This shouldn't happen!"); + } + + // Schedule a status report every 10 seconds. + t = new Timer(); + t.schedule(new TimerTask() { + @Override + public void run() { + System.out.println("**********************************"); + System.out.println("* Pool name:[" + name + "]"); + System.out.println("* resourcesCreated....:" + + getResourcesCreated()); + System.out.println("* failsReported.......:" + + getFailsReported()); + System.out.println("* fails...............:" + getFails()); + System.out.println("* resourcesCreated....:" + + getResourcesCreated()); + System.out.println("* resourcesProvided...:" + + getResourcesProvided()); + System.out.println("* resourcesReturned...:" + + getResourcesReturned()); + System.out.println("* available size......:" + + availableQueue.size()); + System.out.println("* repair size.........:" + + repairQueue.size()); + System.out.println("**********************************"); + } + }, 10000, 10000); + + System.out.println("Initialized [" + name + "]"); + } + + protected void checkInit() { + if (!initializated) + throw new IllegalStateException("Call the init() method first!"); + } + + /** + * Returns true if wrapped resource needs validation + * + * @param wrapper + * @return + */ + private boolean isValidationNeeded(Wrapper wrapper) { + // Add noise to the check times to avoid simultaneous resource checking. + long noisyTimeBetweenCheck = (timeBetweenValidation - (long) ((Math + .random() - 0.5) * (timeBetweenValidation / 10))); + + // Check if the resource need to be checked. + return wrapper.getLastMark() + noisyTimeBetweenCheck < System + .currentTimeMillis(); + } + + /** + * Return a resource to the pool. When no longer needed. + * + * @param resource + */ + public void returnResource(T resource) { + checkInit(); + + Wrapper wrapper; + + if (resource == null) + throw new IllegalArgumentException( + "The resource shouldn't be null."); + + // Delete the resource from the inUse list. + synchronized (inUse) { + wrapper = inUse.remove(resource); + } + + if (wrapper == null) + throw new IllegalArgumentException("The resource [" + resource + + "] isn't in the busy resources list."); + + if (isValidationNeeded(wrapper)) { + if (!repairQueue.offer(wrapper)) + throw new IllegalStateException( + "This shouldn't happen. Offering to repair queue rejected."); + } else { + if (!availableQueue.offer(wrapper)) + throw new IllegalStateException( + "This shouldn't happen. Offering to available queue rejected."); + } + resourcesReturned++; + + if (finishing) { + synchronized (this) { + this.notify(); + } + } + } + + /** + * Return a broken resource to the pool. If the application detects a + * malfunction of the resource. This resources will go directly to the + * repair queue. + * + * @param resource + */ + public void returnBrokenResource(T resource) { + checkInit(); + Wrapper wrapper; + + // Delete the resource from the inUse list. + synchronized (inUse) { + wrapper = inUse.remove(resource); + } + + if (wrapper == null) + throw new IllegalArgumentException("The resource [" + resource + + "] isn't in the busy resources list."); + + if (!repairQueue.offer(wrapper)) + throw new IllegalStateException( + "This shouldn't happen. Offering to repair queue rejected."); + resourcesReturned++; + + if (finishing) { + synchronized (this) { + this.notify(); + } + } + } + + /** + * Get a resource from the pool waiting the default time. + * {@link #setDefaultPoolWait(long)} + * + * @return the resource of type T + * @throws TimeoutException + */ + public T getResource() throws TimeoutException { + return getResource(defaultPoolWait); + } + + /** + * Get a resource from the pool. + * + * @param maxTime + * Max time you would like to wait for the resource + * @return + * @throws TimeoutException + */ + public T getResource(long maxTime) throws TimeoutException { + if (finishing) + throw new IllegalStateException("Pool [" + getName() + + "] is currently being destroyed."); + checkInit(); + + final long tInit = System.currentTimeMillis(); + do { + try { + long timeSpent = System.currentTimeMillis() - tInit; + long timeToSleep = maxTime - timeSpent; + timeToSleep = timeToSleep > 0 ? timeToSleep : 0; + if (timeToSleep == 0) + throw new TimeoutException("" + timeSpent + ">" + maxTime); + Wrapper ret = availableQueue.poll(timeToSleep, + TimeUnit.MILLISECONDS); + if (ret != null) { + synchronized (inUse) { + inUse.put(ret.wrapped, ret); + } + resourcesProvided++; + return ret.wrapped; + } + } catch (InterruptedException e1) { + e1.printStackTrace(); + } // If the wait gets interrupted, doesn't matter but print it (just + // in case). + } while (true); + } + + /* + * Implementation dependent methods. To be implemented. + */ + + /** + * Create a resource for the pool + */ + protected abstract T createResource(); + + /** + * Check if the resource is still valid. + * + * @param resource + * @return + */ + protected abstract boolean isResourceValid(T resource); + + /** + * Destroy a resource. + * + * @param resource + */ + protected abstract void destroyResource(T resource); + + @Override + public String toString() { + return getName() + "[" + super.toString() + "]"; + } + + /** + * Coming features: TODO Busy time check. Cron to check when a resource is + * being taken for a long time. + */ } diff --git a/src/test/java/redis/clients/jedis/tests/JedisPoolTest.java b/src/test/java/redis/clients/jedis/tests/JedisPoolTest.java index 1f5bd17..4444ee6 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisPoolTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisPoolTest.java @@ -22,6 +22,7 @@ public class JedisPoolTest extends Assert { jedis.set("foo", "bar"); assertEquals("bar", jedis.get("foo")); pool.returnResource(jedis); + pool.destroy(); } @Test @@ -35,6 +36,7 @@ public class JedisPoolTest extends Assert { jedis.set("foo", "bar"); assertEquals("bar", jedis.get("foo")); pool.returnResource(jedis); + pool.destroy(); } @Test @@ -51,6 +53,8 @@ public class JedisPoolTest extends Assert { jedis = pool.getResource(200); jedis.auth("foobared"); jedis.incr("foo"); + pool.returnResource(jedis); + pool.destroy(); } @Test @@ -68,6 +72,8 @@ public class JedisPoolTest extends Assert { jedis = pool.getResource(200); jedis.auth("foobared"); jedis.incr("foo"); + pool.returnResource(jedis); + pool.destroy(); } @Test(expected = TimeoutException.class) @@ -80,8 +86,8 @@ public class JedisPoolTest extends Assert { jedis.auth("foobared"); jedis.set("foo", "0"); - jedis = pool.getResource(200); - jedis.auth("foobared"); - jedis.incr("foo"); + Jedis newJedis = pool.getResource(200); + newJedis.auth("foobared"); + newJedis.incr("foo"); } } \ No newline at end of file