diff --git a/pom.xml b/pom.xml
index 20e1333..77170b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,13 @@
jar
test
+
+ commons-pool
+ commons-pool
+ 1.5.5
+ jar
+ compile
+
diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java
index 84f9579..486ddcf 100644
--- a/src/main/java/redis/clients/jedis/Connection.java
+++ b/src/main/java/redis/clients/jedis/Connection.java
@@ -19,7 +19,7 @@ public class Connection {
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private int pipelinedCommands = 0;
- private int timeout = 2000;
+ private int timeout = Protocol.DEFAULT_TIMEOUT;
public int getTimeout() {
return timeout;
diff --git a/src/main/java/redis/clients/jedis/JedisPool.java b/src/main/java/redis/clients/jedis/JedisPool.java
index 052410d..d0d4c20 100644
--- a/src/main/java/redis/clients/jedis/JedisPool.java
+++ b/src/main/java/redis/clients/jedis/JedisPool.java
@@ -1,82 +1,111 @@
package redis.clients.jedis;
-import redis.clients.util.FixedResourcePool;
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
-public class JedisPool extends FixedResourcePool {
- private String host;
- private int port;
- private int timeout;
- private String password;
+public class JedisPool {
+ private final GenericObjectPool internalPool;
- public JedisPool(String host) {
- this.host = host;
- this.port = Protocol.DEFAULT_PORT;
- }
+ public JedisPool(final GenericObjectPool.Config poolConfig, final String host) {
+ this(poolConfig, host, Protocol.DEFAULT_PORT, Protocol.DEFAULT_TIMEOUT, null);
+ }
- public JedisPool(String host, int port) {
- this.host = host;
- this.port = port;
- }
+ public JedisPool(final GenericObjectPool.Config poolConfig,final String host, final int port) {
+ this(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, null);
+ }
- public JedisPool(String host, int port, int timeout) {
- this.host = host;
- this.port = port;
- this.timeout = timeout;
- }
+ public JedisPool(final GenericObjectPool.Config poolConfig,final String host, final int port, final int timeout) {
+ this(poolConfig, host, port, timeout, null);
+ }
- public JedisPool(String host, int port, int timeout, String password) {
- this.host = host;
- this.port = port;
- this.timeout = timeout;
- this.password = password;
- }
+ public JedisPool(final GenericObjectPool.Config poolConfig,final String host, final int port, final int timeout, final String password) {
+ final String lhost;
+ final int lport;
+ final int ltimeout;
+ final String lpassword;
+ lhost = host;
+ lport = port;
+ ltimeout = (timeout > 0) ? timeout : Protocol.DEFAULT_TIMEOUT;
+ lpassword = password;
+
+ final JedisFactory factory = new JedisFactory(lhost, lport, ltimeout, lpassword);
+ this.internalPool = new GenericObjectPool(factory, poolConfig);
+ }
- public JedisPool(JedisShardInfo shardInfo) {
- this.host = shardInfo.getHost();
- this.port = shardInfo.getPort();
- this.timeout = shardInfo.getTimeout();
- this.password = shardInfo.getPassword();
- }
+ public JedisPool(final GenericObjectPool.Config poolConfig, final JedisShardInfo shardInfo) {
+ this(poolConfig, shardInfo.getHost(), shardInfo.getPort(), shardInfo.getTimeout(), shardInfo.getPassword());
+ }
+
+ public Jedis getResource() throws Exception {
+ return (Jedis) internalPool.borrowObject();
+ }
+
+ public void returnResource(final Jedis jedis) throws Exception {
+ internalPool.returnObject(jedis);
+ }
- @Override
- protected Jedis createResource() {
- Jedis jedis = new Jedis(this.host, this.port, this.timeout);
- boolean done = false;
- while (!done) {
- try {
- jedis.connect();
- if (password != null) {
- jedis.auth(password);
+ /**
+ * PoolableObjectFactory custom impl.
+ */
+ private static class JedisFactory extends BasePoolableObjectFactory {
+ private final String host;
+ private final int port;
+ private final int timeout;
+ private final String password;
+
+ public JedisFactory(final String host, final int port, final int timeout, final String password) {
+ super();
+ this.host = host;
+ this.port = port;
+ this.timeout = (timeout > 0) ? timeout : -1;
+ this.password = password;
}
- done = true;
- } catch (Exception e) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e1) {
+
+ @Override
+ public Object makeObject() throws Exception {
+ final Jedis jedis;
+ if (timeout > 0) {
+ jedis = new Jedis(this.host, this.port, this.timeout);
+ } else {
+ jedis = new Jedis(this.host, this.port);
+ }
+
+ jedis.connect();
+ if (null != this.password) {
+ jedis.auth(this.password);
+ }
+ return jedis;
}
- }
- }
- return jedis;
- }
- @Override
- protected void destroyResource(Jedis jedis) {
- if (jedis != null && jedis.isConnected()) {
- try {
- jedis.quit();
- jedis.disconnect();
- } catch (Exception e) {
+ @Override
+ public void destroyObject(final Object obj) throws Exception {
+ if(obj instanceof Jedis) {
+ final Jedis jedis = (Jedis) obj;
+ if (jedis.isConnected()) {
+ try {
+ jedis.quit();
+ jedis.disconnect();
+ } catch (Exception e) {
+
+ }
+ }
+ }
+ }
- }
- }
- }
+ @Override
+ public boolean validateObject(final Object obj) {
+ if(obj instanceof Jedis) {
+ final Jedis jedis = (Jedis) obj;
+ try {
+ return jedis.isConnected() && jedis.ping().equals("PONG");
+ } catch (final Exception e) {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
- @Override
- protected boolean isResourceValid(Jedis jedis) {
- try {
- return jedis.isConnected() && jedis.ping().equals("PONG");
- } catch (Exception ex) {
- return false;
+
}
- }
}
diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java
index 00b9cae..7c55fc2 100644
--- a/src/main/java/redis/clients/jedis/Protocol.java
+++ b/src/main/java/redis/clients/jedis/Protocol.java
@@ -11,6 +11,7 @@ import redis.clients.util.RedisOutputStream;
public final class Protocol {
public static final int DEFAULT_PORT = 6379;
+ public static final int DEFAULT_TIMEOUT = 2000;
public static final Charset UTF8 = Charset.forName("UTF-8");
diff --git a/src/main/java/redis/clients/util/FixedResourcePool.java b/src/main/java/redis/clients/util/FixedResourcePool.java
deleted file mode 100644
index 7ea9961..0000000
--- a/src/main/java/redis/clients/util/FixedResourcePool.java
+++ /dev/null
@@ -1,647 +0,0 @@
-package redis.clients.util;
-
-import java.util.HashMap;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Abstract resource pool of type T.
- *
- * Needs implementation for creation, validation and destruction of the
- * resources.
- *
- * Keeps a fixed amount of resources
- *
- * @author Luis Dario Simonassi
- *
- * @param
- * The type of the resource to be managed.
- */
-public abstract class FixedResourcePool {
- private static Logger logger = Logger.getLogger(FixedResourcePool.class
- .getName());
-
- /*
- * Generic Inner Control Classes ------- ----- ------- ------- * Wrapper *
- * RepairThread
- */
-
- /**
- * Generic Resource Wrapper
- */
- private static class Wrapper {
- long timestamp;
- T wrapped;
-
- public Wrapper(T wrapped) {
- this.wrapped = wrapped;
- mark();
- }
-
- public void mark() {
- timestamp = System.currentTimeMillis();
- }
-
- public long getLastMark() {
- return timestamp;
- }
- }
-
- /**
- * Generic Repair Thread
- */
- protected class RepairThread extends Thread {
- public void run() {
-
- // Contribute to the repairing and validation effort until the pool
- // is destroyed (finishig=true)
- while (!finishing) {
- Wrapper wrapper;
- try {
- // Remove the oldest element from the repair queue.
- wrapper = repairQueue.poll(timeBetweenValidation,
- TimeUnit.MILLISECONDS);
- if (wrapper == null) {
- // If I've been waiting too much, i'll check the idle
- // pool if connections need
- // validation and move them to the repair queue
- checkIdles();
- continue;
- }
- } catch (InterruptedException e) {
- if (logger.isLoggable(Level.SEVERE)) {
- logger.severe(e.getMessage());
- }
- continue;
- }
-
- // Now, I have something to repair!
- T resource = wrapper.wrapped;
- boolean valid = false;
-
- // Resources are null right after initialization, it means the
- // same as being an invalid resource
- if (resource != null) {
- valid = isResourceValid(resource); // Validate the resource.
- if (!valid)
- metrics.fails++;
- }
-
- // If resource is invalid or null, create a new resource and
- // destroy the invalid one.
- if (!valid) {
- T replace = createResource();
- metrics.resourcesCreated++;
- wrapper.wrapped = replace;
- if (resource != null)
- destroyResource(resource);
- }
-
- // Mark the resource as fresh!
- wrapper.mark();
-
- // Offer the resource to the available resources pool.
- if (!availableQueue.offer(wrapper)) {
- if (logger.isLoggable(Level.SEVERE)) {
- logger
- .severe("This shouldn't happen, offering to available was rejected.");
- }
- }
- }
-
- if (logger.isLoggable(Level.INFO)) {
- logger.info("Ending thread ["
- + Thread.currentThread().getName() + "]");
- }
- }
-
- /**
- * Check if resources in the idle queue needs to be repaired
- */
- private void checkIdles() {
- // Get a sample without removing it
- Wrapper wrapper = availableQueue.peek();
-
- // If no available items, nothing to repair.
- if (wrapper == null)
- return;
-
- // Check if the sampled resource needs to be repaired
- boolean repairNeeded = isValidationNeeded(wrapper);
- if (!repairNeeded)
- return;
-
- // Move available resources from the available queue to the repair
- // queue until no repair is needed.
- while (repairNeeded) {
-
- // Get the connection from the available queue and check again.
- wrapper = availableQueue.poll();
-
- // No resources in the available queue, nothing to do
- if (wrapper == null) {
- repairNeeded = false;
- return;
- }
-
- // Add the resource to the corresponding queue, depending on
- // weather the resource needs to be repaired or not.
- repairNeeded = isValidationNeeded(wrapper);
-
- if (repairNeeded) {
- if (!repairQueue.offer(wrapper)) {
- if (logger.isLoggable(Level.SEVERE)) {
- logger
- .severe("FATAL: This shouldn't happen, offering to repairing was rejected.");
- }
- }
- } else {
- if (!availableQueue.offer(wrapper)) {
- if (logger.isLoggable(Level.SEVERE)) {
- logger
- .severe("FATAL: This shouldn't happen, offering to available was rejected.");
- }
- }
- }
- }
- }
- }
-
- /*
- * Pool metrics
- */
- Metrics metrics;
-
- public static class Metrics {
- private volatile long failsReported = 0;
- private volatile long fails = 0;
- private volatile long resourcesCreated = 0;
- private volatile long resourcesProvided = 0;
- private volatile long resourcesReturned = 0;
-
- /*
- * Pool metrics accessing methods.
- */
-
- public long getFailsReported() {
- return failsReported;
- }
-
- public long getFails() {
- return fails;
- }
-
- public long getResourcesCreated() {
- return resourcesCreated;
- }
-
- public long getResourcesProvided() {
- return resourcesProvided;
- }
-
- public long getResourcesReturned() {
- return resourcesReturned;
- }
- }
-
- /*
- * Pool status structures
- */
- private LinkedBlockingQueue> availableQueue;
- private LinkedBlockingQueue> repairQueue;
- private HashMap> inUse = new HashMap>();
- private RepairThread[] repairThreads;
- private Timer t;
- private boolean initializated = false;
- private boolean finishing = false;
-
- /*
- * Pool configuration parameters
- */
- private String name;
- private long defaultPoolWait = 50;
- private int resourcesNumber = 10;
- private int repairThreadsNumber = 3;
- private long timeBetweenValidation = 150000;
- private long destructionWait = 10000;
-
- /*
- * Bean pool configuration
- */
-
- public int getResourcesNumber() {
- return resourcesNumber;
- }
-
- public void setResourcesNumber(int resourcesNumber) {
- this.resourcesNumber = resourcesNumber;
- }
-
- public int getRepairThreadsNumber() {
- return repairThreadsNumber;
- }
-
- public void setRepairThreadsNumber(int repairThreadsNumber) {
- if (initializated)
- throw new IllegalStateException(
- "Repair threads should be setted up before init()");
- this.repairThreadsNumber = repairThreadsNumber;
- }
-
- public long getTimeBetweenValidation() {
- return timeBetweenValidation;
- }
-
- public void setTimeBetweenValidation(long timeBetweenValidation) {
- this.timeBetweenValidation = timeBetweenValidation;
- }
-
- public long getDestructionWait() {
- return destructionWait;
- }
-
- public void setDestructionWait(long destructionWait) {
- this.destructionWait = destructionWait;
- }
-
- public void setName(String name) {
- if (initializated)
- throw new IllegalStateException(
- "Name should be setted up before init()");
- this.name = name;
- }
-
- public String getName() {
- if (name == null)
- name = getClass().getName();
- return name;
- }
-
- public void setDefaultPoolWait(long defaultPoolWait) {
- this.defaultPoolWait = defaultPoolWait;
- }
-
- public long getDefaultPoolWait() {
- return defaultPoolWait;
- }
-
- /**
- * Pool initialization & destruction
- */
-
- /**
- * Initialize the pool
- */
- @SuppressWarnings("unchecked")
- public void init() {
- if (initializated == true) {
- if (logger.isLoggable(Level.WARNING)) {
- logger.severe("Warning, double initialization of [" + this
- + "]");
- }
- return;
- }
-
- initializated = true;
- metrics = new Metrics();
-
- // Create queues with maximum possible capacity
- availableQueue = new LinkedBlockingQueue>(resourcesNumber);
- repairQueue = new LinkedBlockingQueue>(resourcesNumber);
-
- // Create and start the repair threads.
- repairThreads = new FixedResourcePool.RepairThread[repairThreadsNumber];
- for (int i = 0; i < repairThreads.length; i++) {
- repairThreads[i] = new RepairThread();
- repairThreads[i].setName("REPAIR[" + i + "]:" + name);
- repairThreads[i].start();
- }
-
- // Create resource wrappers with null content.
- for (int i = 0; i < resourcesNumber; i++) {
- if (!repairQueue.offer(new Wrapper(null)))
- throw new IllegalStateException(
- "What!? not enough space in the repairQueue to offer the element. This shouldn't happen!");
- }
-
- // Schedule a status report every 10 seconds.
- t = new Timer();
- t.schedule(new TimerTask() {
- @Override
- public void run() {
- if (logger.isLoggable(Level.FINE)) {
- StringBuilder sb = new StringBuilder();
- sb.append("**********************************");
- sb.append("* Pool name:[" + name + "]");
- sb.append("* resourcesCreated....:"
- + metrics.getResourcesCreated());
- sb.append("* failsReported.......:"
- + metrics.getFailsReported());
- sb.append("* fails...............:" + metrics.getFails());
- sb.append("* resourcesCreated....:"
- + metrics.getResourcesCreated());
- sb.append("* resourcesProvided...:"
- + metrics.getResourcesProvided());
- sb.append("* resourcesReturned...:"
- + metrics.getResourcesReturned());
- sb
- .append("* available size......:"
- + availableQueue.size());
- sb.append("* repair size.........:" + repairQueue.size());
- sb.append("**********************************");
- logger.fine(sb.toString());
- }
-
- }
- }, 10000, 10000);
-
- if (logger.isLoggable(Level.INFO)) {
- logger.info("Initialized [" + name + "]");
- }
- }
-
- /**
- * Destroy the pool. After being destroyed the pool can be initialized
- * again.
- */
- public void destroy() {
- checkInit();
-
- if (logger.isLoggable(Level.INFO)) {
- logger.info("Destroying [" + getName() + "]...");
- }
-
- // Signal al threads to end
- finishing = true;
-
- if (logger.isLoggable(Level.INFO)) {
- logger.info("Destroying [" + getName() + "] threads");
- }
- // Wait for the Repair Threas
- for (int i = 0; i < repairThreads.length; i++) {
- boolean joined = false;
- do {
- try {
- repairThreads[i].interrupt();
- repairThreads[i].join();
- joined = true;
- } catch (InterruptedException e) {
- if (logger.isLoggable(Level.SEVERE)) {
- logger.severe(e.getMessage());
- }
- }
- } while (!joined);
- }
-
- if (logger.isLoggable(Level.INFO)) {
- logger.info("Waiting at most [" + getDestructionWait()
- + "]ms for [" + getName() + "] resources to be returned.");
- }
- // Wait for all resources to be returned to the pool
- long t1 = System.currentTimeMillis();
- synchronized (this) {
- while (!inUse.isEmpty()) {
- try {
- this.wait(getDestructionWait());
- long t2 = System.currentTimeMillis();
- if ((t2 - t1 > getDestructionWait()) & !inUse.isEmpty()) {
- if (logger.isLoggable(Level.INFO)) {
- logger
- .info("Resource wait timeout. Forcing inUse resources destruction.");
- }
- for (T used : inUse.keySet()) {
- destroyResource(used);
- }
- }
- } catch (InterruptedException e) {
- if (logger.isLoggable(Level.SEVERE)) {
- logger.severe(e.getMessage());
- }
- }
- }
- }
-
- if (logger.isLoggable(Level.INFO)) {
- logger.info("Destroying [" + getName() + "] resources.");
- }
- // Destroy resources
- for (Wrapper resource : availableQueue) {
- destroyResource(resource.wrapped);
- }
-
- availableQueue.clear();
- availableQueue = null;
-
- for (Wrapper resource : repairQueue) {
- destroyResource(resource.wrapped);
- }
-
- repairQueue.clear();
- repairQueue = null;
-
- // Destroy metrics timer
- if (logger.isLoggable(Level.INFO)) {
- logger.info("Shuting metrics timer for [" + getName() + "] down.");
- }
- t.cancel();
- t = null;
-
- // Reset metrics
- metrics = null;
-
- // Set states to initial values
- initializated = false;
- finishing = false;
-
- if (logger.isLoggable(Level.INFO)) {
- logger.info("Pool [" + getName() + "] successfully destroyed.");
- }
- }
-
- protected void checkInit() {
- if (!initializated)
- throw new IllegalStateException("Call the init() method first!");
- }
-
- /**
- * Returns true if wrapped resource needs validation
- *
- * @param wrapper
- * @return
- */
- private boolean isValidationNeeded(Wrapper wrapper) {
- // Add noise to the check times to avoid simultaneous resource checking.
- long noisyTimeBetweenCheck = (timeBetweenValidation - (long) ((Math
- .random() - 0.5) * (timeBetweenValidation / 10)));
-
- // Check if the resource need to be checked.
- return wrapper.getLastMark() + noisyTimeBetweenCheck < System
- .currentTimeMillis();
- }
-
- /**
- * Return a resource to the pool. When no longer needed.
- *
- * @param resource
- */
- public void returnResource(T resource) {
- checkInit();
-
- Wrapper wrapper;
-
- if (resource == null)
- throw new IllegalArgumentException(
- "The resource shouldn't be null.");
-
- // Delete the resource from the inUse list.
- synchronized (inUse) {
- wrapper = inUse.remove(resource);
- }
-
- if (wrapper == null)
- throw new IllegalArgumentException("The resource [" + resource
- + "] isn't in the busy resources list.");
-
- if (isValidationNeeded(wrapper)) {
- if (!repairQueue.offer(wrapper))
- throw new IllegalStateException(
- "This shouldn't happen. Offering to repair queue rejected.");
- } else {
- if (!availableQueue.offer(wrapper))
- throw new IllegalStateException(
- "This shouldn't happen. Offering to available queue rejected.");
- }
- metrics.resourcesReturned++;
-
- if (finishing) {
- synchronized (this) {
- this.notify();
- }
- }
- }
-
- /**
- * Return a broken resource to the pool. If the application detects a
- * malfunction of the resource. This resources will go directly to the
- * repair queue.
- *
- * @param resource
- */
- public void returnBrokenResource(T resource) {
- checkInit();
- Wrapper wrapper;
-
- // Delete the resource from the inUse list.
- synchronized (inUse) {
- wrapper = inUse.remove(resource);
- }
-
- if (wrapper == null)
- throw new IllegalArgumentException("The resource [" + resource
- + "] isn't in the busy resources list.");
-
- if (!repairQueue.offer(wrapper))
- throw new IllegalStateException(
- "This shouldn't happen. Offering to repair queue rejected.");
- metrics.resourcesReturned++;
-
- if (finishing) {
- synchronized (this) {
- this.notify();
- }
- }
- }
-
- /**
- * Get a resource from the pool waiting the default time.
- * {@link #setDefaultPoolWait(long)}
- *
- * @return the resource of type T
- * @throws TimeoutException
- */
- public T getResource() throws TimeoutException {
- return getResource(defaultPoolWait);
- }
-
- /**
- * Get a resource from the pool.
- *
- * @param maxTime
- * Max time you would like to wait for the resource
- * @return
- * @throws TimeoutException
- */
- public T getResource(long maxTime) throws TimeoutException {
- if (finishing)
- throw new IllegalStateException("Pool [" + getName()
- + "] is currently being destroyed.");
- checkInit();
-
- final long tInit = System.currentTimeMillis();
- do {
- try {
- long timeSpent = System.currentTimeMillis() - tInit;
- long timeToSleep = maxTime - timeSpent;
- timeToSleep = timeToSleep > 0 ? timeToSleep : 0;
- if (timeToSleep == 0)
- throw new TimeoutException("" + timeSpent + ">" + maxTime);
- Wrapper ret = availableQueue.poll(timeToSleep,
- TimeUnit.MILLISECONDS);
- if (ret != null) {
- synchronized (inUse) {
- inUse.put(ret.wrapped, ret);
- }
- metrics.resourcesProvided++;
- return ret.wrapped;
- }
- } catch (InterruptedException e1) {
- if (logger.isLoggable(Level.SEVERE)) {
- logger.severe(e1.getMessage());
- }
- } // If the wait gets interrupted, doesn't matter but print it (just
- // in case).
- } while (true);
- }
-
- /*
- * Implementation dependent methods. To be implemented.
- */
-
- /**
- * Create a resource for the pool. This method is always called from the
- * REPAIR threads. If it's impossible to create the resource, this method
- * should wait or hold until resource creation becomes possible.
- */
- protected abstract T createResource();
-
- /**
- * Check if the resource is still valid.
- *
- * @param resource
- * @return
- */
- protected abstract boolean isResourceValid(T resource);
-
- /**
- * Destroy a resource.
- *
- * @param resource
- */
- protected abstract void destroyResource(T resource);
-
- @Override
- public String toString() {
- return getName() + "[" + super.toString() + "]";
- }
-
- /**
- * Coming features: TODO Busy time check. Cron to check when a resource is
- * being taken for a long time.
- */
-
-}
\ No newline at end of file