Implements Closeable to Pooled Jedis & ShardedJedis

* Implement Closeable from Jedis, ShardedJedis with Pooled
** resources from JedisPool, JedisSentinelPool, ShardedJedis, ShardedJedisPool
* Connection class : check whether Jedis Connection is broken
** when it's time to throw JedisConnectionException, mark Connection to broken
This commit is contained in:
Jungtaek Lim
2014-02-23 23:50:40 +09:00
parent e9cf469200
commit 670e019a89
11 changed files with 338 additions and 69 deletions

View File

@@ -1707,9 +1707,9 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
public List<Object> multi(final TransactionBlock jedisTransaction) { public List<Object> multi(final TransactionBlock jedisTransaction) {
List<Object> results = null; List<Object> results = null;
jedisTransaction.setClient(client); jedisTransaction.setClient(client);
client.multi(); client.multi();
jedisTransaction.execute(); jedisTransaction.execute();
results = jedisTransaction.exec(); results = jedisTransaction.exec();
return results; return results;
} }
@@ -1729,8 +1729,10 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
} }
public void resetState() { public void resetState() {
client.resetState(); if (client.isConnected()) {
client.getAll(); client.resetState();
client.getAll();
}
} }
public String watch(final byte[]... keys) { public String watch(final byte[]... keys) {
@@ -1744,7 +1746,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
} }
@Override @Override
public void close() { public void close() {
client.close(); client.close();
} }

View File

