replace custom pool implementation with apache's

This commit is contained in:
Jonathan Leibiusky
2010-11-21 18:16:31 -03:00
parent e7582644b1
commit 71eb4c5b4a
6 changed files with 236 additions and 238 deletions

View File

@@ -2,110 +2,95 @@ package redis.clients.jedis;
import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
public class JedisPool { import redis.clients.util.Pool;
private final GenericObjectPool internalPool;
public JedisPool(final GenericObjectPool.Config poolConfig, final String host) { public class JedisPool extends Pool<Jedis> {
this(poolConfig, host, Protocol.DEFAULT_PORT, Protocol.DEFAULT_TIMEOUT, null);
}
public JedisPool(final GenericObjectPool.Config poolConfig,final String host, final int port) { public JedisPool(final GenericObjectPool.Config poolConfig,
this(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, null); final String host) {
} this(poolConfig, host, Protocol.DEFAULT_PORT, Protocol.DEFAULT_TIMEOUT,
null);
}
public JedisPool(final GenericObjectPool.Config poolConfig,final String host, final int port, final int timeout) { public JedisPool(final Config poolConfig, final String host, int port,
this(poolConfig, host, port, timeout, null); int timeout, final String password) {
} super(poolConfig, new JedisFactory(host, port, timeout, password));
}
public JedisPool(final GenericObjectPool.Config poolConfig,final String host, final int port, final int timeout, final String password) { public JedisPool(final GenericObjectPool.Config poolConfig,
final String lhost; final String host, final int port) {
final int lport; this(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, null);
final int ltimeout; }
final String lpassword;
lhost = host;
lport = port;
ltimeout = (timeout > 0) ? timeout : Protocol.DEFAULT_TIMEOUT;
lpassword = password;
final JedisFactory factory = new JedisFactory(lhost, lport, ltimeout, lpassword);
this.internalPool = new GenericObjectPool(factory, poolConfig);
}
public JedisPool(final GenericObjectPool.Config poolConfig, final JedisShardInfo shardInfo) { public JedisPool(final GenericObjectPool.Config poolConfig,
this(poolConfig, shardInfo.getHost(), shardInfo.getPort(), shardInfo.getTimeout(), shardInfo.getPassword()); final String host, final int port, final int timeout) {
} this(poolConfig, host, port, timeout, null);
}
public Jedis getResource() throws Exception {
return (Jedis) internalPool.borrowObject();
}
public void returnResource(final Jedis jedis) throws Exception {
internalPool.returnObject(jedis);
}
/** /**
* PoolableObjectFactory custom impl. * PoolableObjectFactory custom impl.
*/ */
private static class JedisFactory extends BasePoolableObjectFactory { private static class JedisFactory extends BasePoolableObjectFactory {
private final String host; private final String host;
private final int port; private final int port;
private final int timeout; private final int timeout;
private final String password; private final String password;
public JedisFactory(final String host, final int port, final int timeout, final String password) { public JedisFactory(final String host, final int port,
super(); final int timeout, final String password) {
this.host = host; super();
this.port = port; this.host = host;
this.timeout = (timeout > 0) ? timeout : -1; this.port = port;
this.password = password; this.timeout = (timeout > 0) ? timeout : -1;
} this.password = password;
}
@Override @Override
public Object makeObject() throws Exception { public Object makeObject() throws Exception {
final Jedis jedis; final Jedis jedis;
if (timeout > 0) { if (timeout > 0) {
jedis = new Jedis(this.host, this.port, this.timeout); jedis = new Jedis(this.host, this.port, this.timeout);
} else { } else {
jedis = new Jedis(this.host, this.port); jedis = new Jedis(this.host, this.port);
} }
jedis.connect(); jedis.connect();
if (null != this.password) { if (null != this.password) {
jedis.auth(this.password); jedis.auth(this.password);
} }
return jedis; return jedis;
} }
@Override @Override
public void destroyObject(final Object obj) throws Exception { public void destroyObject(final Object obj) throws Exception {
if(obj instanceof Jedis) { if (obj instanceof Jedis) {
final Jedis jedis = (Jedis) obj; final Jedis jedis = (Jedis) obj;
if (jedis.isConnected()) { if (jedis.isConnected()) {
try { try {
jedis.quit(); jedis.quit();
jedis.disconnect(); jedis.disconnect();
} catch (Exception e) { } catch (Exception e) {
}
}
}
}
@Override }
public boolean validateObject(final Object obj) { }
if(obj instanceof Jedis) { }
final Jedis jedis = (Jedis) obj; }
try {
return jedis.isConnected() && jedis.ping().equals("PONG");
} catch (final Exception e) {
return false;
}
} else {
return false;
}
}
@Override
} public boolean validateObject(final Object obj) {
} if (obj instanceof Jedis) {
final Jedis jedis = (Jedis) obj;
try {
return jedis.isConnected() && jedis.ping().equals("PONG");
} catch (final Exception e) {
return false;
}
} else {
return false;
}
}
}
}

