Removed not useful information from the pool
This commit is contained in:
@@ -10,21 +10,21 @@ import java.util.concurrent.TimeoutException;
|
||||
/**
|
||||
* Abstract resource pool of type T.
|
||||
*
|
||||
* Needs implementation for creation, validation and destruction of the resources.
|
||||
* Needs implementation for creation, validation and destruction of the
|
||||
* resources.
|
||||
*
|
||||
* Keeps a fixed amount of resources
|
||||
*
|
||||
* @author Luis Dario Simonassi
|
||||
*
|
||||
* @param <T> The type of the resource to be managed.
|
||||
* @param <T>
|
||||
* The type of the resource to be managed.
|
||||
*/
|
||||
public abstract class FixedResourcePool<T> {
|
||||
|
||||
/*
|
||||
* Generic Inner Control Classes
|
||||
* ------- ----- ------- -------
|
||||
* * Wrapper
|
||||
* * RepairThread
|
||||
* Generic Inner Control Classes ------- ----- ------- ------- * Wrapper *
|
||||
* RepairThread
|
||||
*/
|
||||
|
||||
/**
|
||||
@@ -54,20 +54,22 @@ public abstract class FixedResourcePool <T> {
|
||||
protected class RepairThread extends Thread {
|
||||
public void run() {
|
||||
|
||||
// Contribute to the repairing and validation effort until the pool is destroyed (finishig=true)
|
||||
// Contribute to the repairing and validation effort until the pool
|
||||
// is destroyed (finishig=true)
|
||||
while (!finishing) {
|
||||
Wrapper<T> wrapper;
|
||||
try {
|
||||
// Remove the oldest element from the repair queue.
|
||||
wrapper = repairQueue.poll(timeBetweenValidation, TimeUnit.MILLISECONDS);
|
||||
wrapper = repairQueue.poll(timeBetweenValidation,
|
||||
TimeUnit.MILLISECONDS);
|
||||
if (wrapper == null) {
|
||||
// If I've been waiting too much, i'll check the idle pool if connections need
|
||||
// If I've been waiting too much, i'll check the idle
|
||||
// pool if connections need
|
||||
// validation and move them to the repair queue
|
||||
checkIdles();
|
||||
continue;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -75,13 +77,16 @@ public abstract class FixedResourcePool <T> {
|
||||
T resource = wrapper.wrapped;
|
||||
boolean valid = false;
|
||||
|
||||
// Resources are null right after initialization, it means the same as being an invalid resource
|
||||
// Resources are null right after initialization, it means the
|
||||
// same as being an invalid resource
|
||||
if (resource != null) {
|
||||
valid = isResourceValid(resource); // Validate the resource.
|
||||
if(!valid) fails ++;
|
||||
if (!valid)
|
||||
fails++;
|
||||
}
|
||||
|
||||
// If resource is invalid or null, create a new resource and destroy the invalid one.
|
||||
// If resource is invalid or null, create a new resource and
|
||||
// destroy the invalid one.
|
||||
if (!valid) {
|
||||
T replace = createResource();
|
||||
resourcesCreated++;
|
||||
@@ -95,11 +100,13 @@ public abstract class FixedResourcePool <T> {
|
||||
|
||||
// Offer the resource to the available resources pool.
|
||||
if (!availableQueue.offer(wrapper)) {
|
||||
System.err.println("This shouldn't happen, offering to available was rejected.");
|
||||
System.err
|
||||
.println("This shouldn't happen, offering to available was rejected.");
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("Ending thread ["+Thread.currentThread().getName()+"]");
|
||||
System.out.println("Ending thread ["
|
||||
+ Thread.currentThread().getName() + "]");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -118,7 +125,8 @@ public abstract class FixedResourcePool <T> {
|
||||
if (!repairNeeded)
|
||||
return;
|
||||
|
||||
// Move available resources from the available queue to the repair queue until no repair is needed.
|
||||
// Move available resources from the available queue to the repair
|
||||
// queue until no repair is needed.
|
||||
while (repairNeeded) {
|
||||
|
||||
// Get the connection from the available queue and check again.
|
||||
@@ -130,16 +138,19 @@ public abstract class FixedResourcePool <T> {
|
||||
return;
|
||||
}
|
||||
|
||||
// Add the resource to the corresponding queue, depending on weather the resource needs to be repaired or not.
|
||||
// Add the resource to the corresponding queue, depending on
|
||||
// weather the resource needs to be repaired or not.
|
||||
repairNeeded = isValidationNeeded(wrapper);
|
||||
|
||||
if (repairNeeded) {
|
||||
if (!repairQueue.offer(wrapper)) {
|
||||
System.err.print("FATAL: This shouldn't happen, offering to repairing was rejected.");
|
||||
System.err
|
||||
.print("FATAL: This shouldn't happen, offering to repairing was rejected.");
|
||||
}
|
||||
} else {
|
||||
if (!availableQueue.offer(wrapper)) {
|
||||
System.err.print("FATAL: This shouldn't happen, offering to available was rejected.");
|
||||
System.err
|
||||
.print("FATAL: This shouldn't happen, offering to available was rejected.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -155,7 +166,6 @@ public abstract class FixedResourcePool <T> {
|
||||
private volatile long resourcesProvided = 0;
|
||||
private volatile long resourcesReturned = 0;
|
||||
|
||||
|
||||
/*
|
||||
* Pool metrics accessing methods.
|
||||
*/
|
||||
@@ -191,7 +201,6 @@ public abstract class FixedResourcePool <T> {
|
||||
private boolean initializated = false;
|
||||
private boolean finishing = false;
|
||||
|
||||
|
||||
/*
|
||||
* Pool configuration parameters
|
||||
*/
|
||||
@@ -218,7 +227,9 @@ public abstract class FixedResourcePool <T> {
|
||||
}
|
||||
|
||||
public void setRepairThreadsNumber(int repairThreadsNumber) {
|
||||
if(initializated) throw new IllegalStateException("Repair threads should be setted up before init()");
|
||||
if (initializated)
|
||||
throw new IllegalStateException(
|
||||
"Repair threads should be setted up before init()");
|
||||
this.repairThreadsNumber = repairThreadsNumber;
|
||||
}
|
||||
|
||||
@@ -231,11 +242,16 @@ public abstract class FixedResourcePool <T> {
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
if(initializated) throw new IllegalStateException("Name should be setted up before init()");
|
||||
if (initializated)
|
||||
throw new IllegalStateException(
|
||||
"Name should be setted up before init()");
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
if (name == null || name.isEmpty()) {
|
||||
name = this.getClass().getName();
|
||||
}
|
||||
return name;
|
||||
}
|
||||
|
||||
@@ -247,8 +263,6 @@ public abstract class FixedResourcePool <T> {
|
||||
return defaultPoolWait;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Pool initialization & destruction
|
||||
*/
|
||||
@@ -275,7 +289,8 @@ public abstract class FixedResourcePool <T> {
|
||||
} while (!joined);
|
||||
}
|
||||
|
||||
System.out.println("Waiting for ["+getName()+"] resources to be returned.");
|
||||
System.out.println("Waiting for [" + getName()
|
||||
+ "] resources to be returned.");
|
||||
// Wait for all resources to be returned to the pool
|
||||
synchronized (this) {
|
||||
while (!inUse.isEmpty()) {
|
||||
@@ -304,7 +319,8 @@ public abstract class FixedResourcePool <T> {
|
||||
repairQueue = null;
|
||||
|
||||
// Destroy metrics timer
|
||||
System.out.println("Shuting metrics timer for ["+getName()+"] down.");
|
||||
System.out.println("Shuting metrics timer for [" + getName()
|
||||
+ "] down.");
|
||||
t.cancel();
|
||||
t = null;
|
||||
|
||||
@@ -328,7 +344,8 @@ public abstract class FixedResourcePool <T> {
|
||||
@SuppressWarnings("unchecked")
|
||||
public void init() {
|
||||
if (initializated == true) {
|
||||
System.err.println("Warning, double initialization of ["+this+"]");
|
||||
System.err.println("Warning, double initialization of [" + this
|
||||
+ "]");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -342,14 +359,15 @@ public abstract class FixedResourcePool <T> {
|
||||
repairThreads = new FixedResourcePool.RepairThread[repairThreadsNumber];
|
||||
for (int i = 0; i < repairThreads.length; i++) {
|
||||
repairThreads[i] = new RepairThread();
|
||||
repairThreads[i].setName("REPAIR["+i+"]:"+name);
|
||||
repairThreads[i].setName("REPAIR[" + i + "]:" + getName());
|
||||
repairThreads[i].start();
|
||||
}
|
||||
|
||||
// Create resource wrappers with null content.
|
||||
for (int i = 0; i < resourcesNumber; i++) {
|
||||
if (!repairQueue.offer(new Wrapper<T>(null)))
|
||||
throw new IllegalStateException("What!? not enough space in the repairQueue to offer the element. This shouldn't happen!");
|
||||
throw new IllegalStateException(
|
||||
"What!? not enough space in the repairQueue to offer the element. This shouldn't happen!");
|
||||
}
|
||||
|
||||
// Schedule a status report every 10 seconds.
|
||||
@@ -359,14 +377,21 @@ public abstract class FixedResourcePool <T> {
|
||||
public void run() {
|
||||
System.out.println("**********************************");
|
||||
System.out.println("* Pool name:[" + name + "]");
|
||||
System.out.println("* resourcesCreated....:"+getResourcesCreated());
|
||||
System.out.println("* failsReported.......:"+getFailsReported());
|
||||
System.out.println("* resourcesCreated....:"
|
||||
+ getResourcesCreated());
|
||||
System.out.println("* failsReported.......:"
|
||||
+ getFailsReported());
|
||||
System.out.println("* fails...............:" + getFails());
|
||||
System.out.println("* resourcesCreated....:"+getResourcesCreated());
|
||||
System.out.println("* resourcesProvided...:"+getResourcesProvided());
|
||||
System.out.println("* resourcesReturned...:"+getResourcesReturned());
|
||||
System.out.println("* available size......:"+availableQueue.size());
|
||||
System.out.println("* repair size.........:"+repairQueue.size());
|
||||
System.out.println("* resourcesCreated....:"
|
||||
+ getResourcesCreated());
|
||||
System.out.println("* resourcesProvided...:"
|
||||
+ getResourcesProvided());
|
||||
System.out.println("* resourcesReturned...:"
|
||||
+ getResourcesReturned());
|
||||
System.out.println("* available size......:"
|
||||
+ availableQueue.size());
|
||||
System.out.println("* repair size.........:"
|
||||
+ repairQueue.size());
|
||||
System.out.println("**********************************");
|
||||
}
|
||||
}, 10000, 10000);
|
||||
@@ -374,27 +399,30 @@ public abstract class FixedResourcePool <T> {
|
||||
System.out.println("Initialized [" + name + "]");
|
||||
}
|
||||
|
||||
|
||||
protected void checkInit() {
|
||||
if(!initializated) throw new IllegalStateException("Call the init() method first!");
|
||||
if (!initializated)
|
||||
throw new IllegalStateException("Call the init() method first!");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if wrapped resource needs validation
|
||||
*
|
||||
* @param wrapper
|
||||
* @return
|
||||
*/
|
||||
private boolean isValidationNeeded(Wrapper<T> wrapper) {
|
||||
// Add noise to the check times to avoid simultaneous resource checking.
|
||||
long noisyTimeBetweenCheck= (timeBetweenValidation - (long)((Math.random()-0.5)*(timeBetweenValidation/10)));
|
||||
long noisyTimeBetweenCheck = (timeBetweenValidation - (long) ((Math
|
||||
.random() - 0.5) * (timeBetweenValidation / 10)));
|
||||
|
||||
// Check if the resource need to be checked.
|
||||
return wrapper.getLastMark()+noisyTimeBetweenCheck < System.currentTimeMillis();
|
||||
return wrapper.getLastMark() + noisyTimeBetweenCheck < System
|
||||
.currentTimeMillis();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return a resource to the pool. When no longer needed.
|
||||
*
|
||||
* @param resource
|
||||
*/
|
||||
public void returnResource(T resource) {
|
||||
@@ -402,19 +430,27 @@ public abstract class FixedResourcePool <T> {
|
||||
|
||||
Wrapper<T> wrapper;
|
||||
|
||||
if(resource==null) throw new IllegalArgumentException("The resource shouldn't be null.");
|
||||
if (resource == null)
|
||||
throw new IllegalArgumentException(
|
||||
"The resource shouldn't be null.");
|
||||
|
||||
// Delete the resource from the inUse list.
|
||||
synchronized (inUse) {
|
||||
wrapper = inUse.remove(resource);
|
||||
}
|
||||
|
||||
if(wrapper==null) throw new IllegalArgumentException("The resource ["+resource+"] isn't in the busy resources list.");
|
||||
if (wrapper == null)
|
||||
throw new IllegalArgumentException("The resource [" + resource
|
||||
+ "] isn't in the busy resources list.");
|
||||
|
||||
if (isValidationNeeded(wrapper)) {
|
||||
if(!repairQueue.offer(wrapper)) throw new IllegalStateException("This shouldn't happen. Offering to repair queue rejected.");
|
||||
if (!repairQueue.offer(wrapper))
|
||||
throw new IllegalStateException(
|
||||
"This shouldn't happen. Offering to repair queue rejected.");
|
||||
} else {
|
||||
if(!availableQueue.offer(wrapper)) throw new IllegalStateException("This shouldn't happen. Offering to available queue rejected.");
|
||||
if (!availableQueue.offer(wrapper))
|
||||
throw new IllegalStateException(
|
||||
"This shouldn't happen. Offering to available queue rejected.");
|
||||
}
|
||||
resourcesReturned++;
|
||||
|
||||
@@ -425,11 +461,11 @@ public abstract class FixedResourcePool <T> {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Return a broken resource to the pool. If the application detects a malfunction of the resource.
|
||||
* This resources will go directly to the repair queue.
|
||||
* Return a broken resource to the pool. If the application detects a
|
||||
* malfunction of the resource. This resources will go directly to the
|
||||
* repair queue.
|
||||
*
|
||||
* @param resource
|
||||
*/
|
||||
public void returnBrokenResource(T resource) {
|
||||
@@ -441,10 +477,13 @@ public abstract class FixedResourcePool <T> {
|
||||
wrapper = inUse.remove(resource);
|
||||
}
|
||||
|
||||
if (wrapper == null)
|
||||
throw new IllegalArgumentException("The resource [" + resource
|
||||
+ "] isn't in the busy resources list.");
|
||||
|
||||
if(wrapper==null) throw new IllegalArgumentException("The resource ["+resource+"] isn't in the busy resources list.");
|
||||
|
||||
if(!repairQueue.offer(wrapper)) throw new IllegalStateException("This shouldn't happen. Offering to repair queue rejected.");
|
||||
if (!repairQueue.offer(wrapper))
|
||||
throw new IllegalStateException(
|
||||
"This shouldn't happen. Offering to repair queue rejected.");
|
||||
resourcesReturned++;
|
||||
|
||||
if (finishing) {
|
||||
@@ -454,10 +493,10 @@ public abstract class FixedResourcePool <T> {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get a resource from the pool waiting the default time.
|
||||
* {@link #setDefaultPoolWait(long)}
|
||||
*
|
||||
* @return the resource of type T
|
||||
* @throws TimeoutException
|
||||
*/
|
||||
@@ -467,13 +506,16 @@ public abstract class FixedResourcePool <T> {
|
||||
|
||||
/**
|
||||
* Get a resource from the pool.
|
||||
* @param maxTime Max time you would like to wait for the resource
|
||||
*
|
||||
* @param maxTime
|
||||
* Max time you would like to wait for the resource
|
||||
* @return
|
||||
* @throws TimeoutException
|
||||
*/
|
||||
public T getResource(long maxTime) throws TimeoutException {
|
||||
if (finishing)
|
||||
throw new IllegalStateException("Pool ["+getName()+"] is currently being destroyed.");
|
||||
throw new IllegalStateException("Pool [" + getName()
|
||||
+ "] is currently being destroyed.");
|
||||
checkInit();
|
||||
|
||||
final long tInit = System.currentTimeMillis();
|
||||
@@ -482,8 +524,10 @@ public abstract class FixedResourcePool <T> {
|
||||
long timeSpent = System.currentTimeMillis() - tInit;
|
||||
long timeToSleep = maxTime - timeSpent;
|
||||
timeToSleep = timeToSleep > 0 ? timeToSleep : 0;
|
||||
if(timeToSleep == 0) throw new TimeoutException(""+timeSpent+">"+maxTime);
|
||||
Wrapper<T> ret= availableQueue.poll(timeToSleep, TimeUnit.MILLISECONDS);
|
||||
if (timeToSleep == 0)
|
||||
throw new TimeoutException("" + timeSpent + ">" + maxTime);
|
||||
Wrapper<T> ret = availableQueue.poll(timeToSleep,
|
||||
TimeUnit.MILLISECONDS);
|
||||
if (ret != null) {
|
||||
synchronized (inUse) {
|
||||
inUse.put(ret.wrapped, ret);
|
||||
@@ -491,16 +535,17 @@ public abstract class FixedResourcePool <T> {
|
||||
resourcesProvided++;
|
||||
return ret.wrapped;
|
||||
}
|
||||
} catch (InterruptedException e1) { e1.printStackTrace(); } //If the wait gets interrupted, doesn't matter but print it (just in case).
|
||||
} catch (InterruptedException e1) {
|
||||
e1.printStackTrace();
|
||||
} // If the wait gets interrupted, doesn't matter but print it (just
|
||||
// in case).
|
||||
} while (true);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Implementation dependent methods. To be implemented.
|
||||
*/
|
||||
|
||||
|
||||
/**
|
||||
* Create a resource for the pool
|
||||
*/
|
||||
@@ -508,6 +553,7 @@ public abstract class FixedResourcePool <T> {
|
||||
|
||||
/**
|
||||
* Check if the resource is still valid.
|
||||
*
|
||||
* @param resource
|
||||
* @return
|
||||
*/
|
||||
@@ -515,19 +561,19 @@ public abstract class FixedResourcePool <T> {
|
||||
|
||||
/**
|
||||
* Destroy a resource.
|
||||
*
|
||||
* @param resource
|
||||
*/
|
||||
protected abstract void destroyResource(T resource);
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getName() + "[" + super.toString() + "]";
|
||||
}
|
||||
|
||||
/**
|
||||
* Coming features:
|
||||
* TODO Busy time check. Cron to check when a resource is being taken for a long time.
|
||||
* Coming features: TODO Busy time check. Cron to check when a resource is
|
||||
* being taken for a long time.
|
||||
*/
|
||||
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ public class JedisPoolTest extends Assert {
|
||||
jedis.set("foo", "bar");
|
||||
assertEquals("bar", jedis.get("foo"));
|
||||
pool.returnResource(jedis);
|
||||
pool.destroy();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -35,6 +36,7 @@ public class JedisPoolTest extends Assert {
|
||||
jedis.set("foo", "bar");
|
||||
assertEquals("bar", jedis.get("foo"));
|
||||
pool.returnResource(jedis);
|
||||
pool.destroy();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -51,6 +53,8 @@ public class JedisPoolTest extends Assert {
|
||||
jedis = pool.getResource(200);
|
||||
jedis.auth("foobared");
|
||||
jedis.incr("foo");
|
||||
pool.returnResource(jedis);
|
||||
pool.destroy();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -68,6 +72,8 @@ public class JedisPoolTest extends Assert {
|
||||
jedis = pool.getResource(200);
|
||||
jedis.auth("foobared");
|
||||
jedis.incr("foo");
|
||||
pool.returnResource(jedis);
|
||||
pool.destroy();
|
||||
}
|
||||
|
||||
@Test(expected = TimeoutException.class)
|
||||
@@ -80,8 +86,8 @@ public class JedisPoolTest extends Assert {
|
||||
jedis.auth("foobared");
|
||||
jedis.set("foo", "0");
|
||||
|
||||
jedis = pool.getResource(200);
|
||||
jedis.auth("foobared");
|
||||
jedis.incr("foo");
|
||||
Jedis newJedis = pool.getResource(200);
|
||||
newJedis.auth("foobared");
|
||||
newJedis.incr("foo");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user