@@ -11,7 +11,6 @@ import java.util.List;
import redis.clients.jedis.Protocol.Command; import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.RedisInputStream; import redis.clients.util.RedisInputStream;
import redis.clients.util.RedisOutputStream; import redis.clients.util.RedisOutputStream;
import redis.clients.util.SafeEncoder; import redis.clients.util.SafeEncoder;
@@ -25,6 +24,8 @@ public class Connection implements Closeable {
private int pipelinedCommands = 0; private int pipelinedCommands = 0;
private int timeout = Protocol.DEFAULT_TIMEOUT; private int timeout = Protocol.DEFAULT_TIMEOUT;
private boolean broken = false;
public Socket getSocket() { public Socket getSocket() {
return socket; return socket;
} }
@@ -45,7 +46,8 @@ public class Connection implements Closeable {
socket.setKeepAlive(true); socket.setKeepAlive(true);
socket.setSoTimeout(0); socket.setSoTimeout(0);
} catch (SocketException ex) { } catch (SocketException ex) {
throw new JedisException(ex); broken = true;
throw new JedisConnectionException(ex);
} }
} }
@@ -54,7 +56,8 @@ public class Connection implements Closeable {
socket.setSoTimeout(timeout); socket.setSoTimeout(timeout);
socket.setKeepAlive(false); socket.setKeepAlive(false);
} catch (SocketException ex) { } catch (SocketException ex) {
throw new JedisException(ex); broken = true;
throw new JedisConnectionException(ex);
} }
} }
@@ -63,14 +66,6 @@ public class Connection implements Closeable {
this.host = host; this.host = host;
} }
protected void flush() {
try {
outputStream.flush();
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}
protected Connection sendCommand(final Command cmd, final String... args) { protected Connection sendCommand(final Command cmd, final String... args) {
final byte[][] bargs = new byte[args.length][]; final byte[][] bargs = new byte[args.length][];
for (int i = 0; i < args.length; i++) { for (int i = 0; i < args.length; i++) {
@@ -80,17 +75,29 @@ public class Connection implements Closeable {
} }
protected Connection sendCommand(final Command cmd, final byte[]... args) { protected Connection sendCommand(final Command cmd, final byte[]... args) {
connect(); try {
Protocol.sendCommand(outputStream, cmd, args); connect();
pipelinedCommands++; Protocol.sendCommand(outputStream, cmd, args);
return this; pipelinedCommands++;
return this;
} catch (JedisConnectionException ex) {
// Any other exceptions related to connection?
broken = true;
throw ex;
}
} }
protected Connection sendCommand(final Command cmd) { protected Connection sendCommand(final Command cmd) {
connect(); try {
Protocol.sendCommand(outputStream, cmd, new byte[0][]); connect();
pipelinedCommands++; Protocol.sendCommand(outputStream, cmd, new byte[0][]);
return this; pipelinedCommands++;
return this;
} catch (JedisConnectionException ex) {
// Any other exceptions related to connection?
broken = true;
throw ex;
}
} }
public Connection(final String host, final int port) { public Connection(final String host, final int port) {
@@ -139,6 +146,7 @@ public class Connection implements Closeable {
outputStream = new RedisOutputStream(socket.getOutputStream()); outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream()); inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ex) { } catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex); throw new JedisConnectionException(ex);
} }
} }
@@ -147,7 +155,7 @@ public class Connection implements Closeable {
@Override @Override
public void close() { public void close() {
disconnect(); disconnect();
} }
public void disconnect() { public void disconnect() {
if (isConnected()) { if (isConnected()) {
@@ -158,6 +166,7 @@ public class Connection implements Closeable {
socket.close(); socket.close();
} }
} catch (IOException ex) { } catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex); throw new JedisConnectionException(ex);
} }
} }
@@ -172,7 +181,7 @@ public class Connection implements Closeable {
protected String getStatusCodeReply() { protected String getStatusCodeReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
final byte[] resp = (byte[]) Protocol.read(inputStream); final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
if (null == resp) { if (null == resp) {
return null; return null;
} else { } else {
@@ -192,13 +201,13 @@ public class Connection implements Closeable {
public byte[] getBinaryBulkReply() { public byte[] getBinaryBulkReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return (byte[]) Protocol.read(inputStream); return (byte[]) readProtocolWithCheckingBroken();
} }
public Long getIntegerReply() { public Long getIntegerReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return (Long) Protocol.read(inputStream); return (Long) readProtocolWithCheckingBroken();
} }
public List<String> getMultiBulkReply() { public List<String> getMultiBulkReply() {
@@ -209,29 +218,29 @@ public class Connection implements Closeable {
public List<byte[]> getBinaryMultiBulkReply() { public List<byte[]> getBinaryMultiBulkReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return (List<byte[]>) Protocol.read(inputStream); return (List<byte[]>) readProtocolWithCheckingBroken();
} }
public void resetPipelinedCount() { public void resetPipelinedCount() {
pipelinedCommands = 0; pipelinedCommands = 0;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<Object> getRawObjectMultiBulkReply() { public List<Object> getRawObjectMultiBulkReply() {
return (List<Object>) Protocol.read(inputStream); return (List<Object>) readProtocolWithCheckingBroken();
} }
public List<Object> getObjectMultiBulkReply() { public List<Object> getObjectMultiBulkReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return getRawObjectMultiBulkReply(); return getRawObjectMultiBulkReply();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<Long> getIntegerMultiBulkReply() { public List<Long> getIntegerMultiBulkReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return (List<Long>) Protocol.read(inputStream); return (List<Long>) readProtocolWithCheckingBroken();
} }
public List<Object> getAll() { public List<Object> getAll() {
@@ -243,7 +252,7 @@ public class Connection implements Closeable {
flush(); flush();
while (pipelinedCommands > except) { while (pipelinedCommands > except) {
try { try {
all.add(Protocol.read(inputStream)); all.add(readProtocolWithCheckingBroken());
} catch (JedisDataException e) { } catch (JedisDataException e) {
all.add(e); all.add(e);
} }
@@ -255,6 +264,28 @@ public class Connection implements Closeable {
public Object getOne() { public Object getOne() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return Protocol.read(inputStream); return readProtocolWithCheckingBroken();
}
public boolean isBroken() {
return broken;
}
protected void flush() {
try {
outputStream.flush();
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex);
}
}
protected Object readProtocolWithCheckingBroken() {
try {
return Protocol.read(inputStream);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
} }
} }

View File

@@ -12,12 +12,16 @@ import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import redis.clients.jedis.BinaryClient.LIST_POSITION; import redis.clients.jedis.BinaryClient.LIST_POSITION;
import redis.clients.util.Pool;
import redis.clients.util.SafeEncoder; import redis.clients.util.SafeEncoder;
import redis.clients.util.Slowlog; import redis.clients.util.Slowlog;
public class Jedis extends BinaryJedis implements JedisCommands, public class Jedis extends BinaryJedis implements JedisCommands,
MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands,
BasicCommands, ClusterCommands { BasicCommands, ClusterCommands {
protected Pool<Jedis> dataSource = null;
public Jedis(final String host) { public Jedis(final String host) {
super(host); super(host);
} }
@@ -3291,7 +3295,7 @@ public class Jedis extends BinaryJedis implements JedisCommands,
while (iterator.hasNext()) { while (iterator.hasNext()) {
results.add(new AbstractMap.SimpleEntry<String, String>(SafeEncoder results.add(new AbstractMap.SimpleEntry<String, String>(SafeEncoder
.encode(iterator.next()), SafeEncoder.encode(iterator .encode(iterator.next()), SafeEncoder.encode(iterator
.next()))); .next())));
} }
return new ScanResult<Map.Entry<String, String>>(newcursor, results); return new ScanResult<Map.Entry<String, String>>(newcursor, results);
} }
@@ -3412,4 +3416,21 @@ public class Jedis extends BinaryJedis implements JedisCommands,
return BuilderFactory.STRING_MAP return BuilderFactory.STRING_MAP
.build(client.getBinaryMultiBulkReply()); .build(client.getBinaryMultiBulkReply());
} }
@Override
public void close() {
if (dataSource != null) {
if (client.isBroken()) {
this.dataSource.returnBrokenResource(this);
} else {
this.dataSource.returnResource(this);
}
} else {
client.close();
}
}
public void setDataSource(Pool<Jedis> jedisPool) {
this.dataSource = jedisPool;
}
} }

View File

@@ -79,6 +79,13 @@ public class JedisPool extends Pool<Jedis> {
database, clientName)); database, clientName));
} }
@Override
public Jedis getResource() {
Jedis jedis = super.getResource();
jedis.setDataSource(this);
return jedis;
}
public void returnBrokenResource(final Jedis resource) { public void returnBrokenResource(final Jedis resource) {
returnBrokenResourceObject(resource); returnBrokenResourceObject(resource);
} }

View File

@@ -74,15 +74,6 @@ public class JedisSentinelPool extends Pool<Jedis> {
initPool(master); initPool(master);
} }
public void returnBrokenResource(final Jedis resource) {
returnBrokenResourceObject(resource);
}
public void returnResource(final Jedis resource) {
resource.resetState();
returnResourceObject(resource);
}
private volatile HostAndPort currentHostMaster; private volatile HostAndPort currentHostMaster;
public void destroy() { public void destroy() {
@@ -171,6 +162,24 @@ public class JedisSentinelPool extends Pool<Jedis> {
return new HostAndPort(host, port); return new HostAndPort(host, port);
} }
@Override
public Jedis getResource() {
Jedis jedis = super.getResource();
jedis.setDataSource(this);
return jedis;
}
@Override
public void returnBrokenResource(final Jedis resource) {
returnBrokenResourceObject(resource);
}
@Override
public void returnResource(final Jedis resource) {
resource.resetState();
returnResourceObject(resource);
}
protected class JedisPubSubAdapter extends JedisPubSub { protected class JedisPubSubAdapter extends JedisPubSub {
@Override @Override
public void onMessage(String channel, String message) { public void onMessage(String channel, String message) {

View File

@@ -1,5 +1,6 @@
package redis.clients.jedis; package redis.clients.jedis;
import java.io.Closeable;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@@ -8,8 +9,13 @@ import java.util.regex.Pattern;
import redis.clients.jedis.BinaryClient.LIST_POSITION; import redis.clients.jedis.BinaryClient.LIST_POSITION;
import redis.clients.util.Hashing; import redis.clients.util.Hashing;
import redis.clients.util.Pool;
public class ShardedJedis extends BinaryShardedJedis implements JedisCommands,
Closeable {
protected Pool<ShardedJedis> dataSource = null;
public class ShardedJedis extends BinaryShardedJedis implements JedisCommands {
public ShardedJedis(List<JedisShardInfo> shards) { public ShardedJedis(List<JedisShardInfo> shards) {
super(shards); super(shards);
} }
@@ -556,7 +562,8 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands {
return j.zscan(key, cursor); return j.zscan(key, cursor);
} }
public ScanResult<Entry<String, String>> hscan(String key, final String cursor) { public ScanResult<Entry<String, String>> hscan(String key,
final String cursor) {
Jedis j = getShard(key); Jedis j = getShard(key);
return j.hscan(key, cursor); return j.hscan(key, cursor);
} }
@@ -570,4 +577,37 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands {
Jedis j = getShard(key); Jedis j = getShard(key);
return j.zscan(key, cursor); return j.zscan(key, cursor);
} }
@Override
public void close() {
if (dataSource != null) {
boolean broken = false;
for (Jedis jedis : getAllShards()) {
if (jedis.getClient().isBroken()) {
broken = true;
}
}
if (broken) {
dataSource.returnBrokenResource(this);
} else {
this.resetState();
dataSource.returnResource(this);
}
} else {
disconnect();
}
}
public void setDataSource(Pool<ShardedJedis> shardedJedisPool) {
this.dataSource = shardedJedisPool;
}
public void resetState() {
for (Jedis jedis : getAllShards()) {
jedis.resetState();
}
}
} }

View File

@@ -32,6 +32,25 @@ public class ShardedJedisPool extends Pool<ShardedJedis> {
super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern)); super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));
} }
@Override
public ShardedJedis getResource() {
ShardedJedis jedis = super.getResource();
jedis.setDataSource(this);
return jedis;
}
@Override
public void returnBrokenResource(final ShardedJedis resource) {
returnBrokenResourceObject(resource);
}
@Override
public void returnResource(final ShardedJedis resource) {
resource.resetState();
returnResourceObject(resource);
}
/** /**
* PoolableObjectFactory custom impl. * PoolableObjectFactory custom impl.
*/ */

