diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 9f6617d..c0191ec 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -1,15 +1,15 @@ package redis.clients.jedis; -import java.io.IOException; +import redis.clients.util.RedisInputStream; +import redis.clients.util.RedisOutputStream; + +import java.io.*; import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; -import redis.clients.util.RedisInputStream; -import redis.clients.util.RedisOutputStream; - public class Connection { private String host; private int port = Protocol.DEFAULT_PORT; @@ -50,13 +50,13 @@ public class Connection { } protected Connection sendCommand(String name, String... args) { - try { - connect(); - } catch (UnknownHostException e) { - throw new JedisException("Could not connect to redis-server", e); - } catch (IOException e) { - throw new JedisException("Could not connect to redis-server", e); - } + try { + connect(); + } catch (UnknownHostException e) { + throw new JedisException("Could not connect to redis-server", e); + } catch (IOException e) { + throw new JedisException("Could not connect to redis-server", e); + } protocol.sendCommand(outputStream, name, args); pipelinedCommands++; return this; @@ -126,9 +126,9 @@ public class Connection { return (String) protocol.read(inputStream); } - public Integer getIntegerReply() { + public int getIntegerReply() { pipelinedCommands--; - return (Integer) protocol.read(inputStream); + return ((Integer) protocol.read(inputStream)).intValue(); } @SuppressWarnings("unchecked") diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 346704e..b5a8a39 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -29,7 +29,7 @@ public class Jedis { client.setTimeout(timeout); } - public Jedis(ShardInfo shardInfo) { + public Jedis(JedisShardInfo shardInfo) { client = new Client(shardInfo.getHost(), shardInfo.getPort()); client.setTimeout(shardInfo.getTimeout()); if (shardInfo.getPassword() != null) { diff --git a/src/main/java/redis/clients/jedis/JedisPool.java b/src/main/java/redis/clients/jedis/JedisPool.java index 3d19482..33ee94a 100644 --- a/src/main/java/redis/clients/jedis/JedisPool.java +++ b/src/main/java/redis/clients/jedis/JedisPool.java @@ -32,7 +32,7 @@ public class JedisPool extends FixedResourcePool { this.password = password; } - public JedisPool(ShardInfo shardInfo) { + public JedisPool(JedisShardInfo shardInfo) { this.host = shardInfo.getHost(); this.port = shardInfo.getPort(); this.timeout = shardInfo.getTimeout(); @@ -80,4 +80,4 @@ public class JedisPool extends FixedResourcePool { return false; } } -} \ No newline at end of file +} diff --git a/src/main/java/redis/clients/jedis/JedisShardInfo.java b/src/main/java/redis/clients/jedis/JedisShardInfo.java new file mode 100644 index 0000000..f3495ab --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisShardInfo.java @@ -0,0 +1,80 @@ +/* + * Copyright 2009-2010 MBTE Sweden AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package redis.clients.jedis; + +import redis.clients.util.ShardInfo; +import redis.clients.util.Sharded; + +public class JedisShardInfo extends ShardInfo { + @Override + public String toString() { + return "JedisShardInfo [host=" + host + ", port=" + port + ", weight=" + + getWeight() + "]"; + } + + private int timeout; + private String host; + private int port; + private String password = null; + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public JedisShardInfo(String host) { + this(host, Protocol.DEFAULT_PORT); + } + + public JedisShardInfo(String host, int port) { + this(host, port, 2000); + } + + public JedisShardInfo(String host, int port, int timeout) { + this(host, port, timeout, Sharded.DEFAULT_WEIGHT); + } + + public JedisShardInfo(String host, int port, int timeout, int weight) { + super(weight); + this.host = host; + this.port = port; + this.timeout = timeout; + } + + public String getPassword() { + return password; + } + + public void setPassword(String auth) { + this.password = auth; + } + + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + @Override + public Jedis createResource() { + return new Jedis(this); + } +} diff --git a/src/main/java/redis/clients/jedis/ShardedJedis.java b/src/main/java/redis/clients/jedis/ShardedJedis.java index c2274f6..6a9c1da 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedis.java +++ b/src/main/java/redis/clients/jedis/ShardedJedis.java @@ -10,12 +10,12 @@ import redis.clients.util.Hashing; import redis.clients.util.ShardInfo; import redis.clients.util.Sharded; -public class ShardedJedis extends Sharded { - public ShardedJedis(List shards) { +public class ShardedJedis extends Sharded { + public ShardedJedis(List shards) { super(shards); } - public ShardedJedis(List shards, Hashing algo) { + public ShardedJedis(List shards, Hashing algo) { super(shards, algo); } @@ -347,12 +347,12 @@ public class ShardedJedis extends Sharded { } public void disconnect() throws IOException { - for (Jedis jedis : getAllShards()) { - jedis.disconnect(); + for (JedisShardInfo jedis : getAllShards()) { + jedis.getResource().disconnect(); } } - protected Jedis create(ShardInfo shard) { + protected Jedis create(JedisShardInfo shard) { return new Jedis(shard); } diff --git a/src/main/java/redis/clients/util/ShardInfo.java b/src/main/java/redis/clients/util/ShardInfo.java index 3a14ea1..25c050c 100644 --- a/src/main/java/redis/clients/util/ShardInfo.java +++ b/src/main/java/redis/clients/util/ShardInfo.java @@ -1,98 +1,28 @@ package redis.clients.util; -import redis.clients.jedis.Protocol; +public abstract class ShardInfo { + private T resource; -public class ShardInfo { - @Override - public String toString() { - return "ShardInfo [host=" + host + ", port=" + port + ", weight=" - + weight + "]"; - } - - private String host; - private int port; - private int timeout; private int weight; - private String password = null; - public String getHost() { - return host; + public ShardInfo() { } - public int getPort() { - return port; - } - - public int getTimeout() { - return timeout; - } - - public ShardInfo(String host) { - this(host, Protocol.DEFAULT_PORT); - } - - public ShardInfo(String host, int port) { - this(host, port, 2000); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((host == null) ? 0 : host.hashCode()); - result = prime * result + port; - result = prime * result + timeout; - result = prime * result + weight; - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - ShardInfo other = (ShardInfo) obj; - if (host == null) { - if (other.host != null) - return false; - } else if (!host.equals(other.host)) - return false; - if (port != other.port) - return false; - if (timeout != other.timeout) - return false; - if (weight != other.weight) - return false; - return true; - } - - public ShardInfo(String host, int port, int timeout) { - this(host, port, timeout, Sharded.DEFAULT_WEIGHT); - } - - public ShardInfo(String host, int port, int timeout, int weight) { - this.host = host; - this.port = port; - this.timeout = timeout; - this.weight = weight; - } - - public String getPassword() { - return password; - } - - public void setPassword(String auth) { - this.password = auth; - } - - public void setTimeout(int timeout) { - this.timeout = timeout; + public ShardInfo(int weight) { + this.weight = weight; } public int getWeight() { - return this.weight; + return this.weight; } + + public T getResource() { + return resource; + } + + public void initResource () { + resource = createResource(); + } + + protected abstract T createResource(); } diff --git a/src/main/java/redis/clients/util/Sharded.java b/src/main/java/redis/clients/util/Sharded.java index d075434..1f56a57 100644 --- a/src/main/java/redis/clients/util/Sharded.java +++ b/src/main/java/redis/clients/util/Sharded.java @@ -1,92 +1,53 @@ package redis.clients.util; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.Collection; -import java.util.HashMap; +import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.TreeMap; -public abstract class Sharded { +public class Sharded> { public static final int DEFAULT_WEIGHT = 1; - private TreeMap nodes; - private int totalWeight; - private Map resources; - private Hashing algo = Hashing.MD5; + private TreeMap nodes; + private final Hashing algo; - public Sharded(List shards) { - initialize(shards); + public Sharded(List shards) { + this(shards, Hashing.MURMUR_HASH); // MD5 is really not good as we works with 64-bits not 128 } - public Sharded(List shards, Hashing algo) { - initialize(shards); + public Sharded(List shards, Hashing algo) { + this.algo = algo; + initialize(shards); } - private void initialize(List shards) { - nodes = new TreeMap(); - resources = new HashMap(); + private void initialize(List shards) { + nodes = new TreeMap(); - totalWeight = 0; + int totalWeight = 0; - for (ShardInfo shard : shards) { - totalWeight += shard.getWeight(); - } + for (ShardInfo shard : shards) { + totalWeight += shard.getWeight(); + } - MessageDigest md5; - try { - md5 = MessageDigest.getInstance("MD5"); - } catch (NoSuchAlgorithmException e) { - throw new IllegalStateException("++++ no md5 algorythm found"); - } + long oneForthOfStep = (1L << 62) / totalWeight; // 62 vs 64 to normalize math in Long - for (ShardInfo shard : shards) { - double factor = Math - .floor(((double) (40 * shards.size() * DEFAULT_WEIGHT)) - / (double) totalWeight); - - for (long j = 0; j < factor; j++) { - byte[] d = md5.digest((shard.toString() + "-" + j).getBytes()); - for (int h = 0; h < 4; h++) { - Long k = ((long) (d[3 + h * 4] & 0xFF) << 24) - | ((long) (d[2 + h * 4] & 0xFF) << 16) - | ((long) (d[1 + h * 4] & 0xFF) << 8) - | ((long) (d[0 + h * 4] & 0xFF)); - nodes.put(k, shard); - } - } - resources.put(shard, create(shard)); - } + long floor = Long.MIN_VALUE; + for (int i = 0; i != shards.size(); ++i) { + final S shardInfo = shards.get(i); + shardInfo.initResource(); + nodes.put(floor, shardInfo); + floor += 4 * oneForthOfStep * shardInfo.getWeight(); // *4 to compensate 62 vs 64 + } } - public ShardInfo getShardInfo(String key) { - long hv = calculateHash(key); - - return nodes.get(findPointFor(hv)); + public R getShard(String key) { + return nodes.floorEntry(algo.hash(key)).getValue().getResource(); } - private Long calculateHash(String key) { - return algo.hash(key); + public S getShardInfo(String key) { + return nodes.floorEntry(algo.hash(key)).getValue(); } - private Long findPointFor(Long hashK) { - Long k = nodes.ceilingKey(hashK); - - if (k == null) { - k = nodes.firstKey(); - } - - return k; - } - - public T getShard(String key) { - ShardInfo shard = getShardInfo(key); - return resources.get(shard); - } - - protected abstract T create(ShardInfo shard); - - public Collection getAllShards() { - return resources.values(); + public Collection getAllShards() { + return Collections.unmodifiableCollection(nodes.values()); } } \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/JedisTest.java b/src/test/java/redis/clients/jedis/tests/JedisTest.java index 64ff957..bd07820 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisTest.java @@ -6,6 +6,7 @@ import java.util.Map; import org.junit.Test; import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisShardInfo; import redis.clients.jedis.Protocol; import redis.clients.jedis.tests.commands.JedisCommandTestBase; import redis.clients.util.RedisOutputStream; @@ -35,7 +36,7 @@ public class JedisTest extends JedisCommandTestBase { @Test public void connectWithShardInfo() { - ShardInfo shardInfo = new ShardInfo("localhost", Protocol.DEFAULT_PORT); + JedisShardInfo shardInfo = new JedisShardInfo("localhost", Protocol.DEFAULT_PORT); shardInfo.setPassword("foobared"); Jedis jedis = new Jedis(shardInfo); jedis.get("foo"); diff --git a/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java b/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java index fe4a8a4..f70b2f2 100644 --- a/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java +++ b/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java @@ -8,6 +8,7 @@ import org.junit.Assert; import org.junit.Test; import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisShardInfo; import redis.clients.jedis.ShardedJedis; import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort; import redis.clients.util.Hashing; @@ -19,9 +20,9 @@ public class ShardedJedisTest extends Assert { @Test public void checkSharding() throws IOException { - List shards = new ArrayList(); - shards.add(new ShardInfo(redis1.host, redis1.port)); - shards.add(new ShardInfo(redis2.host, redis2.port)); + List shards = new ArrayList(); + shards.add(new JedisShardInfo(redis1.host, redis1.port)); + shards.add(new JedisShardInfo(redis2.host, redis2.port)); ShardedJedis jedis = new ShardedJedis(shards); ShardInfo s1 = jedis.getShardInfo("a"); ShardInfo s2 = jedis.getShardInfo("b"); @@ -30,18 +31,18 @@ public class ShardedJedisTest extends Assert { @Test public void trySharding() throws IOException { - List shards = new ArrayList(); - ShardInfo si = new ShardInfo(redis1.host, redis1.port); + List shards = new ArrayList(); + JedisShardInfo si = new JedisShardInfo(redis1.host, redis1.port); si.setPassword("foobared"); shards.add(si); - si = new ShardInfo(redis2.host, redis2.port); + si = new JedisShardInfo(redis2.host, redis2.port); si.setPassword("foobared"); shards.add(si); ShardedJedis jedis = new ShardedJedis(shards); jedis.set("a", "bar"); - ShardInfo s1 = jedis.getShardInfo("a"); + JedisShardInfo s1 = jedis.getShardInfo("a"); jedis.set("b", "bar1"); - ShardInfo s2 = jedis.getShardInfo("b"); + JedisShardInfo s2 = jedis.getShardInfo("b"); jedis.disconnect(); Jedis j = new Jedis(s1.getHost(), s1.getPort()); @@ -57,18 +58,18 @@ public class ShardedJedisTest extends Assert { @Test public void tryShardingWithMurmure() throws IOException { - List shards = new ArrayList(); - ShardInfo si = new ShardInfo(redis1.host, redis1.port); + List shards = new ArrayList(); + JedisShardInfo si = new JedisShardInfo(redis1.host, redis1.port); si.setPassword("foobared"); shards.add(si); - si = new ShardInfo(redis2.host, redis2.port); + si = new JedisShardInfo(redis2.host, redis2.port); si.setPassword("foobared"); shards.add(si); ShardedJedis jedis = new ShardedJedis(shards, Hashing.MURMUR_HASH); jedis.set("a", "bar"); - ShardInfo s1 = jedis.getShardInfo("a"); + JedisShardInfo s1 = jedis.getShardInfo("a"); jedis.set("b", "bar1"); - ShardInfo s2 = jedis.getShardInfo("b"); + JedisShardInfo s2 = jedis.getShardInfo("b"); jedis.disconnect(); Jedis j = new Jedis(s1.getHost(), s1.getPort());