Added MurmureHash as sharding algo.
This commit is contained in:
@@ -5,6 +5,7 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import redis.clients.util.Hashing;
|
||||||
import redis.clients.util.ShardInfo;
|
import redis.clients.util.ShardInfo;
|
||||||
import redis.clients.util.Sharded;
|
import redis.clients.util.Sharded;
|
||||||
|
|
||||||
@@ -13,6 +14,10 @@ public class ShardedJedis extends Sharded<Jedis> {
|
|||||||
super(shards);
|
super(shards);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ShardedJedis(List<ShardInfo> shards, Hashing algo) {
|
||||||
|
super(shards, algo);
|
||||||
|
}
|
||||||
|
|
||||||
public String set(String key, String value) {
|
public String set(String key, String value) {
|
||||||
Jedis j = getShard(key);
|
Jedis j = getShard(key);
|
||||||
return j.set(key, value);
|
return j.set(key, value);
|
||||||
|
|||||||
79
src/main/java/redis/clients/util/Hashing.java
Normal file
79
src/main/java/redis/clients/util/Hashing.java
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
package redis.clients.util;
|
||||||
|
|
||||||
|
import java.security.MessageDigest;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
|
||||||
|
public abstract class Hashing {
|
||||||
|
public static final Hashing MURMURE_HASH = new Hashing() {
|
||||||
|
public long hash(String key) {
|
||||||
|
// 'm' and 'r' are mixing constants generated offline.
|
||||||
|
// They're not really 'magic', they just happen to work well.
|
||||||
|
byte[] data = key.getBytes();
|
||||||
|
int seed = 0x1234ABCD;
|
||||||
|
int m = 0x5bd1e995;
|
||||||
|
int r = 24;
|
||||||
|
|
||||||
|
// Initialize the hash to a 'random' value
|
||||||
|
int len = data.length;
|
||||||
|
int h = seed ^ len;
|
||||||
|
|
||||||
|
int i = 0;
|
||||||
|
while (len >= 4) {
|
||||||
|
int k = data[i + 0] & 0xFF;
|
||||||
|
k |= (data[i + 1] & 0xFF) << 8;
|
||||||
|
k |= (data[i + 2] & 0xFF) << 16;
|
||||||
|
k |= (data[i + 3] & 0xFF) << 24;
|
||||||
|
|
||||||
|
k *= m;
|
||||||
|
k ^= k >>> r;
|
||||||
|
k *= m;
|
||||||
|
|
||||||
|
h *= m;
|
||||||
|
h ^= k;
|
||||||
|
|
||||||
|
i += 4;
|
||||||
|
len -= 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (len) {
|
||||||
|
case 3:
|
||||||
|
h ^= (data[i + 2] & 0xFF) << 16;
|
||||||
|
case 2:
|
||||||
|
h ^= (data[i + 1] & 0xFF) << 8;
|
||||||
|
case 1:
|
||||||
|
h ^= (data[i + 0] & 0xFF);
|
||||||
|
h *= m;
|
||||||
|
}
|
||||||
|
|
||||||
|
h ^= h >>> 13;
|
||||||
|
h *= m;
|
||||||
|
h ^= h >>> 15;
|
||||||
|
|
||||||
|
return h;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
public static final Hashing MD5 = new Hashing() {
|
||||||
|
private MessageDigest md5 = null; // avoid recurring construction
|
||||||
|
|
||||||
|
public long hash(String key) {
|
||||||
|
if (md5 == null) {
|
||||||
|
try {
|
||||||
|
md5 = MessageDigest.getInstance("MD5");
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"++++ no md5 algorythm found");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
md5.reset();
|
||||||
|
md5.update(key.getBytes());
|
||||||
|
byte[] bKey = md5.digest();
|
||||||
|
long res = ((long) (bKey[3] & 0xFF) << 24)
|
||||||
|
| ((long) (bKey[2] & 0xFF) << 16)
|
||||||
|
| ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public abstract long hash(String key);
|
||||||
|
}
|
||||||
@@ -10,15 +10,19 @@ import java.util.TreeMap;
|
|||||||
|
|
||||||
public abstract class Sharded<T> {
|
public abstract class Sharded<T> {
|
||||||
public static final int DEFAULT_WEIGHT = 1;
|
public static final int DEFAULT_WEIGHT = 1;
|
||||||
private static MessageDigest md5 = null; // avoid recurring construction
|
|
||||||
private TreeMap<Long, ShardInfo> nodes;
|
private TreeMap<Long, ShardInfo> nodes;
|
||||||
private int totalWeight;
|
private int totalWeight;
|
||||||
private Map<ShardInfo, T> resources;
|
private Map<ShardInfo, T> resources;
|
||||||
|
private Hashing algo = Hashing.MD5;
|
||||||
|
|
||||||
public Sharded(List<ShardInfo> shards) {
|
public Sharded(List<ShardInfo> shards) {
|
||||||
initialize(shards);
|
initialize(shards);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Sharded(List<ShardInfo> shards, Hashing algo) {
|
||||||
|
initialize(shards);
|
||||||
|
}
|
||||||
|
|
||||||
private void initialize(List<ShardInfo> shards) {
|
private void initialize(List<ShardInfo> shards) {
|
||||||
nodes = new TreeMap<Long, ShardInfo>();
|
nodes = new TreeMap<Long, ShardInfo>();
|
||||||
resources = new HashMap<ShardInfo, T>();
|
resources = new HashMap<ShardInfo, T>();
|
||||||
@@ -62,21 +66,7 @@ public abstract class Sharded<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private Long calculateHash(String key) {
|
private Long calculateHash(String key) {
|
||||||
if (md5 == null) {
|
return algo.hash(key);
|
||||||
try {
|
|
||||||
md5 = MessageDigest.getInstance("MD5");
|
|
||||||
} catch (NoSuchAlgorithmException e) {
|
|
||||||
throw new IllegalStateException("++++ no md5 algorythm found");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
md5.reset();
|
|
||||||
md5.update(key.getBytes());
|
|
||||||
byte[] bKey = md5.digest();
|
|
||||||
long res = ((long) (bKey[3] & 0xFF) << 24)
|
|
||||||
| ((long) (bKey[2] & 0xFF) << 16)
|
|
||||||
| ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF);
|
|
||||||
return res;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Long findPointFor(Long hashK) {
|
private Long findPointFor(Long hashK) {
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import org.junit.Test;
|
|||||||
import redis.clients.jedis.Jedis;
|
import redis.clients.jedis.Jedis;
|
||||||
import redis.clients.jedis.Protocol;
|
import redis.clients.jedis.Protocol;
|
||||||
import redis.clients.jedis.ShardedJedis;
|
import redis.clients.jedis.ShardedJedis;
|
||||||
|
import redis.clients.util.Hashing;
|
||||||
import redis.clients.util.ShardInfo;
|
import redis.clients.util.ShardInfo;
|
||||||
|
|
||||||
public class ShardedJedisTest extends Assert {
|
public class ShardedJedisTest extends Assert {
|
||||||
@@ -50,4 +51,32 @@ public class ShardedJedisTest extends Assert {
|
|||||||
assertEquals("bar1", j.get("b"));
|
assertEquals("bar1", j.get("b"));
|
||||||
j.disconnect();
|
j.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void tryShardingWithMurmure() throws IOException {
|
||||||
|
List<ShardInfo> shards = new ArrayList<ShardInfo>();
|
||||||
|
ShardInfo si = new ShardInfo("localhost", Protocol.DEFAULT_PORT);
|
||||||
|
si.setPassword("foobared");
|
||||||
|
shards.add(si);
|
||||||
|
si = new ShardInfo("localhost", Protocol.DEFAULT_PORT + 1);
|
||||||
|
si.setPassword("foobared");
|
||||||
|
shards.add(si);
|
||||||
|
ShardedJedis jedis = new ShardedJedis(shards, Hashing.MURMURE_HASH);
|
||||||
|
jedis.set("a", "bar");
|
||||||
|
ShardInfo s1 = jedis.getShardInfo("a");
|
||||||
|
jedis.set("b", "bar1");
|
||||||
|
ShardInfo s2 = jedis.getShardInfo("b");
|
||||||
|
jedis.disconnect();
|
||||||
|
|
||||||
|
Jedis j = new Jedis(s1.getHost(), s1.getPort());
|
||||||
|
j.auth("foobared");
|
||||||
|
assertEquals("bar", j.get("a"));
|
||||||
|
j.disconnect();
|
||||||
|
|
||||||
|
j = new Jedis(s2.getHost(), s2.getPort());
|
||||||
|
j.auth("foobared");
|
||||||
|
assertEquals("bar1", j.get("b"));
|
||||||
|
j.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user