View File

@@ -187,15 +187,45 @@ public class JedisPoolTest extends Assert {
2000, "foobared"); 2000, "foobared");
Jedis jedis = pool.getResource(); Jedis jedis = pool.getResource();
jedis.set("hello", "jedis"); try {
Transaction t = jedis.multi(); jedis.set("hello", "jedis");
t.set("hello", "world"); Transaction t = jedis.multi();
pool.returnResource(jedis); t.set("hello", "world");
} finally {
jedis.close();
}
Jedis jedis2 = pool.getResource(); Jedis jedis2 = pool.getResource();
assertTrue(jedis == jedis2); try {
assertEquals("jedis", jedis2.get("hello")); assertTrue(jedis == jedis2);
pool.returnResource(jedis2); assertEquals("jedis", jedis2.get("hello"));
} finally {
jedis2.close();
}
pool.destroy(); pool.destroy();
} }
@Test
public void checkResourceIsCloseable() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(1);
config.setBlockWhenExhausted(false);
JedisPool pool = new JedisPool(config, hnp.getHost(), hnp.getPort(),
2000, "foobared");
Jedis jedis = pool.getResource();
try {
jedis.set("hello", "jedis");
} finally {
jedis.close();
}
Jedis jedis2 = pool.getResource();
try {
assertEquals(jedis, jedis2);
} finally {
jedis2.close();
}
}
} }

