fix consistent hashing, both MD5 and Murmur,
This commit is contained in:
@@ -19,10 +19,8 @@ import redis.clients.util.ShardInfo;
|
|||||||
import redis.clients.util.Sharded;
|
import redis.clients.util.Sharded;
|
||||||
|
|
||||||
public class JedisShardInfo extends ShardInfo<Jedis> {
|
public class JedisShardInfo extends ShardInfo<Jedis> {
|
||||||
@Override
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "JedisShardInfo [host=" + host + ", port=" + port + ", weight="
|
return host + ":" + port + "*" + getWeight();
|
||||||
+ getWeight() + "]";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private int timeout;
|
private int timeout;
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ public class RedisInputStream extends FilterInputStream {
|
|||||||
throw new JedisException(e);
|
throw new JedisException(e);
|
||||||
}
|
}
|
||||||
String reply = sb.toString();
|
String reply = sb.toString();
|
||||||
if (reply.isEmpty()) {
|
if (reply.length() == 0) {
|
||||||
throw new JedisException(
|
throw new JedisException(
|
||||||
"It seems like server has closed the connection.");
|
"It seems like server has closed the connection.");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,11 +3,10 @@ package redis.clients.util;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.SortedMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
@@ -54,23 +53,12 @@ public class Sharded<R, S extends ShardInfo<R>> {
|
|||||||
private void initialize(List<S> shards) {
|
private void initialize(List<S> shards) {
|
||||||
nodes = new TreeMap<Long, S>();
|
nodes = new TreeMap<Long, S>();
|
||||||
|
|
||||||
int totalWeight = 0;
|
|
||||||
|
|
||||||
for (ShardInfo<?> shard : shards) {
|
|
||||||
totalWeight += shard.getWeight();
|
|
||||||
}
|
|
||||||
|
|
||||||
long oneForthOfStep = (1L << 62) / totalWeight; // 62 vs 64 to normalize
|
|
||||||
// math in Long
|
|
||||||
|
|
||||||
long floor = Long.MIN_VALUE;
|
|
||||||
for (int i = 0; i != shards.size(); ++i) {
|
for (int i = 0; i != shards.size(); ++i) {
|
||||||
final S shardInfo = shards.get(i);
|
final S shardInfo = shards.get(i);
|
||||||
|
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
|
||||||
|
nodes.put(this.algo.hash(shardInfo.toString() + n), shardInfo);
|
||||||
|
}
|
||||||
resources.put(shardInfo, shardInfo.createResource());
|
resources.put(shardInfo, shardInfo.createResource());
|
||||||
nodes.put(floor, shardInfo);
|
|
||||||
floor += 4 * oneForthOfStep * shardInfo.getWeight(); // *4 to
|
|
||||||
// compensate
|
|
||||||
// 62 vs 64
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -83,13 +71,11 @@ public class Sharded<R, S extends ShardInfo<R>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private S getShardInfo(byte[] key) {
|
private S getShardInfo(byte[] key) {
|
||||||
Iterator<Entry<Long, S>> iterator = nodes.headMap(algo.hash(key))
|
SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
|
||||||
.entrySet().iterator();
|
if (tail.size() == 0) {
|
||||||
Entry<Long, S> next = iterator.next();
|
return nodes.get(nodes.firstKey());
|
||||||
if (iterator.hasNext()) {
|
|
||||||
next = iterator.next();
|
|
||||||
}
|
}
|
||||||
return next.getValue();
|
return tail.get(tail.firstKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
public S getShardInfo(String key) {
|
public S getShardInfo(String key) {
|
||||||
|
|||||||
@@ -9,11 +9,13 @@ import org.junit.Test;
|
|||||||
|
|
||||||
import redis.clients.jedis.Jedis;
|
import redis.clients.jedis.Jedis;
|
||||||
import redis.clients.jedis.JedisShardInfo;
|
import redis.clients.jedis.JedisShardInfo;
|
||||||
|
import redis.clients.jedis.Protocol;
|
||||||
import redis.clients.jedis.ShardedJedis;
|
import redis.clients.jedis.ShardedJedis;
|
||||||
import redis.clients.jedis.ShardedJedisPipeline;
|
import redis.clients.jedis.ShardedJedisPipeline;
|
||||||
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
|
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
|
||||||
import redis.clients.util.Hashing;
|
import redis.clients.util.Hashing;
|
||||||
import redis.clients.util.SafeEncoder;
|
import redis.clients.util.SafeEncoder;
|
||||||
|
import redis.clients.util.Sharded;
|
||||||
|
|
||||||
public class ShardedJedisTest extends Assert {
|
public class ShardedJedisTest extends Assert {
|
||||||
private static HostAndPort redis1 = HostAndPortUtil.getRedisServers()
|
private static HostAndPort redis1 = HostAndPortUtil.getRedisServers()
|
||||||
@@ -27,8 +29,8 @@ public class ShardedJedisTest extends Assert {
|
|||||||
shards.add(new JedisShardInfo(redis1.host, redis1.port));
|
shards.add(new JedisShardInfo(redis1.host, redis1.port));
|
||||||
shards.add(new JedisShardInfo(redis2.host, redis2.port));
|
shards.add(new JedisShardInfo(redis2.host, redis2.port));
|
||||||
ShardedJedis jedis = new ShardedJedis(shards);
|
ShardedJedis jedis = new ShardedJedis(shards);
|
||||||
JedisShardInfo s1 = jedis.getShardInfo("a");
|
JedisShardInfo s1 = jedis.getShardInfo("a1");
|
||||||
JedisShardInfo s2 = jedis.getShardInfo("b");
|
JedisShardInfo s2 = jedis.getShardInfo("b2");
|
||||||
assertNotSame(s1, s2);
|
assertNotSame(s1, s2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -107,8 +109,8 @@ public class ShardedJedisTest extends Assert {
|
|||||||
JedisShardInfo s2 = jedis.getShardInfo("foo{bar}");
|
JedisShardInfo s2 = jedis.getShardInfo("foo{bar}");
|
||||||
assertSame(s1, s2);
|
assertSame(s1, s2);
|
||||||
|
|
||||||
JedisShardInfo s3 = jedis.getShardInfo("a");
|
JedisShardInfo s3 = jedis.getShardInfo("a112");
|
||||||
JedisShardInfo s4 = jedis.getShardInfo("b");
|
JedisShardInfo s4 = jedis.getShardInfo("b112");
|
||||||
assertNotSame(s3, s4);
|
assertNotSame(s3, s4);
|
||||||
|
|
||||||
ShardedJedis jedis2 = new ShardedJedis(shards);
|
ShardedJedis jedis2 = new ShardedJedis(shards);
|
||||||
@@ -130,15 +132,15 @@ public class ShardedJedisTest extends Assert {
|
|||||||
shards.get(1).setPassword("foobared");
|
shards.get(1).setPassword("foobared");
|
||||||
ShardedJedis jedis = new ShardedJedis(shards);
|
ShardedJedis jedis = new ShardedJedis(shards);
|
||||||
|
|
||||||
jedis.set("a", "a");
|
jedis.set("a112", "a");
|
||||||
jedis.set("b", "b");
|
jedis.set("b112", "b");
|
||||||
|
|
||||||
assertTrue(!jedis.getShard("a").equals(jedis.getShard("b")));
|
assertNotSame(jedis.getShard("a112"), jedis.getShard("b112"));
|
||||||
|
|
||||||
List<Object> results = jedis.pipelined(new ShardedJedisPipeline() {
|
List<Object> results = jedis.pipelined(new ShardedJedisPipeline() {
|
||||||
public void execute() {
|
public void execute() {
|
||||||
get("a");
|
get("a112");
|
||||||
get("b");
|
get("b112");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -150,4 +152,74 @@ public class ShardedJedisTest extends Assert {
|
|||||||
assertArrayEquals(SafeEncoder.encode("a"), (byte[]) results.get(0));
|
assertArrayEquals(SafeEncoder.encode("a"), (byte[]) results.get(0));
|
||||||
assertArrayEquals(SafeEncoder.encode("b"), (byte[]) results.get(1));
|
assertArrayEquals(SafeEncoder.encode("b"), (byte[]) results.get(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMD5Sharding() throws Exception {
|
||||||
|
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(3);
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT));
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 1));
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 2));
|
||||||
|
Sharded<Jedis, JedisShardInfo> sharded = new Sharded<Jedis, JedisShardInfo>(
|
||||||
|
shards, Hashing.MD5);
|
||||||
|
int shard_6379 = 0;
|
||||||
|
int shard_6380 = 0;
|
||||||
|
int shard_6381 = 0;
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
JedisShardInfo jedisShardInfo = sharded.getShardInfo(Integer
|
||||||
|
.toString(i));
|
||||||
|
switch (jedisShardInfo.getPort()) {
|
||||||
|
case 6379:
|
||||||
|
shard_6379++;
|
||||||
|
break;
|
||||||
|
case 6380:
|
||||||
|
shard_6380++;
|
||||||
|
break;
|
||||||
|
case 6381:
|
||||||
|
shard_6381++;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
fail("Attempting to use a non-defined shard!!:"
|
||||||
|
+ jedisShardInfo);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue(shard_6379 > 300 && shard_6379 < 400);
|
||||||
|
assertTrue(shard_6380 > 300 && shard_6380 < 400);
|
||||||
|
assertTrue(shard_6381 > 300 && shard_6381 < 400);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMurmurSharding() throws Exception {
|
||||||
|
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(3);
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT));
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 1));
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 2));
|
||||||
|
Sharded<Jedis, JedisShardInfo> sharded = new Sharded<Jedis, JedisShardInfo>(
|
||||||
|
shards, Hashing.MURMUR_HASH);
|
||||||
|
int shard_6379 = 0;
|
||||||
|
int shard_6380 = 0;
|
||||||
|
int shard_6381 = 0;
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
JedisShardInfo jedisShardInfo = sharded.getShardInfo(Integer
|
||||||
|
.toString(i));
|
||||||
|
switch (jedisShardInfo.getPort()) {
|
||||||
|
case 6379:
|
||||||
|
shard_6379++;
|
||||||
|
break;
|
||||||
|
case 6380:
|
||||||
|
shard_6380++;
|
||||||
|
break;
|
||||||
|
case 6381:
|
||||||
|
shard_6381++;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
fail("Attempting to use a non-defined shard!!:"
|
||||||
|
+ jedisShardInfo);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue(shard_6379 > 300 && shard_6379 < 400);
|
||||||
|
assertTrue(shard_6380 > 300 && shard_6380 < 400);
|
||||||
|
assertTrue(shard_6381 > 300 && shard_6381 < 400);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -2,49 +2,38 @@ package redis.clients.jedis.tests.benchmark;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import redis.clients.jedis.Jedis;
|
import redis.clients.util.Hashing;
|
||||||
import redis.clients.jedis.JedisShardInfo;
|
|
||||||
import redis.clients.jedis.ShardedJedis;
|
|
||||||
import redis.clients.jedis.tests.HostAndPortUtil;
|
|
||||||
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
|
|
||||||
|
|
||||||
public class ShardedBenchmark {
|
public class ShardedBenchmark {
|
||||||
private static HostAndPort hnp1 = HostAndPortUtil.getRedisServers().get(0);
|
private static final int TOTAL_OPERATIONS = 10000000;
|
||||||
private static HostAndPort hnp2 = HostAndPortUtil.getRedisServers().get(1);
|
|
||||||
private static final int TOTAL_OPERATIONS = 100000;
|
|
||||||
|
|
||||||
public static void main(String[] args) throws UnknownHostException,
|
public static void main(String[] args) throws UnknownHostException,
|
||||||
IOException {
|
IOException {
|
||||||
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>();
|
|
||||||
JedisShardInfo shard = new JedisShardInfo(hnp1.host, hnp1.port);
|
|
||||||
shard.setPassword("foobared");
|
|
||||||
shards.add(shard);
|
|
||||||
shard = new JedisShardInfo(hnp2.host, hnp2.port);
|
|
||||||
shard.setPassword("foobared");
|
|
||||||
shards.add(shard);
|
|
||||||
ShardedJedis jedis = new ShardedJedis(shards);
|
|
||||||
Collection<Jedis> allShards = jedis.getAllShards();
|
|
||||||
for (Jedis j : allShards) {
|
|
||||||
j.flushAll();
|
|
||||||
}
|
|
||||||
|
|
||||||
long begin = Calendar.getInstance().getTimeInMillis();
|
long begin = Calendar.getInstance().getTimeInMillis();
|
||||||
|
|
||||||
for (int n = 0; n <= TOTAL_OPERATIONS; n++) {
|
for (int n = 0; n <= TOTAL_OPERATIONS; n++) {
|
||||||
String key = "foo" + n;
|
String key = "foo" + n;
|
||||||
jedis.set(key, "bar" + n);
|
Hashing.MD5.hash(key);
|
||||||
jedis.get(key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long elapsed = Calendar.getInstance().getTimeInMillis() - begin;
|
long elapsed = Calendar.getInstance().getTimeInMillis() - begin;
|
||||||
|
|
||||||
jedis.disconnect();
|
System.out.println(((1000 * TOTAL_OPERATIONS) / elapsed) + " MD5 ops");
|
||||||
|
|
||||||
|
begin = Calendar.getInstance().getTimeInMillis();
|
||||||
|
|
||||||
|
for (int n = 0; n <= TOTAL_OPERATIONS; n++) {
|
||||||
|
String key = "foo" + n;
|
||||||
|
Hashing.MURMUR_HASH.hash(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
elapsed = Calendar.getInstance().getTimeInMillis() - begin;
|
||||||
|
|
||||||
|
System.out.println(((1000 * TOTAL_OPERATIONS) / elapsed)
|
||||||
|
+ " Murmur ops");
|
||||||
|
|
||||||
System.out.println(((1000 * 2 * TOTAL_OPERATIONS) / elapsed) + " ops");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user