View File

@@ -3,80 +3,95 @@ package redis.clients.jedis;
import java.util.List; import java.util.List;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import redis.clients.util.FixedResourcePool; import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import redis.clients.util.Hashing; import redis.clients.util.Hashing;
import redis.clients.util.Pool;
public class ShardedJedisPool extends FixedResourcePool<ShardedJedis> { public class ShardedJedisPool extends Pool<ShardedJedis> {
private List<JedisShardInfo> shards; public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
private Hashing algo = Hashing.MD5; List<JedisShardInfo> shards) {
private Pattern keyTagPattern; this(poolConfig, shards, Hashing.MD5);
public ShardedJedisPool(List<JedisShardInfo> shards) {
this.shards = shards;
} }
public ShardedJedisPool(List<JedisShardInfo> shards, Hashing algo) { public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
this.shards = shards; List<JedisShardInfo> shards, Hashing algo) {
this.algo = algo; this(poolConfig, shards, algo, null);
} }
public ShardedJedisPool(List<JedisShardInfo> shards, Pattern keyTagPattern) { public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
this.shards = shards; List<JedisShardInfo> shards, Pattern keyTagPattern) {
this.keyTagPattern = keyTagPattern; this(poolConfig, shards, Hashing.MD5, keyTagPattern);
} }
public ShardedJedisPool(List<JedisShardInfo> shards, Hashing algo, public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
Pattern keyTagPattern) { List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
this.shards = shards; super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));
this.algo = algo;
this.keyTagPattern = keyTagPattern;
} }
@Override /**
protected ShardedJedis createResource() { * PoolableObjectFactory custom impl.
ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern); */
boolean done = false; private static class ShardedJedisFactory extends BasePoolableObjectFactory {
while (!done) { private List<JedisShardInfo> shards;
try { private Hashing algo;
for (JedisShardInfo shard : jedis.getAllShards()) { private Pattern keyTagPattern;
if (!shard.getResource().isConnected()) {
shard.getResource().connect(); public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo,
Pattern keyTagPattern) {
this.shards = shards;
this.algo = algo;
this.keyTagPattern = keyTagPattern;
}
@Override
public Object makeObject() throws Exception {
ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
boolean done = false;
while (!done) {
try {
for (JedisShardInfo shard : jedis.getAllShards()) {
if (!shard.getResource().isConnected()) {
shard.getResource().connect();
}
}
done = true;
} catch (Exception e) {
try {
Thread.sleep(100);
} catch (InterruptedException e1) {
} }
} }
done = true; }
} catch (Exception e) { return jedis;
}
@Override
public void destroyObject(final Object obj) throws Exception {
if (obj != null) {
try { try {
Thread.sleep(100); ((ShardedJedis) obj).disconnect();
} catch (InterruptedException e1) { } catch (Exception e) {
} }
} }
} }
return jedis;
}
@Override @Override
protected void destroyResource(ShardedJedis jedis) { public boolean validateObject(final Object obj) {
if (jedis != null) {
try { try {
jedis.disconnect(); ShardedJedis jedis = (ShardedJedis) obj;
} catch (Exception e) { for (JedisShardInfo shard : jedis.getAllShards()) {
if (!shard.getResource().isConnected()
} || !shard.getResource().ping().equals("PONG")) {
} return false;
} }
@Override
protected boolean isResourceValid(ShardedJedis jedis) {
try {
for (JedisShardInfo shard : jedis.getAllShards()) {
if (!shard.getResource().isConnected()
|| !shard.getResource().ping().equals("PONG")) {
return false;
} }
return true;
} catch (Exception ex) {
return false;
} }
return true;
} catch (Exception ex) {
return false;
} }
} }
} }