View File

@@ -80,6 +80,29 @@ public class JedisSentinelPoolTest extends JedisTestBase {
} }
} }
@Test
public void checkResourceIsCloseable() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(1);
config.setBlockWhenExhausted(false);
JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels,
config, 1000, "foobared", 2);
Jedis jedis = pool.getResource();
try {
jedis.set("hello", "jedis");
} finally {
jedis.close();
}
Jedis jedis2 = pool.getResource();
try {
assertEquals(jedis, jedis2);
} finally {
jedis2.close();
}
}
private void forceFailover(JedisSentinelPool pool) private void forceFailover(JedisSentinelPool pool)
throws InterruptedException { throws InterruptedException {
HostAndPort oldMaster = pool.getCurrentHostMaster(); HostAndPort oldMaster = pool.getCurrentHostMaster();

View File

@@ -14,6 +14,7 @@ import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisShardInfo; import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis; import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPipeline;
import redis.clients.jedis.ShardedJedisPool; import redis.clients.jedis.ShardedJedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisConnectionException;
@@ -233,4 +234,69 @@ public class ShardedJedisPoolTest extends Assert {
assertEquals("PONG", jedis.ping()); assertEquals("PONG", jedis.ping());
assertEquals("bar", jedis.get("foo")); assertEquals("bar", jedis.get("foo"));
} }
@Test
public void returnResourceShouldResetState() throws URISyntaxException {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(1);
config.setBlockWhenExhausted(false);
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>();
shards.add(new JedisShardInfo(new URI(
"redis://:foobared@localhost:6380")));
shards.add(new JedisShardInfo(new URI(
"redis://:foobared@localhost:6379")));
ShardedJedisPool pool = new ShardedJedisPool(config, shards);
ShardedJedis jedis = pool.getResource();
jedis.set("pipelined", String.valueOf(0));
jedis.set("pipelined2", String.valueOf(0));
ShardedJedisPipeline pipeline = jedis.pipelined();
pipeline.incr("pipelined");
pipeline.incr("pipelined2");
jedis.resetState();
pipeline = jedis.pipelined();
pipeline.incr("pipelined");
pipeline.incr("pipelined2");
List<Object> results = pipeline.syncAndReturnAll();
assertEquals(2, results.size());
pool.returnResource(jedis);
pool.destroy();
}
@Test
public void checkResourceIsCloseable() throws URISyntaxException {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(1);
config.setBlockWhenExhausted(false);
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>();
shards.add(new JedisShardInfo(new URI(
"redis://:foobared@localhost:6380")));
shards.add(new JedisShardInfo(new URI(
"redis://:foobared@localhost:6379")));
ShardedJedisPool pool = new ShardedJedisPool(config, shards);
ShardedJedis jedis = pool.getResource();
try {
jedis.set("hello", "jedis");
} finally {
jedis.close();
}
ShardedJedis jedis2 = pool.getResource();
try {
assertEquals(jedis, jedis2);
} finally {
jedis2.close();
}
}
} }

View File

@@ -302,4 +302,25 @@ public class ShardedJedisTest extends Assert {
assertEquals(jedisShardInfo.getName(), jedisShardInfo2.getName()); assertEquals(jedisShardInfo.getName(), jedisShardInfo2.getName());
} }
} }
@Test
public void checkCloseable() {
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>();
shards.add(new JedisShardInfo(redis1.getHost(), redis1.getPort()));
shards.add(new JedisShardInfo(redis2.getHost(), redis2.getPort()));
shards.get(0).setPassword("foobared");
shards.get(1).setPassword("foobared");
ShardedJedis jedisShard = new ShardedJedis(shards);
try {
jedisShard.set("shard_closeable", "true");
} finally {
jedisShard.close();
}
for (Jedis jedis : jedisShard.getAllShards()) {
assertTrue(!jedis.isConnected());
}
}
} }