diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 876cf51..9493fb9 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -61,24 +61,6 @@ public class Connection { } } - protected Object read() { - try { - return protocol.read(inputStream); - } catch (JedisConnectionException e) { - disconnect(); - throw new JedisConnectionException(e); - } - } - - protected void sendProtocolCommand(final Command cmd, final byte[]... args) { - try { - protocol.sendCommand(outputStream, cmd, args); - } catch (JedisConnectionException e) { - disconnect(); - throw new JedisConnectionException(e); - } - } - protected Connection sendCommand(final Command cmd, final String... args) { final byte[][] bargs = new byte[args.length][]; for (int i = 0; i < args.length; i++) { @@ -89,14 +71,14 @@ public class Connection { protected Connection sendCommand(final Command cmd, final byte[]... args) { connect(); - sendProtocolCommand(cmd, args); + protocol.sendCommand(outputStream, cmd, args); pipelinedCommands++; return this; } protected Connection sendCommand(final Command cmd) { connect(); - sendProtocolCommand(cmd, new byte[0][]); + protocol.sendCommand(outputStream, cmd, new byte[0][]); pipelinedCommands++; return this; } @@ -163,7 +145,7 @@ public class Connection { protected String getStatusCodeReply() { flush(); pipelinedCommands--; - final byte[] resp = (byte[]) read(); + final byte[] resp = (byte[]) protocol.read(inputStream); if (null == resp) { return null; } else { @@ -183,13 +165,13 @@ public class Connection { public byte[] getBinaryBulkReply() { flush(); pipelinedCommands--; - return (byte[]) read(); + return (byte[]) protocol.read(inputStream); } public Long getIntegerReply() { flush(); pipelinedCommands--; - return (Long) read(); + return (Long) protocol.read(inputStream); } public List getMultiBulkReply() { @@ -200,14 +182,14 @@ public class Connection { public List getBinaryMultiBulkReply() { flush(); pipelinedCommands--; - return (List) read(); + return (List) protocol.read(inputStream); } @SuppressWarnings("unchecked") public List getObjectMultiBulkReply() { flush(); pipelinedCommands--; - return (List) read(); + return (List) protocol.read(inputStream); } public List getAll() { @@ -218,7 +200,7 @@ public class Connection { List all = new ArrayList(); flush(); while (pipelinedCommands > except) { - all.add(read()); + all.add(protocol.read(inputStream)); pipelinedCommands--; } return all; @@ -227,6 +209,6 @@ public class Connection { public Object getOne() { flush(); pipelinedCommands--; - return read(); + return protocol.read(inputStream); } -} +} \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/JedisShardInfo.java b/src/main/java/redis/clients/jedis/JedisShardInfo.java index 232ef93..46118fa 100644 --- a/src/main/java/redis/clients/jedis/JedisShardInfo.java +++ b/src/main/java/redis/clients/jedis/JedisShardInfo.java @@ -22,11 +22,12 @@ public class JedisShardInfo extends ShardInfo { public String toString() { return host + ":" + port + "*" + getWeight(); } - + private int timeout; private String host; private int port; private String password = null; + private String name = null; public String getHost() { return host; @@ -39,15 +40,27 @@ public class JedisShardInfo extends ShardInfo { public JedisShardInfo(String host) { this(host, Protocol.DEFAULT_PORT); } - + public JedisShardInfo(String host, String name) { + this(host, Protocol.DEFAULT_PORT, name); + } + public JedisShardInfo(String host, int port) { this(host, port, 2000); } - + + public JedisShardInfo(String host, int port, String name) { + this(host, port, 2000, name); + } + public JedisShardInfo(String host, int port, int timeout) { this(host, port, timeout, Sharded.DEFAULT_WEIGHT); } - + + public JedisShardInfo(String host, int port, int timeout, String name) { + this(host, port, timeout, Sharded.DEFAULT_WEIGHT); + this.name = name; + } + public JedisShardInfo(String host, int port, int timeout, int weight) { super(weight); this.host = host; @@ -70,6 +83,10 @@ public class JedisShardInfo extends ShardInfo { public void setTimeout(int timeout) { this.timeout = timeout; } + + public String getName() { + return name ; + } @Override public Jedis createResource() { diff --git a/src/main/java/redis/clients/util/ShardInfo.java b/src/main/java/redis/clients/util/ShardInfo.java index fbb1158..e30389c 100644 --- a/src/main/java/redis/clients/util/ShardInfo.java +++ b/src/main/java/redis/clients/util/ShardInfo.java @@ -15,4 +15,6 @@ public abstract class ShardInfo { } protected abstract T createResource(); + + public abstract String getName(); } diff --git a/src/main/java/redis/clients/util/Sharded.java b/src/main/java/redis/clients/util/Sharded.java index 2e5502d..f6065bc 100644 --- a/src/main/java/redis/clients/util/Sharded.java +++ b/src/main/java/redis/clients/util/Sharded.java @@ -55,9 +55,14 @@ public class Sharded> { for (int i = 0; i != shards.size(); ++i) { final S shardInfo = shards.get(i); - for (int n = 0; n < 160 * shardInfo.getWeight(); n++) { - nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo); - } + if (shardInfo.getName() == null) + for (int n = 0; n < 160 * shardInfo.getWeight(); n++) { + nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo); + } + else + for (int n = 0; n < 160 * shardInfo.getWeight(); n++) { + nodes.put(this.algo.hash(shardInfo.getName() + "*" + shardInfo.getWeight() + n), shardInfo); + } resources.put(shardInfo, shardInfo.createResource()); } } diff --git a/src/test/java/redis/clients/jedis/tests/ShardedJedisPoolTest.java b/src/test/java/redis/clients/jedis/tests/ShardedJedisPoolTest.java index eeddef6..03763cd 100644 --- a/src/test/java/redis/clients/jedis/tests/ShardedJedisPoolTest.java +++ b/src/test/java/redis/clients/jedis/tests/ShardedJedisPoolTest.java @@ -115,4 +115,72 @@ public class ShardedJedisPoolTest extends Assert { assertNotSame(j1.getShard("foo"), j2.getShard("foo")); } + + public void checkConnectionsWithNoServers() { + shards = new ArrayList(); + shards.add(new JedisShardInfo("localhost", 6379, "ssa")); + shards.add(new JedisShardInfo("localhost", 6380, "ssa")); + Config redisConfig = new Config(); + redisConfig.testOnBorrow = false; // deactivated for now + redisConfig.testOnReturn = true; + redisConfig.maxActive = 200; // nro threads + margen de seguridad? + redisConfig.minIdle = 200; + ShardedJedisPool pool = new ShardedJedisPool(redisConfig, shards); + ShardedJedis jedis = pool.getResource(); + pool.returnResource(jedis); + pool.destroy(); + } + + @Test + public void checkFailedJedisServer() { + ShardedJedisPool pool = new ShardedJedisPool(new Config(), shards); + ShardedJedis jedis = pool.getResource(); + jedis.incr("foo"); + pool.returnResource(jedis); + pool.destroy(); + } + + @Test + public void shouldReturnActiveShardsWhenOneGoesOffline() { + Config redisConfig = new Config(); + redisConfig.testOnBorrow = false; + ShardedJedisPool pool = new ShardedJedisPool(redisConfig, shards); + ShardedJedis jedis = pool.getResource(); + // fill the shards + for (int i = 0; i < 1000; i++) { + jedis.set("a-test-" + i, "0"); + } + pool.returnResource(jedis); + // check quantity for each shard + Jedis j = new Jedis(shards.get(0)); + j.connect(); + Long c1 = j.dbSize(); + j.disconnect(); + j = new Jedis(shards.get(1)); + j.connect(); + Long c2 = j.dbSize(); + j.disconnect(); + // shutdown shard 2 and check thay the pool returns an instance with c1 + // items on one shard + // alter shard 1 and recreate pool + pool.destroy(); + shards.set(1, new JedisShardInfo("nohost", 1234)); + pool = new ShardedJedisPool(redisConfig, shards); + jedis = pool.getResource(); + Long actual = new Long(0); + Long fails = new Long(0); + for (int i = 0; i < 1000; i++) { + try { + jedis.get("a-test-" + i); + actual++; + } catch (RuntimeException e) { + fails++; + } + } + pool.returnResource(jedis); + pool.destroy(); + assertEquals(actual, c1); + assertEquals(fails, c2); + } + } \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java b/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java index 3f41c00..5d73696 100644 --- a/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java +++ b/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java @@ -240,5 +240,63 @@ public class ShardedJedisTest extends Assert { assertTrue(shard_6380 > 300 && shard_6380 < 400); assertTrue(shard_6381 > 300 && shard_6381 < 400); } + + @Test + public void testMasterSlaveShardingConsistency() { + List shards = new ArrayList(3); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT)); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 1)); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 2)); + Sharded sharded = new Sharded( + shards, Hashing.MURMUR_HASH); + + List otherShards = new ArrayList(3); + otherShards.add(new JedisShardInfo("otherhost", Protocol.DEFAULT_PORT)); + otherShards.add(new JedisShardInfo("otherhost", + Protocol.DEFAULT_PORT + 1)); + otherShards.add(new JedisShardInfo("otherhost", + Protocol.DEFAULT_PORT + 2)); + Sharded sharded2 = new Sharded( + otherShards, Hashing.MURMUR_HASH); + + for (int i = 0; i < 1000; i++) { + JedisShardInfo jedisShardInfo = sharded.getShardInfo(Integer + .toString(i)); + JedisShardInfo jedisShardInfo2 = sharded2.getShardInfo(Integer + .toString(i)); + assertEquals(shards.indexOf(jedisShardInfo), + otherShards.indexOf(jedisShardInfo2)); + } + + } + @Test + public void testMasterSlaveShardingConsistencyWithShardNaming() { + List shards = new ArrayList(3); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT, "HOST1:1234")); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 1,"HOST2:1234")); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 2, "HOST3:1234")); + Sharded sharded = new Sharded( + shards, Hashing.MURMUR_HASH); + + List otherShards = new ArrayList(3); + otherShards.add(new JedisShardInfo("otherhost", Protocol.DEFAULT_PORT, "HOST2:1234")); + otherShards.add(new JedisShardInfo("otherhost", + Protocol.DEFAULT_PORT + 1, "HOST3:1234")); + otherShards.add(new JedisShardInfo("otherhost", + Protocol.DEFAULT_PORT + 2, "HOST1:1234")); + Sharded sharded2 = new Sharded( + otherShards, Hashing.MURMUR_HASH); + + for (int i = 0; i < 1000; i++) { + JedisShardInfo jedisShardInfo = sharded.getShardInfo(Integer + .toString(i)); + JedisShardInfo jedisShardInfo2 = sharded2.getShardInfo(Integer + .toString(i)); + assertEquals(jedisShardInfo.getName(), + jedisShardInfo2.getName()); + } + + } + }