View File

@@ -0,0 +1,30 @@
package redis.clients.util;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
public abstract class Pool<T> {
private final GenericObjectPool internalPool;
public Pool(final GenericObjectPool.Config poolConfig,
PoolableObjectFactory factory) {
this.internalPool = new GenericObjectPool(factory, poolConfig);
}
@SuppressWarnings("unchecked")
public T getResource() throws Exception {
return (T) internalPool.borrowObject();
}
public void returnResource(final T resource) throws Exception {
internalPool.returnObject(resource);
}
public void returnBrokenResource(final T resource) throws Exception {
internalPool.invalidateObject(resource);
}
public void destroy() throws Exception {
internalPool.close();
}
}

View File

@@ -1,8 +1,9 @@
package redis.clients.jedis.tests; package redis.clients.jedis.tests;
import java.io.IOException; import java.util.NoSuchElementException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@@ -14,12 +15,9 @@ public class JedisPoolTest extends Assert {
private static HostAndPort hnp = HostAndPortUtil.getRedisServers().get(0); private static HostAndPort hnp = HostAndPortUtil.getRedisServers().get(0);
@Test @Test
public void checkConnections() throws TimeoutException { public void checkConnections() throws Exception {
JedisPool pool = new JedisPool(hnp.host, hnp.port, 2000); JedisPool pool = new JedisPool(new Config(), hnp.host, hnp.port, 2000);
pool.setResourcesNumber(10); Jedis jedis = pool.getResource();
pool.init();
Jedis jedis = pool.getResource(200);
jedis.auth("foobared"); jedis.auth("foobared");
jedis.set("foo", "bar"); jedis.set("foo", "bar");
assertEquals("bar", jedis.get("foo")); assertEquals("bar", jedis.get("foo"));
@@ -28,12 +26,9 @@ public class JedisPoolTest extends Assert {
} }
@Test @Test
public void checkConnectionWithDefaultPort() throws TimeoutException { public void checkConnectionWithDefaultPort() throws Exception {
JedisPool pool = new JedisPool(hnp.host, hnp.port); JedisPool pool = new JedisPool(new Config(), hnp.host, hnp.port);
pool.setResourcesNumber(10); Jedis jedis = pool.getResource();
pool.init();
Jedis jedis = pool.getResource(200);
jedis.auth("foobared"); jedis.auth("foobared");
jedis.set("foo", "bar"); jedis.set("foo", "bar");
assertEquals("bar", jedis.get("foo")); assertEquals("bar", jedis.get("foo"));
@@ -42,17 +37,14 @@ public class JedisPoolTest extends Assert {
} }
@Test @Test
public void checkJedisIsReusedWhenReturned() throws TimeoutException { public void checkJedisIsReusedWhenReturned() throws Exception {
JedisPool pool = new JedisPool(hnp.host, hnp.port); JedisPool pool = new JedisPool(new Config(), hnp.host, hnp.port);
pool.setResourcesNumber(1); Jedis jedis = pool.getResource();
pool.init();
Jedis jedis = pool.getResource(200);
jedis.auth("foobared"); jedis.auth("foobared");
jedis.set("foo", "0"); jedis.set("foo", "0");
pool.returnResource(jedis); pool.returnResource(jedis);
jedis = pool.getResource(200); jedis = pool.getResource();
jedis.auth("foobared"); jedis.auth("foobared");
jedis.incr("foo"); jedis.incr("foo");
pool.returnResource(jedis); pool.returnResource(jedis);
@@ -60,35 +52,31 @@ public class JedisPoolTest extends Assert {
} }
@Test @Test
public void checkPoolRepairedWhenJedisIsBroken() throws TimeoutException, public void checkPoolRepairedWhenJedisIsBroken() throws Exception {
IOException { JedisPool pool = new JedisPool(new Config(), hnp.host, hnp.port);
JedisPool pool = new JedisPool(hnp.host, hnp.port); Jedis jedis = pool.getResource();
pool.setResourcesNumber(1);
pool.init();
Jedis jedis = pool.getResource(200);
jedis.auth("foobared"); jedis.auth("foobared");
jedis.quit(); jedis.quit();
pool.returnBrokenResource(jedis); pool.returnBrokenResource(jedis);
jedis = pool.getResource(200); jedis = pool.getResource();
jedis.auth("foobared"); jedis.auth("foobared");
jedis.incr("foo"); jedis.incr("foo");
pool.returnResource(jedis); pool.returnResource(jedis);
pool.destroy(); pool.destroy();
} }
@Test(expected = TimeoutException.class) @Test(expected = NoSuchElementException.class)
public void checkPoolOverflow() throws TimeoutException { public void checkPoolOverflow() throws Exception {
JedisPool pool = new JedisPool(hnp.host, hnp.port); Config config = new Config();
pool.setResourcesNumber(1); config.maxActive = 1;
pool.init(); config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
JedisPool pool = new JedisPool(config, hnp.host, hnp.port);
Jedis jedis = pool.getResource(200); Jedis jedis = pool.getResource();
jedis.auth("foobared"); jedis.auth("foobared");
jedis.set("foo", "0"); jedis.set("foo", "0");
Jedis newJedis = pool.getResource(200); Jedis newJedis = pool.getResource();
newJedis.auth("foobared"); newJedis.auth("foobared");
newJedis.incr("foo"); newJedis.incr("foo");
} }

View File

@@ -4,8 +4,10 @@ import java.io.IOException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeoutException; import java.util.NoSuchElementException;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@@ -42,12 +44,9 @@ public class ShardedJedisPoolTest extends Assert {
} }
@Test @Test
public void checkConnections() throws TimeoutException { public void checkConnections() throws Exception {
ShardedJedisPool pool = new ShardedJedisPool(shards); ShardedJedisPool pool = new ShardedJedisPool(new Config(), shards);
pool.setResourcesNumber(10); ShardedJedis jedis = pool.getResource();
pool.init();
ShardedJedis jedis = pool.getResource(200);
jedis.set("foo", "bar"); jedis.set("foo", "bar");
assertEquals("bar", jedis.get("foo")); assertEquals("bar", jedis.get("foo"));
pool.returnResource(jedis); pool.returnResource(jedis);
@@ -55,12 +54,9 @@ public class ShardedJedisPoolTest extends Assert {
} }
@Test @Test
public void checkConnectionWithDefaultPort() throws TimeoutException { public void checkConnectionWithDefaultPort() throws Exception {
ShardedJedisPool pool = new ShardedJedisPool(shards); ShardedJedisPool pool = new ShardedJedisPool(new Config(), shards);
pool.setResourcesNumber(1); ShardedJedis jedis = pool.getResource();
pool.init();
ShardedJedis jedis = pool.getResource(200);
jedis.set("foo", "bar"); jedis.set("foo", "bar");
assertEquals("bar", jedis.get("foo")); assertEquals("bar", jedis.get("foo"));
pool.returnResource(jedis); pool.returnResource(jedis);
@@ -68,48 +64,43 @@ public class ShardedJedisPoolTest extends Assert {
} }
@Test @Test
public void checkJedisIsReusedWhenReturned() throws TimeoutException { public void checkJedisIsReusedWhenReturned() throws Exception {
ShardedJedisPool pool = new ShardedJedisPool(shards); ShardedJedisPool pool = new ShardedJedisPool(new Config(), shards);
pool.setResourcesNumber(1); ShardedJedis jedis = pool.getResource();
pool.init();
ShardedJedis jedis = pool.getResource(200);
jedis.set("foo", "0"); jedis.set("foo", "0");
pool.returnResource(jedis); pool.returnResource(jedis);
jedis = pool.getResource(200); jedis = pool.getResource();
jedis.incr("foo"); jedis.incr("foo");
pool.returnResource(jedis); pool.returnResource(jedis);
pool.destroy(); pool.destroy();
} }
@Test @Test
public void checkPoolRepairedWhenJedisIsBroken() throws TimeoutException, public void checkPoolRepairedWhenJedisIsBroken() throws Exception {
IOException { ShardedJedisPool pool = new ShardedJedisPool(new Config(), shards);
ShardedJedisPool pool = new ShardedJedisPool(shards); ShardedJedis jedis = pool.getResource();
pool.setResourcesNumber(1);
pool.init();
ShardedJedis jedis = pool.getResource(200);
jedis.disconnect(); jedis.disconnect();
pool.returnBrokenResource(jedis); pool.returnBrokenResource(jedis);
jedis = pool.getResource(200); jedis = pool.getResource();
jedis.incr("foo"); jedis.incr("foo");
pool.returnResource(jedis); pool.returnResource(jedis);
pool.destroy(); pool.destroy();
} }
@Test(expected = TimeoutException.class) @Test(expected = NoSuchElementException.class)
public void checkPoolOverflow() throws TimeoutException { public void checkPoolOverflow() throws Exception {
ShardedJedisPool pool = new ShardedJedisPool(shards); Config config = new Config();
pool.setResourcesNumber(1); config.maxActive = 1;
pool.init(); config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
ShardedJedis jedis = pool.getResource(200); ShardedJedisPool pool = new ShardedJedisPool(config, shards);
ShardedJedis jedis = pool.getResource();
jedis.set("foo", "0"); jedis.set("foo", "0");
ShardedJedis newJedis = pool.getResource(200); ShardedJedis newJedis = pool.getResource();
newJedis.incr("foo"); newJedis.incr("foo");
} }
} }

View File

@@ -1,29 +1,21 @@
package redis.clients.jedis.tests.benchmark; package redis.clients.jedis.tests.benchmark;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger; import org.apache.commons.pool.impl.GenericObjectPool.Config;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPool;
import redis.clients.jedis.tests.HostAndPortUtil; import redis.clients.jedis.tests.HostAndPortUtil;
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort; import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
import redis.clients.util.FixedResourcePool;
public class PoolBenchmark { public class PoolBenchmark {
private static HostAndPort hnp = HostAndPortUtil.getRedisServers().get(0); private static HostAndPort hnp = HostAndPortUtil.getRedisServers().get(0);
private static final int TOTAL_OPERATIONS = 100000; private static final int TOTAL_OPERATIONS = 100000;
public static void main(String[] args) throws UnknownHostException, public static void main(String[] args) throws Exception {
IOException, TimeoutException, InterruptedException {
Logger logger = Logger.getLogger(FixedResourcePool.class.getName());
logger.setLevel(Level.OFF);
Jedis j = new Jedis(hnp.host, hnp.port); Jedis j = new Jedis(hnp.host, hnp.port);
j.connect(); j.connect();
j.auth("foobared"); j.auth("foobared");
@@ -37,12 +29,9 @@ public class PoolBenchmark {
System.out.println(((1000 * 2 * TOTAL_OPERATIONS) / elapsed) + " ops"); System.out.println(((1000 * 2 * TOTAL_OPERATIONS) / elapsed) + " ops");
} }
private static void withPool() throws InterruptedException { private static void withPool() throws Exception {
final JedisPool pool = new JedisPool(hnp.host, hnp.port, 2000, final JedisPool pool = new JedisPool(new Config(), hnp.host, hnp.port,
"foobared"); 2000, "foobared");
pool.setResourcesNumber(50);
pool.setDefaultPoolWait(1000000);
pool.init();
List<Thread> tds = new ArrayList<Thread>(); List<Thread> tds = new ArrayList<Thread>();
final AtomicInteger ind = new AtomicInteger(); final AtomicInteger ind = new AtomicInteger();