diff --git a/src/main/java/redis/clients/jedis/JedisShardInfo.java b/src/main/java/redis/clients/jedis/JedisShardInfo.java index f3495ab..232ef93 100644 --- a/src/main/java/redis/clients/jedis/JedisShardInfo.java +++ b/src/main/java/redis/clients/jedis/JedisShardInfo.java @@ -19,10 +19,8 @@ import redis.clients.util.ShardInfo; import redis.clients.util.Sharded; public class JedisShardInfo extends ShardInfo { - @Override public String toString() { - return "JedisShardInfo [host=" + host + ", port=" + port + ", weight=" - + getWeight() + "]"; + return host + ":" + port + "*" + getWeight(); } private int timeout; diff --git a/src/main/java/redis/clients/util/RedisInputStream.java b/src/main/java/redis/clients/util/RedisInputStream.java index ccf52bd..78f5aff 100644 --- a/src/main/java/redis/clients/util/RedisInputStream.java +++ b/src/main/java/redis/clients/util/RedisInputStream.java @@ -86,7 +86,7 @@ public class RedisInputStream extends FilterInputStream { throw new JedisException(e); } String reply = sb.toString(); - if (reply.isEmpty()) { + if (reply.length() == 0) { throw new JedisException( "It seems like server has closed the connection."); } diff --git a/src/main/java/redis/clients/util/Sharded.java b/src/main/java/redis/clients/util/Sharded.java index 5c04dad..0f7849e 100644 --- a/src/main/java/redis/clients/util/Sharded.java +++ b/src/main/java/redis/clients/util/Sharded.java @@ -3,11 +3,10 @@ package redis.clients.util; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.SortedMap; import java.util.TreeMap; -import java.util.Map.Entry; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -54,23 +53,12 @@ public class Sharded> { private void initialize(List shards) { nodes = new TreeMap(); - int totalWeight = 0; - - for (ShardInfo shard : shards) { - totalWeight += shard.getWeight(); - } - - long oneForthOfStep = (1L << 62) / totalWeight; // 62 vs 64 to normalize - // math in Long - - long floor = Long.MIN_VALUE; for (int i = 0; i != shards.size(); ++i) { final S shardInfo = shards.get(i); + for (int n = 0; n < 160 * shardInfo.getWeight(); n++) { + nodes.put(this.algo.hash(shardInfo.toString() + n), shardInfo); + } resources.put(shardInfo, shardInfo.createResource()); - nodes.put(floor, shardInfo); - floor += 4 * oneForthOfStep * shardInfo.getWeight(); // *4 to - // compensate - // 62 vs 64 } } @@ -83,13 +71,11 @@ public class Sharded> { } private S getShardInfo(byte[] key) { - Iterator> iterator = nodes.headMap(algo.hash(key)) - .entrySet().iterator(); - Entry next = iterator.next(); - if (iterator.hasNext()) { - next = iterator.next(); + SortedMap tail = nodes.tailMap(algo.hash(key)); + if (tail.size() == 0) { + return nodes.get(nodes.firstKey()); } - return next.getValue(); + return tail.get(tail.firstKey()); } public S getShardInfo(String key) { diff --git a/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java b/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java index e0107e0..82a6f74 100644 --- a/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java +++ b/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java @@ -9,11 +9,13 @@ import org.junit.Test; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisShardInfo; +import redis.clients.jedis.Protocol; import redis.clients.jedis.ShardedJedis; import redis.clients.jedis.ShardedJedisPipeline; import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort; import redis.clients.util.Hashing; import redis.clients.util.SafeEncoder; +import redis.clients.util.Sharded; public class ShardedJedisTest extends Assert { private static HostAndPort redis1 = HostAndPortUtil.getRedisServers() @@ -27,8 +29,8 @@ public class ShardedJedisTest extends Assert { shards.add(new JedisShardInfo(redis1.host, redis1.port)); shards.add(new JedisShardInfo(redis2.host, redis2.port)); ShardedJedis jedis = new ShardedJedis(shards); - JedisShardInfo s1 = jedis.getShardInfo("a"); - JedisShardInfo s2 = jedis.getShardInfo("b"); + JedisShardInfo s1 = jedis.getShardInfo("a1"); + JedisShardInfo s2 = jedis.getShardInfo("b2"); assertNotSame(s1, s2); } @@ -107,8 +109,8 @@ public class ShardedJedisTest extends Assert { JedisShardInfo s2 = jedis.getShardInfo("foo{bar}"); assertSame(s1, s2); - JedisShardInfo s3 = jedis.getShardInfo("a"); - JedisShardInfo s4 = jedis.getShardInfo("b"); + JedisShardInfo s3 = jedis.getShardInfo("a112"); + JedisShardInfo s4 = jedis.getShardInfo("b112"); assertNotSame(s3, s4); ShardedJedis jedis2 = new ShardedJedis(shards); @@ -130,15 +132,15 @@ public class ShardedJedisTest extends Assert { shards.get(1).setPassword("foobared"); ShardedJedis jedis = new ShardedJedis(shards); - jedis.set("a", "a"); - jedis.set("b", "b"); + jedis.set("a112", "a"); + jedis.set("b112", "b"); - assertTrue(!jedis.getShard("a").equals(jedis.getShard("b"))); + assertNotSame(jedis.getShard("a112"), jedis.getShard("b112")); List results = jedis.pipelined(new ShardedJedisPipeline() { public void execute() { - get("a"); - get("b"); + get("a112"); + get("b112"); } }); @@ -150,4 +152,74 @@ public class ShardedJedisTest extends Assert { assertArrayEquals(SafeEncoder.encode("a"), (byte[]) results.get(0)); assertArrayEquals(SafeEncoder.encode("b"), (byte[]) results.get(1)); } + + @Test + public void testMD5Sharding() throws Exception { + List shards = new ArrayList(3); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT)); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 1)); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 2)); + Sharded sharded = new Sharded( + shards, Hashing.MD5); + int shard_6379 = 0; + int shard_6380 = 0; + int shard_6381 = 0; + for (int i = 0; i < 1000; i++) { + JedisShardInfo jedisShardInfo = sharded.getShardInfo(Integer + .toString(i)); + switch (jedisShardInfo.getPort()) { + case 6379: + shard_6379++; + break; + case 6380: + shard_6380++; + break; + case 6381: + shard_6381++; + break; + default: + fail("Attempting to use a non-defined shard!!:" + + jedisShardInfo); + break; + } + } + assertTrue(shard_6379 > 300 && shard_6379 < 400); + assertTrue(shard_6380 > 300 && shard_6380 < 400); + assertTrue(shard_6381 > 300 && shard_6381 < 400); + } + + @Test + public void testMurmurSharding() throws Exception { + List shards = new ArrayList(3); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT)); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 1)); + shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 2)); + Sharded sharded = new Sharded( + shards, Hashing.MURMUR_HASH); + int shard_6379 = 0; + int shard_6380 = 0; + int shard_6381 = 0; + for (int i = 0; i < 1000; i++) { + JedisShardInfo jedisShardInfo = sharded.getShardInfo(Integer + .toString(i)); + switch (jedisShardInfo.getPort()) { + case 6379: + shard_6379++; + break; + case 6380: + shard_6380++; + break; + case 6381: + shard_6381++; + break; + default: + fail("Attempting to use a non-defined shard!!:" + + jedisShardInfo); + break; + } + } + assertTrue(shard_6379 > 300 && shard_6379 < 400); + assertTrue(shard_6380 > 300 && shard_6380 < 400); + assertTrue(shard_6381 > 300 && shard_6381 < 400); + } } \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/benchmark/ShardedBenchmark.java b/src/test/java/redis/clients/jedis/tests/benchmark/ShardedBenchmark.java index 206bde5..3fb63ff 100644 --- a/src/test/java/redis/clients/jedis/tests/benchmark/ShardedBenchmark.java +++ b/src/test/java/redis/clients/jedis/tests/benchmark/ShardedBenchmark.java @@ -2,49 +2,38 @@ package redis.clients.jedis.tests.benchmark; import java.io.IOException; import java.net.UnknownHostException; -import java.util.ArrayList; import java.util.Calendar; -import java.util.Collection; -import java.util.List; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisShardInfo; -import redis.clients.jedis.ShardedJedis; -import redis.clients.jedis.tests.HostAndPortUtil; -import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort; +import redis.clients.util.Hashing; public class ShardedBenchmark { - private static HostAndPort hnp1 = HostAndPortUtil.getRedisServers().get(0); - private static HostAndPort hnp2 = HostAndPortUtil.getRedisServers().get(1); - private static final int TOTAL_OPERATIONS = 100000; + private static final int TOTAL_OPERATIONS = 10000000; public static void main(String[] args) throws UnknownHostException, IOException { - List shards = new ArrayList(); - JedisShardInfo shard = new JedisShardInfo(hnp1.host, hnp1.port); - shard.setPassword("foobared"); - shards.add(shard); - shard = new JedisShardInfo(hnp2.host, hnp2.port); - shard.setPassword("foobared"); - shards.add(shard); - ShardedJedis jedis = new ShardedJedis(shards); - Collection allShards = jedis.getAllShards(); - for (Jedis j : allShards) { - j.flushAll(); - } long begin = Calendar.getInstance().getTimeInMillis(); for (int n = 0; n <= TOTAL_OPERATIONS; n++) { String key = "foo" + n; - jedis.set(key, "bar" + n); - jedis.get(key); + Hashing.MD5.hash(key); } long elapsed = Calendar.getInstance().getTimeInMillis() - begin; - jedis.disconnect(); + System.out.println(((1000 * TOTAL_OPERATIONS) / elapsed) + " MD5 ops"); + + begin = Calendar.getInstance().getTimeInMillis(); + + for (int n = 0; n <= TOTAL_OPERATIONS; n++) { + String key = "foo" + n; + Hashing.MURMUR_HASH.hash(key); + } + + elapsed = Calendar.getInstance().getTimeInMillis() - begin; + + System.out.println(((1000 * TOTAL_OPERATIONS) / elapsed) + + " Murmur ops"); - System.out.println(((1000 * 2 * TOTAL_OPERATIONS) / elapsed) + " ops"); } } \ No newline at end of file