diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index eba466f..9095712 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -1707,9 +1707,9 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, public List multi(final TransactionBlock jedisTransaction) { List results = null; jedisTransaction.setClient(client); - client.multi(); - jedisTransaction.execute(); - results = jedisTransaction.exec(); + client.multi(); + jedisTransaction.execute(); + results = jedisTransaction.exec(); return results; } @@ -1729,8 +1729,10 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, } public void resetState() { - client.resetState(); - client.getAll(); + if (client.isConnected()) { + client.resetState(); + client.getAll(); + } } public String watch(final byte[]... keys) { @@ -1744,7 +1746,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, } @Override - public void close() { + public void close() { client.close(); } diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index e913d68..c50c706 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -11,7 +11,6 @@ import java.util.List; import redis.clients.jedis.Protocol.Command; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisDataException; -import redis.clients.jedis.exceptions.JedisException; import redis.clients.util.RedisInputStream; import redis.clients.util.RedisOutputStream; import redis.clients.util.SafeEncoder; @@ -25,6 +24,8 @@ public class Connection implements Closeable { private int pipelinedCommands = 0; private int timeout = Protocol.DEFAULT_TIMEOUT; + private boolean broken = false; + public Socket getSocket() { return socket; } @@ -45,7 +46,8 @@ public class Connection implements Closeable { socket.setKeepAlive(true); socket.setSoTimeout(0); } catch (SocketException ex) { - throw new JedisException(ex); + broken = true; + throw new JedisConnectionException(ex); } } @@ -54,7 +56,8 @@ public class Connection implements Closeable { socket.setSoTimeout(timeout); socket.setKeepAlive(false); } catch (SocketException ex) { - throw new JedisException(ex); + broken = true; + throw new JedisConnectionException(ex); } } @@ -63,14 +66,6 @@ public class Connection implements Closeable { this.host = host; } - protected void flush() { - try { - outputStream.flush(); - } catch (IOException e) { - throw new JedisConnectionException(e); - } - } - protected Connection sendCommand(final Command cmd, final String... args) { final byte[][] bargs = new byte[args.length][]; for (int i = 0; i < args.length; i++) { @@ -80,17 +75,29 @@ public class Connection implements Closeable { } protected Connection sendCommand(final Command cmd, final byte[]... args) { - connect(); - Protocol.sendCommand(outputStream, cmd, args); - pipelinedCommands++; - return this; + try { + connect(); + Protocol.sendCommand(outputStream, cmd, args); + pipelinedCommands++; + return this; + } catch (JedisConnectionException ex) { + // Any other exceptions related to connection? + broken = true; + throw ex; + } } protected Connection sendCommand(final Command cmd) { - connect(); - Protocol.sendCommand(outputStream, cmd, new byte[0][]); - pipelinedCommands++; - return this; + try { + connect(); + Protocol.sendCommand(outputStream, cmd, new byte[0][]); + pipelinedCommands++; + return this; + } catch (JedisConnectionException ex) { + // Any other exceptions related to connection? + broken = true; + throw ex; + } } public Connection(final String host, final int port) { @@ -139,6 +146,7 @@ public class Connection implements Closeable { outputStream = new RedisOutputStream(socket.getOutputStream()); inputStream = new RedisInputStream(socket.getInputStream()); } catch (IOException ex) { + broken = true; throw new JedisConnectionException(ex); } } @@ -147,7 +155,7 @@ public class Connection implements Closeable { @Override public void close() { disconnect(); - } + } public void disconnect() { if (isConnected()) { @@ -158,6 +166,7 @@ public class Connection implements Closeable { socket.close(); } } catch (IOException ex) { + broken = true; throw new JedisConnectionException(ex); } } @@ -172,7 +181,7 @@ public class Connection implements Closeable { protected String getStatusCodeReply() { flush(); pipelinedCommands--; - final byte[] resp = (byte[]) Protocol.read(inputStream); + final byte[] resp = (byte[]) readProtocolWithCheckingBroken(); if (null == resp) { return null; } else { @@ -192,13 +201,13 @@ public class Connection implements Closeable { public byte[] getBinaryBulkReply() { flush(); pipelinedCommands--; - return (byte[]) Protocol.read(inputStream); + return (byte[]) readProtocolWithCheckingBroken(); } public Long getIntegerReply() { flush(); pipelinedCommands--; - return (Long) Protocol.read(inputStream); + return (Long) readProtocolWithCheckingBroken(); } public List getMultiBulkReply() { @@ -209,29 +218,29 @@ public class Connection implements Closeable { public List getBinaryMultiBulkReply() { flush(); pipelinedCommands--; - return (List) Protocol.read(inputStream); + return (List) readProtocolWithCheckingBroken(); } public void resetPipelinedCount() { - pipelinedCommands = 0; + pipelinedCommands = 0; } @SuppressWarnings("unchecked") public List getRawObjectMultiBulkReply() { - return (List) Protocol.read(inputStream); + return (List) readProtocolWithCheckingBroken(); } public List getObjectMultiBulkReply() { - flush(); - pipelinedCommands--; - return getRawObjectMultiBulkReply(); + flush(); + pipelinedCommands--; + return getRawObjectMultiBulkReply(); } @SuppressWarnings("unchecked") public List getIntegerMultiBulkReply() { flush(); pipelinedCommands--; - return (List) Protocol.read(inputStream); + return (List) readProtocolWithCheckingBroken(); } public List getAll() { @@ -243,7 +252,7 @@ public class Connection implements Closeable { flush(); while (pipelinedCommands > except) { try { - all.add(Protocol.read(inputStream)); + all.add(readProtocolWithCheckingBroken()); } catch (JedisDataException e) { all.add(e); } @@ -255,6 +264,28 @@ public class Connection implements Closeable { public Object getOne() { flush(); pipelinedCommands--; - return Protocol.read(inputStream); + return readProtocolWithCheckingBroken(); + } + + public boolean isBroken() { + return broken; + } + + protected void flush() { + try { + outputStream.flush(); + } catch (IOException ex) { + broken = true; + throw new JedisConnectionException(ex); + } + } + + protected Object readProtocolWithCheckingBroken() { + try { + return Protocol.read(inputStream); + } catch (JedisConnectionException exc) { + broken = true; + throw exc; + } } } diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 4961f42..a8aa286 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -12,12 +12,16 @@ import java.util.Map.Entry; import java.util.Set; import redis.clients.jedis.BinaryClient.LIST_POSITION; +import redis.clients.util.Pool; import redis.clients.util.SafeEncoder; import redis.clients.util.Slowlog; public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands { + + protected Pool dataSource = null; + public Jedis(final String host) { super(host); } @@ -3256,11 +3260,11 @@ public class Jedis extends BinaryJedis implements JedisCommands, } return new ScanResult(newcursor, results); } - + public ScanResult scan(final String cursor) { return scan(cursor, new ScanParams()); } - + public ScanResult scan(final String cursor, final ScanParams params) { checkIsInMulti(); client.scan(cursor, params); @@ -3273,12 +3277,12 @@ public class Jedis extends BinaryJedis implements JedisCommands, } return new ScanResult(newcursor, results); } - + public ScanResult> hscan(final String key, final String cursor) { return hscan(key, cursor, new ScanParams()); } - + public ScanResult> hscan(final String key, final String cursor, final ScanParams params) { checkIsInMulti(); @@ -3291,15 +3295,15 @@ public class Jedis extends BinaryJedis implements JedisCommands, while (iterator.hasNext()) { results.add(new AbstractMap.SimpleEntry(SafeEncoder .encode(iterator.next()), SafeEncoder.encode(iterator - .next()))); + .next()))); } return new ScanResult>(newcursor, results); } - + public ScanResult sscan(final String key, final String cursor) { return sscan(key, cursor, new ScanParams()); } - + public ScanResult sscan(final String key, final String cursor, final ScanParams params) { checkIsInMulti(); @@ -3313,11 +3317,11 @@ public class Jedis extends BinaryJedis implements JedisCommands, } return new ScanResult(newcursor, results); } - + public ScanResult zscan(final String key, final String cursor) { return zscan(key, cursor, new ScanParams()); } - + public ScanResult zscan(final String key, final String cursor, final ScanParams params) { checkIsInMulti(); @@ -3412,4 +3416,21 @@ public class Jedis extends BinaryJedis implements JedisCommands, return BuilderFactory.STRING_MAP .build(client.getBinaryMultiBulkReply()); } + + @Override + public void close() { + if (dataSource != null) { + if (client.isBroken()) { + this.dataSource.returnBrokenResource(this); + } else { + this.dataSource.returnResource(this); + } + } else { + client.close(); + } + } + + public void setDataSource(Pool jedisPool) { + this.dataSource = jedisPool; + } } diff --git a/src/main/java/redis/clients/jedis/JedisPool.java b/src/main/java/redis/clients/jedis/JedisPool.java index 6b2c80c..3aaa6ff 100644 --- a/src/main/java/redis/clients/jedis/JedisPool.java +++ b/src/main/java/redis/clients/jedis/JedisPool.java @@ -79,6 +79,13 @@ public class JedisPool extends Pool { database, clientName)); } + @Override + public Jedis getResource() { + Jedis jedis = super.getResource(); + jedis.setDataSource(this); + return jedis; + } + public void returnBrokenResource(final Jedis resource) { returnBrokenResourceObject(resource); } diff --git a/src/main/java/redis/clients/jedis/JedisSentinelPool.java b/src/main/java/redis/clients/jedis/JedisSentinelPool.java index 4fe5433..be8bad1 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelPool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelPool.java @@ -74,15 +74,6 @@ public class JedisSentinelPool extends Pool { initPool(master); } - public void returnBrokenResource(final Jedis resource) { - returnBrokenResourceObject(resource); - } - - public void returnResource(final Jedis resource) { - resource.resetState(); - returnResourceObject(resource); - } - private volatile HostAndPort currentHostMaster; public void destroy() { @@ -171,6 +162,24 @@ public class JedisSentinelPool extends Pool { return new HostAndPort(host, port); } + @Override + public Jedis getResource() { + Jedis jedis = super.getResource(); + jedis.setDataSource(this); + return jedis; + } + + @Override + public void returnBrokenResource(final Jedis resource) { + returnBrokenResourceObject(resource); + } + + @Override + public void returnResource(final Jedis resource) { + resource.resetState(); + returnResourceObject(resource); + } + protected class JedisPubSubAdapter extends JedisPubSub { @Override public void onMessage(String channel, String message) { diff --git a/src/main/java/redis/clients/jedis/ShardedJedis.java b/src/main/java/redis/clients/jedis/ShardedJedis.java index 7235cfe..0ebc9f0 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedis.java +++ b/src/main/java/redis/clients/jedis/ShardedJedis.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.io.Closeable; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -8,8 +9,13 @@ import java.util.regex.Pattern; import redis.clients.jedis.BinaryClient.LIST_POSITION; import redis.clients.util.Hashing; +import redis.clients.util.Pool; + +public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, + Closeable { + + protected Pool dataSource = null; -public class ShardedJedis extends BinaryShardedJedis implements JedisCommands { public ShardedJedis(List shards) { super(shards); } @@ -555,19 +561,53 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands { Jedis j = getShard(key); return j.zscan(key, cursor); } - - public ScanResult> hscan(String key, final String cursor) { + + public ScanResult> hscan(String key, + final String cursor) { Jedis j = getShard(key); return j.hscan(key, cursor); } - + public ScanResult sscan(String key, final String cursor) { Jedis j = getShard(key); return j.sscan(key, cursor); } - + public ScanResult zscan(String key, final String cursor) { Jedis j = getShard(key); return j.zscan(key, cursor); } + + @Override + public void close() { + if (dataSource != null) { + boolean broken = false; + + for (Jedis jedis : getAllShards()) { + if (jedis.getClient().isBroken()) { + broken = true; + } + } + + if (broken) { + dataSource.returnBrokenResource(this); + } else { + this.resetState(); + dataSource.returnResource(this); + } + + } else { + disconnect(); + } + } + + public void setDataSource(Pool shardedJedisPool) { + this.dataSource = shardedJedisPool; + } + + public void resetState() { + for (Jedis jedis : getAllShards()) { + jedis.resetState(); + } + } } diff --git a/src/main/java/redis/clients/jedis/ShardedJedisPool.java b/src/main/java/redis/clients/jedis/ShardedJedisPool.java index dd56ac1..5cdfd06 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedisPool.java +++ b/src/main/java/redis/clients/jedis/ShardedJedisPool.java @@ -31,6 +31,25 @@ public class ShardedJedisPool extends Pool { List shards, Hashing algo, Pattern keyTagPattern) { super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern)); } + + @Override + public ShardedJedis getResource() { + ShardedJedis jedis = super.getResource(); + jedis.setDataSource(this); + return jedis; + } + + @Override + public void returnBrokenResource(final ShardedJedis resource) { + returnBrokenResourceObject(resource); + } + + @Override + public void returnResource(final ShardedJedis resource) { + resource.resetState(); + returnResourceObject(resource); + } + /** * PoolableObjectFactory custom impl. diff --git a/src/test/java/redis/clients/jedis/tests/JedisPoolTest.java b/src/test/java/redis/clients/jedis/tests/JedisPoolTest.java index a501024..7e011c0 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisPoolTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisPoolTest.java @@ -187,15 +187,45 @@ public class JedisPoolTest extends Assert { 2000, "foobared"); Jedis jedis = pool.getResource(); - jedis.set("hello", "jedis"); - Transaction t = jedis.multi(); - t.set("hello", "world"); - pool.returnResource(jedis); + try { + jedis.set("hello", "jedis"); + Transaction t = jedis.multi(); + t.set("hello", "world"); + } finally { + jedis.close(); + } Jedis jedis2 = pool.getResource(); - assertTrue(jedis == jedis2); - assertEquals("jedis", jedis2.get("hello")); - pool.returnResource(jedis2); + try { + assertTrue(jedis == jedis2); + assertEquals("jedis", jedis2.get("hello")); + } finally { + jedis2.close(); + } + pool.destroy(); } -} + + @Test + public void checkResourceIsCloseable() { + GenericObjectPoolConfig config = new GenericObjectPoolConfig(); + config.setMaxTotal(1); + config.setBlockWhenExhausted(false); + JedisPool pool = new JedisPool(config, hnp.getHost(), hnp.getPort(), + 2000, "foobared"); + + Jedis jedis = pool.getResource(); + try { + jedis.set("hello", "jedis"); + } finally { + jedis.close(); + } + + Jedis jedis2 = pool.getResource(); + try { + assertEquals(jedis, jedis2); + } finally { + jedis2.close(); + } + } +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java b/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java index c8df9c5..46c37a8 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java @@ -79,6 +79,29 @@ public class JedisSentinelPoolTest extends JedisTestBase { pool.destroy(); } } + + @Test + public void checkResourceIsCloseable() { + GenericObjectPoolConfig config = new GenericObjectPoolConfig(); + config.setMaxTotal(1); + config.setBlockWhenExhausted(false); + JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, + config, 1000, "foobared", 2); + + Jedis jedis = pool.getResource(); + try { + jedis.set("hello", "jedis"); + } finally { + jedis.close(); + } + + Jedis jedis2 = pool.getResource(); + try { + assertEquals(jedis, jedis2); + } finally { + jedis2.close(); + } + } private void forceFailover(JedisSentinelPool pool) throws InterruptedException { diff --git a/src/test/java/redis/clients/jedis/tests/ShardedJedisPoolTest.java b/src/test/java/redis/clients/jedis/tests/ShardedJedisPoolTest.java index ee7951b..b1350e9 100644 --- a/src/test/java/redis/clients/jedis/tests/ShardedJedisPoolTest.java +++ b/src/test/java/redis/clients/jedis/tests/ShardedJedisPoolTest.java @@ -14,6 +14,7 @@ import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisShardInfo; import redis.clients.jedis.ShardedJedis; +import redis.clients.jedis.ShardedJedisPipeline; import redis.clients.jedis.ShardedJedisPool; import redis.clients.jedis.exceptions.JedisConnectionException; @@ -233,4 +234,69 @@ public class ShardedJedisPoolTest extends Assert { assertEquals("PONG", jedis.ping()); assertEquals("bar", jedis.get("foo")); } + + @Test + public void returnResourceShouldResetState() throws URISyntaxException { + GenericObjectPoolConfig config = new GenericObjectPoolConfig(); + config.setMaxTotal(1); + config.setBlockWhenExhausted(false); + + List shards = new ArrayList(); + shards.add(new JedisShardInfo(new URI( + "redis://:foobared@localhost:6380"))); + shards.add(new JedisShardInfo(new URI( + "redis://:foobared@localhost:6379"))); + + ShardedJedisPool pool = new ShardedJedisPool(config, shards); + + ShardedJedis jedis = pool.getResource(); + jedis.set("pipelined", String.valueOf(0)); + jedis.set("pipelined2", String.valueOf(0)); + + ShardedJedisPipeline pipeline = jedis.pipelined(); + + pipeline.incr("pipelined"); + pipeline.incr("pipelined2"); + + jedis.resetState(); + + pipeline = jedis.pipelined(); + pipeline.incr("pipelined"); + pipeline.incr("pipelined2"); + List results = pipeline.syncAndReturnAll(); + + assertEquals(2, results.size()); + pool.returnResource(jedis); + pool.destroy(); + } + + @Test + public void checkResourceIsCloseable() throws URISyntaxException { + GenericObjectPoolConfig config = new GenericObjectPoolConfig(); + config.setMaxTotal(1); + config.setBlockWhenExhausted(false); + + List shards = new ArrayList(); + shards.add(new JedisShardInfo(new URI( + "redis://:foobared@localhost:6380"))); + shards.add(new JedisShardInfo(new URI( + "redis://:foobared@localhost:6379"))); + + ShardedJedisPool pool = new ShardedJedisPool(config, shards); + + ShardedJedis jedis = pool.getResource(); + try { + jedis.set("hello", "jedis"); + } finally { + jedis.close(); + } + + ShardedJedis jedis2 = pool.getResource(); + try { + assertEquals(jedis, jedis2); + } finally { + jedis2.close(); + } + } + } diff --git a/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java b/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java index 5a71391..2413952 100644 --- a/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java +++ b/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java @@ -302,4 +302,25 @@ public class ShardedJedisTest extends Assert { assertEquals(jedisShardInfo.getName(), jedisShardInfo2.getName()); } } + + @Test + public void checkCloseable() { + List shards = new ArrayList(); + shards.add(new JedisShardInfo(redis1.getHost(), redis1.getPort())); + shards.add(new JedisShardInfo(redis2.getHost(), redis2.getPort())); + shards.get(0).setPassword("foobared"); + shards.get(1).setPassword("foobared"); + + ShardedJedis jedisShard = new ShardedJedis(shards); + try { + jedisShard.set("shard_closeable", "true"); + } finally { + jedisShard.close(); + } + + for (Jedis jedis : jedisShard.getAllShards()) { + assertTrue(!jedis.isConnected()); + } + } + } \ No newline at end of file