diff --git a/Makefile b/Makefile index a6e9912..cf4751f 100644 --- a/Makefile +++ b/Makefile @@ -65,6 +65,7 @@ appendonly no slaveof localhost 6379 endef +# SENTINELS define REDIS_SENTINEL1 port 26379 daemonize yes @@ -101,6 +102,40 @@ pidfile /tmp/sentinel3.pid logfile /tmp/sentinel3.log endef +# CLUSTER REDIS NODES +define REDIS_CLUSTER_NODE1_CONF +daemonize yes +port 7379 +pidfile /tmp/redis_cluster_node1.pid +logfile /tmp/redis_cluster_node1.log +save "" +appendonly no +cluster-enabled yes +cluster-config-file /tmp/redis_cluster_node1.conf +endef + +define REDIS_CLUSTER_NODE2_CONF +daemonize yes +port 7380 +pidfile /tmp/redis_cluster_node2.pid +logfile /tmp/redis_cluster_node2.log +save "" +appendonly no +cluster-enabled yes +cluster-config-file /tmp/redis_cluster_node2.conf +endef + +define REDIS_CLUSTER_NODE3_CONF +daemonize yes +port 7381 +pidfile /tmp/redis_cluster_node3.pid +logfile /tmp/redis_cluster_node3.log +save "" +appendonly no +cluster-enabled yes +cluster-config-file /tmp/redis_cluster_node3.conf +endef + export REDIS1_CONF export REDIS2_CONF export REDIS3_CONF @@ -110,8 +145,11 @@ export REDIS6_CONF export REDIS_SENTINEL1 export REDIS_SENTINEL2 export REDIS_SENTINEL3 +export REDIS_CLUSTER_NODE1_CONF +export REDIS_CLUSTER_NODE2_CONF +export REDIS_CLUSTER_NODE3_CONF -start: +start: cleanup echo "$$REDIS1_CONF" | redis-server - echo "$$REDIS2_CONF" | redis-server - echo "$$REDIS3_CONF" | redis-server - @@ -123,6 +161,12 @@ start: echo "$$REDIS_SENTINEL2" > /tmp/sentinel2.conf && redis-server /tmp/sentinel2.conf --sentinel @sleep 0.5 echo "$$REDIS_SENTINEL3" > /tmp/sentinel3.conf && redis-server /tmp/sentinel3.conf --sentinel + echo "$$REDIS_CLUSTER_NODE1_CONF" | redis-server - + echo "$$REDIS_CLUSTER_NODE2_CONF" | redis-server - + echo "$$REDIS_CLUSTER_NODE3_CONF" | redis-server - + +cleanup: + rm -vf /tmp/redis_cluster_node*.conf stop: kill `cat /tmp/redis1.pid` @@ -135,10 +179,16 @@ stop: kill `cat /tmp/sentinel1.pid` kill `cat /tmp/sentinel2.pid` kill `cat /tmp/sentinel3.pid` + kill `cat /tmp/redis_cluster_node1.pid` || true + kill `cat /tmp/redis_cluster_node2.pid` || true + kill `cat /tmp/redis_cluster_node3.pid` || true + rm -f /tmp/redis_cluster_node1.conf + rm -f /tmp/redis_cluster_node2.conf + rm -f /tmp/redis_cluster_node3.conf test: make start - mvn clean compile test + mvn -Dtest=${TEST} clean compile test make stop deploy: diff --git a/pom.xml b/pom.xml index 9409ec9..f6575eb 100644 --- a/pom.xml +++ b/pom.xml @@ -47,6 +47,7 @@ localhost:6379,localhost:6380,localhost:6381,localhost:6382,localhost:6383,localhost:6384 localhost:26379,localhost:26380,localhost:26381 + localhost:7379,localhost:7380,localhost:7381 github diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index dff1e76..f1b167a 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1135,4 +1135,11 @@ public class BinaryClient extends Connection { public void waitReplicas(int replicas, long timeout) { sendCommand(WAIT, toByteArray(replicas), toByteArray(timeout)); } + + public void cluster(final byte[]... args) { + sendCommand(CLUSTER, args); + } + public void asking() { + sendCommand(Command.ASKING); + } } diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index 92a99a2..f08633f 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -1697,7 +1697,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKey protected void checkIsInMulti() { if (client.isInMulti()) { - throw new JedisDataException( + throw new JedisDataException( "Cannot use Jedis when in Multi. Please use JedisTransaction instead."); } } diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index c5a072b..1353a1b 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -820,4 +820,65 @@ public class Client extends BinaryClient implements Commands { public void zscan(final String key, int cursor, final ScanParams params) { zscan(SafeEncoder.encode(key), cursor, params); } + + public void cluster(final String subcommand, final int... args) { + final byte[][] arg = new byte[args.length+1][]; + for (int i = 1; i < arg.length; i++) { + arg[i] = toByteArray(args[i-1]); + } + arg[0] = SafeEncoder.encode(subcommand); + cluster(arg); + } + + public void cluster(final String subcommand, final String... args) { + final byte[][] arg = new byte[args.length+1][]; + for (int i = 1; i < arg.length; i++) { + arg[i] = SafeEncoder.encode(args[i-1]); + } + arg[0] = SafeEncoder.encode(subcommand); + cluster(arg); + } + + public void cluster(final String subcommand) { + final byte[][] arg = new byte[1][]; + arg[0] = SafeEncoder.encode(subcommand); + cluster(arg); + } + + public void clusterNodes() { + cluster(Protocol.CLUSTER_NODES); + } + + public void clusterMeet(final String ip, final int port) { + cluster(Protocol.CLUSTER_MEET, ip, String.valueOf(port)); + } + + public void clusterAddSlots(final int ...slots) { + cluster(Protocol.CLUSTER_ADDSLOTS, slots); + } + + public void clusterDelSlots(final int ...slots) { + cluster(Protocol.CLUSTER_DELSLOTS, slots); + } + + public void clusterInfo() { + cluster(Protocol.CLUSTER_INFO); + } + + public void clusterGetKeysInSlot(final int slot, final int count) { + final int[] args = new int[]{ slot, count }; + cluster(Protocol.CLUSTER_GETKEYSINSLOT, args); + } + + public void clusterSetSlotNode(final int slot, final String nodeId) { + cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot), Protocol.CLUSTER_SETSLOT_NODE, nodeId); + } + + public void clusterSetSlotMigrating(final int slot, final String nodeId) { + cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot), Protocol.CLUSTER_SETSLOT_MIGRATING, nodeId); + } + + public void clusterSetSlotImporting(final int slot, final String nodeId) { + cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot), Protocol.CLUSTER_SETSLOT_IMPORTING, nodeId); + } } diff --git a/src/main/java/redis/clients/jedis/ClusterCommands.java b/src/main/java/redis/clients/jedis/ClusterCommands.java new file mode 100644 index 0000000..fff4533 --- /dev/null +++ b/src/main/java/redis/clients/jedis/ClusterCommands.java @@ -0,0 +1,23 @@ +package redis.clients.jedis; + +import java.util.List; + +public interface ClusterCommands { + String clusterNodes(); + + String clusterMeet(final String ip, final int port); + + String clusterAddSlots(final int... slots); + + String clusterDelSlots(final int... slots); + + String clusterInfo(); + + List clusterGetKeysInSlot(final int slot, final int count); + + String clusterSetSlotNode(final int slot, final String nodeId); + + String clusterSetSlotMigrating(final int slot, final String nodeId); + + String clusterSetSlotImporting(final int slot, final String nodeId); +} diff --git a/src/main/java/redis/clients/jedis/ClusterPipeline.java b/src/main/java/redis/clients/jedis/ClusterPipeline.java new file mode 100644 index 0000000..73330d4 --- /dev/null +++ b/src/main/java/redis/clients/jedis/ClusterPipeline.java @@ -0,0 +1,23 @@ +package redis.clients.jedis; + +import java.util.List; + +public interface ClusterPipeline { + Response clusterNodes(); + + Response clusterMeet(final String ip, final int port); + + Response clusterAddSlots(final int... slots); + + Response clusterDelSlots(final int... slots); + + Response clusterInfo(); + + Response> clusterGetKeysInSlot(final int slot, final int count); + + Response clusterSetSlotNode(final int slot, final String nodeId); + + Response clusterSetSlotMigrating(final int slot, final String nodeId); + + Response clusterSetSlotImporting(final int slot, final String nodeId); +} diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 4ef5c4f..f5f6bdd 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -13,8 +13,10 @@ import java.util.Set; import redis.clients.jedis.BinaryClient.LIST_POSITION; import redis.clients.util.SafeEncoder; import redis.clients.util.Slowlog; +import java.net.URI; +import java.util.*; -public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, BasicCommands { +public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands { public Jedis(final String host) { super(host); } @@ -3153,4 +3155,63 @@ public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommand } return new ScanResult(newcursor, results); } + public String clusterNodes() { + checkIsInMulti(); + client.clusterNodes(); + return client.getBulkReply(); + } + + public String clusterMeet(final String ip, final int port) { + checkIsInMulti(); + client.clusterMeet(ip, port); + return client.getStatusCodeReply(); + } + + public String clusterAddSlots(final int ...slots) { + checkIsInMulti(); + client.clusterAddSlots(slots); + return client.getStatusCodeReply(); + } + + public String clusterDelSlots(final int ...slots) { + checkIsInMulti(); + client.clusterDelSlots(slots); + return client.getStatusCodeReply(); + } + + public String clusterInfo() { + checkIsInMulti(); + client.clusterInfo(); + return client.getStatusCodeReply(); + } + + public List clusterGetKeysInSlot(final int slot, final int count) { + checkIsInMulti(); + client.clusterGetKeysInSlot(slot, count); + return client.getMultiBulkReply(); + } + + public String clusterSetSlotNode(final int slot, final String nodeId) { + checkIsInMulti(); + client.clusterSetSlotNode(slot, nodeId); + return client.getStatusCodeReply(); + } + + public String clusterSetSlotMigrating(final int slot, final String nodeId) { + checkIsInMulti(); + client.clusterSetSlotMigrating(slot, nodeId); + return client.getStatusCodeReply(); + } + + public String clusterSetSlotImporting(final int slot, final String nodeId) { + checkIsInMulti(); + client.clusterSetSlotImporting(slot, nodeId); + return client.getStatusCodeReply(); + } + + public String asking() { + checkIsInMulti(); + client.asking(); + return client.getStatusCodeReply(); + } } diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java new file mode 100644 index 0000000..ed60b5a --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -0,0 +1,1432 @@ +package redis.clients.jedis; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import redis.clients.jedis.BinaryClient.LIST_POSITION; + +public class JedisCluster implements JedisCommands, BasicCommands { + public static final short HASHSLOTS = 16384; + private static final int DEFAULT_TIMEOUT = 1; + private static final int DEFAULT_MAX_REDIRECTIONS = 5; + + private int timeout; + private int maxRedirections; + + private JedisClusterConnectionHandler connectionHandler; + + public JedisCluster(Set nodes, int timeout) { + this(nodes, timeout, DEFAULT_MAX_REDIRECTIONS); + } + + public JedisCluster(Set nodes) { + this(nodes, DEFAULT_TIMEOUT); + } + + public JedisCluster(Set jedisClusterNode, int timeout, + int maxRedirections) { + this.connectionHandler = new JedisSlotBasedConnectionHandler( + jedisClusterNode); + this.timeout = timeout; + this.maxRedirections = maxRedirections; + } + + @Override + public String set(final String key, final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().set(key, value); + } + }.run(key); + } + + @Override + public String get(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().get(key); + } + }.run(key); + } + + @Override + public Boolean exists(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Boolean execute() { + return connectionHandler.getConnection().exists(key); + } + }.run(key); + } + + @Override + public Long persist(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().persist(key); + } + }.run(key); + } + + @Override + public String type(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().type(key); + } + }.run(key); + } + + @Override + public Long expire(final String key, final int seconds) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().expire(key, seconds); + } + }.run(key); + } + + @Override + public Long expireAt(final String key, final long unixTime) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection() + .expireAt(key, unixTime); + } + }.run(key); + } + + @Override + public Long ttl(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().ttl(key); + } + }.run(key); + } + + @Override + public Boolean setbit(final String key, final long offset, + final boolean value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Boolean execute() { + return connectionHandler.getConnection().setbit(key, offset, + value); + } + }.run(key); + } + + @Override + public Boolean setbit(final String key, final long offset, + final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Boolean execute() { + return connectionHandler.getConnection().setbit(key, offset, + value); + } + }.run(key); + } + + @Override + public Boolean getbit(final String key, final long offset) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Boolean execute() { + return connectionHandler.getConnection().getbit(key, offset); + } + }.run(key); + } + + @Override + public Long setrange(final String key, final long offset, final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().setrange(key, offset, + value); + } + }.run(key); + } + + @Override + public String getrange(final String key, final long startOffset, + final long endOffset) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().getrange(key, + startOffset, endOffset); + } + }.run(key); + } + + @Override + public String getSet(final String key, final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().getSet(key, value); + } + }.run(key); + } + + @Override + public Long setnx(final String key, final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().setnx(key, value); + } + }.run(key); + } + + @Override + public String setex(final String key, final int seconds, final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().setex(key, seconds, + value); + } + }.run(key); + } + + @Override + public Long decrBy(final String key, final long integer) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().decrBy(key, integer); + } + }.run(key); + } + + @Override + public Long decr(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().decr(key); + } + }.run(key); + } + + @Override + public Long incrBy(final String key, final long integer) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().incrBy(key, integer); + } + }.run(key); + } + + @Override + public Long incr(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().incr(key); + } + }.run(key); + } + + @Override + public Long append(final String key, final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().append(key, value); + } + }.run(key); + } + + @Override + public String substr(final String key, final int start, final int end) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection() + .substr(key, start, end); + } + }.run(key); + } + + @Override + public Long hset(final String key, final String field, final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection() + .hset(key, field, value); + } + }.run(key); + } + + @Override + public String hget(final String key, final String field) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().hget(key, field); + } + }.run(key); + } + + @Override + public Long hsetnx(final String key, final String field, final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().hsetnx(key, field, + value); + } + }.run(key); + } + + @Override + public String hmset(final String key, final Map hash) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().hmset(key, hash); + } + }.run(key); + } + + @Override + public List hmget(final String key, final String... fields) { + return new JedisClusterCommand>(connectionHandler, + timeout, maxRedirections) { + @Override + public List execute() { + return connectionHandler.getConnection().hmget(key, fields); + } + }.run(key); + } + + @Override + public Long hincrBy(final String key, final String field, final long value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().hincrBy(key, field, + value); + } + }.run(key); + } + + @Override + public Boolean hexists(final String key, final String field) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Boolean execute() { + return connectionHandler.getConnection().hexists(key, field); + } + }.run(key); + } + + @Override + public Long hdel(final String key, final String... field) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().hdel(key, field); + } + }.run(key); + } + + @Override + public Long hlen(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().hdel(key); + } + }.run(key); + } + + @Override + public Set hkeys(final String key) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().hkeys(key); + } + }.run(key); + } + + @Override + public List hvals(final String key) { + return new JedisClusterCommand>(connectionHandler, + timeout, maxRedirections) { + @Override + public List execute() { + return connectionHandler.getConnection().hvals(key); + } + }.run(key); + } + + @Override + public Map hgetAll(final String key) { + return new JedisClusterCommand>(connectionHandler, + timeout, maxRedirections) { + @Override + public Map execute() { + return connectionHandler.getConnection().hgetAll(key); + } + }.run(key); + } + + @Override + public Long rpush(final String key, final String... string) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().rpush(key, string); + } + }.run(key); + } + + @Override + public Long lpush(final String key, final String... string) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().lpush(key, string); + } + }.run(key); + } + + @Override + public Long llen(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().llen(key); + } + }.run(key); + } + + @Override + public List lrange(final String key, final long start, + final long end) { + return new JedisClusterCommand>(connectionHandler, + timeout, maxRedirections) { + @Override + public List execute() { + return connectionHandler.getConnection() + .lrange(key, start, end); + } + }.run(key); + } + + @Override + public String ltrim(final String key, final long start, final long end) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().ltrim(key, start, end); + } + }.run(key); + } + + @Override + public String lindex(final String key, final long index) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().lindex(key, index); + } + }.run(key); + } + + @Override + public String lset(final String key, final long index, final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection() + .lset(key, index, value); + } + }.run(key); + } + + @Override + public Long lrem(final String key, final long count, final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection() + .lrem(key, count, value); + } + }.run(key); + } + + @Override + public String lpop(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().lpop(key); + } + }.run(key); + } + + @Override + public String rpop(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().rpop(key); + } + }.run(key); + } + + @Override + public Long sadd(final String key, final String... member) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().sadd(key, member); + } + }.run(key); + } + + @Override + public Set smembers(final String key) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().smembers(key); + } + }.run(key); + } + + @Override + public Long srem(final String key, final String... member) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().srem(key, member); + } + }.run(key); + } + + @Override + public String spop(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().spop(key); + } + }.run(key); + } + + @Override + public Long scard(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().scard(key); + } + }.run(key); + } + + @Override + public Boolean sismember(final String key, final String member) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Boolean execute() { + return connectionHandler.getConnection().sismember(key, member); + } + }.run(key); + } + + @Override + public String srandmember(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().srandmember(key); + } + }.run(key); + } + + @Override + public Long strlen(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().strlen(key); + } + }.run(key); + } + + @Override + public Long zadd(final String key, final double score, final String member) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().zadd(key, score, + member); + } + }.run(key); + } + + @Override + public Long zadd(final String key, final Map scoreMembers) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection() + .zadd(key, scoreMembers); + } + }.run(key); + } + + @Override + public Set zrange(final String key, final long start, final long end) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection() + .zrange(key, start, end); + } + }.run(key); + } + + @Override + public Long zrem(final String key, final String... member) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().zrem(key, member); + } + }.run(key); + } + + @Override + public Double zincrby(final String key, final double score, + final String member) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Double execute() { + return connectionHandler.getConnection().zincrby(key, score, + member); + } + }.run(key); + } + + @Override + public Long zrank(final String key, final String member) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().zrank(key, member); + } + }.run(key); + } + + @Override + public Long zrevrank(final String key, final String member) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().zrevrank(key, member); + } + }.run(key); + } + + @Override + public Set zrevrange(final String key, final long start, + final long end) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().zrevrange(key, start, + end); + } + }.run(key); + } + + @Override + public Set zrangeWithScores(final String key, final long start, + final long end) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().zrangeWithScores(key, + start, end); + } + }.run(key); + } + + @Override + public Set zrevrangeWithScores(final String key, final long start, + final long end) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().zrevrangeWithScores( + key, start, end); + } + }.run(key); + } + + @Override + public Long zcard(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().zcard(key); + } + }.run(key); + } + + @Override + public Double zscore(final String key, final String member) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Double execute() { + return connectionHandler.getConnection().zscore(key, member); + } + }.run(key); + } + + @Override + public List sort(final String key) { + return new JedisClusterCommand>(connectionHandler, + timeout, maxRedirections) { + @Override + public List execute() { + return connectionHandler.getConnection().sort(key); + } + }.run(key); + } + + @Override + public List sort(final String key, + final SortingParams sortingParameters) { + return new JedisClusterCommand>(connectionHandler, + timeout, maxRedirections) { + @Override + public List execute() { + return connectionHandler.getConnection().sort(key, + sortingParameters); + } + }.run(key); + } + + @Override + public Long zcount(final String key, final double min, final double max) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().zcount(key, min, max); + } + }.run(key); + } + + @Override + public Long zcount(final String key, final String min, final String max) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().zcount(key, min, max); + } + }.run(key); + } + + @Override + public Set zrangeByScore(final String key, final double min, + final double max) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().zrangeByScore(key, + min, max); + } + }.run(key); + } + + @Override + public Set zrangeByScore(final String key, final String min, + final String max) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().zrangeByScore(key, + min, max); + } + }.run(key); + } + + @Override + public Set zrevrangeByScore(final String key, final double max, + final double min) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().zrevrangeByScore(key, + min, max); + } + }.run(key); + } + + @Override + public Set zrangeByScore(final String key, final double min, + final double max, final int offset, final int count) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().zrangeByScore(key, + min, max, offset, count); + } + }.run(key); + } + + @Override + public Set zrevrangeByScore(final String key, final String max, + final String min) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().zrevrangeByScore(key, + min, max); + } + }.run(key); + } + + @Override + public Set zrangeByScore(final String key, final String min, + final String max, final int offset, final int count) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().zrangeByScore(key, + min, max, offset, count); + } + }.run(key); + } + + @Override + public Set zrevrangeByScore(final String key, final double max, + final double min, final int offset, final int count) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().zrevrangeByScore(key, + min, max, offset, count); + } + }.run(key); + } + + @Override + public Set zrangeByScoreWithScores(final String key, + final double min, final double max) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection() + .zrangeByScoreWithScores(key, min, max); + } + }.run(key); + } + + @Override + public Set zrevrangeByScoreWithScores(final String key, + final double max, final double min) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection() + .zrevrangeByScoreWithScores(key, min, max); + } + }.run(key); + } + + @Override + public Set zrangeByScoreWithScores(final String key, + final double min, final double max, final int offset, + final int count) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection() + .zrangeByScoreWithScores(key, min, max, offset, count); + } + }.run(key); + } + + @Override + public Set zrevrangeByScore(final String key, final String max, + final String min, final int offset, final int count) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection().zrevrangeByScore(key, + min, max, offset, count); + } + }.run(key); + } + + @Override + public Set zrangeByScoreWithScores(final String key, + final String min, final String max) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection() + .zrangeByScoreWithScores(key, min, max); + } + }.run(key); + } + + @Override + public Set zrevrangeByScoreWithScores(final String key, + final String max, final String min) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection() + .zrevrangeByScoreWithScores(key, min, max); + } + }.run(key); + } + + @Override + public Set zrangeByScoreWithScores(final String key, + final String min, final String max, final int offset, + final int count) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection() + .zrangeByScoreWithScores(key, min, max, offset, count); + } + }.run(key); + } + + @Override + public Set zrevrangeByScoreWithScores(final String key, + final double max, final double min, final int offset, + final int count) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection() + .zrevrangeByScoreWithScores(key, max, min, offset, + count); + } + }.run(key); + } + + @Override + public Set zrevrangeByScoreWithScores(final String key, + final String max, final String min, final int offset, + final int count) { + return new JedisClusterCommand>(connectionHandler, timeout, + maxRedirections) { + @Override + public Set execute() { + return connectionHandler.getConnection() + .zrevrangeByScoreWithScores(key, max, min, offset, + count); + } + }.run(key); + } + + @Override + public Long zremrangeByRank(final String key, final long start, + final long end) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().zremrangeByRank(key, + start, end); + } + }.run(key); + } + + @Override + public Long zremrangeByScore(final String key, final double start, + final double end) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().zremrangeByScore(key, + start, end); + } + }.run(key); + } + + @Override + public Long zremrangeByScore(final String key, final String start, + final String end) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().zremrangeByScore(key, + start, end); + } + }.run(key); + } + + @Override + public Long linsert(final String key, final LIST_POSITION where, + final String pivot, final String value) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().linsert(key, where, + pivot, value); + } + }.run(key); + } + + @Override + public Long lpushx(final String key, final String... string) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().lpushx(key, string); + } + }.run(key); + } + + @Override + public Long rpushx(final String key, final String... string) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().rpushx(key, string); + } + }.run(key); + } + + @Override + public List blpop(final String arg) { + return new JedisClusterCommand>(connectionHandler, + timeout, maxRedirections) { + @Override + public List execute() { + return connectionHandler.getConnection().blpop(arg); + } + }.run(null); + } + + @Override + public List brpop(final String arg) { + return new JedisClusterCommand>(connectionHandler, + timeout, maxRedirections) { + @Override + public List execute() { + return connectionHandler.getConnection().brpop(arg); + } + }.run(null); + } + + @Override + public Long del(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().del(key); + } + }.run(null); + } + + @Override + public String echo(final String string) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().echo(string); + } + }.run(null); + } + + @Override + public Long move(final String key, final int dbIndex) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().move(key, dbIndex); + } + }.run(key); + } + + @Override + public Long bitcount(final String key) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().bitcount(key); + } + }.run(key); + } + + @Override + public Long bitcount(final String key, final long start, final long end) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().bitcount(key, start, + end); + } + }.run(key); + } + + @Override + public String ping() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().ping(); + } + }.run(null); + } + + @Override + public String quit() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().quit(); + } + }.run(null); + } + + @Override + public String flushDB() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().flushDB(); + } + }.run(null); + } + + @Override + public Long dbSize() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().dbSize(); + } + }.run(null); + } + + @Override + public String select(final int index) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().select(index); + } + }.run(null); + } + + @Override + public String flushAll() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().flushAll(); + } + }.run(null); + } + + @Override + public String auth(final String password) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().auth(password); + } + }.run(null); + } + + @Override + public String save() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().save(); + } + }.run(null); + } + + @Override + public String bgsave() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().bgsave(); + } + }.run(null); + } + + @Override + public String bgrewriteaof() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().bgrewriteaof(); + } + }.run(null); + } + + @Override + public Long lastsave() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().lastsave(); + } + }.run(null); + } + + @Override + public String shutdown() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().shutdown(); + } + }.run(null); + } + + @Override + public String info() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().info(); + } + }.run(null); + } + + @Override + public String info(final String section) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().info(section); + } + }.run(null); + } + + @Override + public String slaveof(final String host, final int port) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().slaveof(host, port); + } + }.run(null); + } + + @Override + public String slaveofNoOne() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().slaveofNoOne(); + } + }.run(null); + } + + @Override + public Long getDB() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public Long execute() { + return connectionHandler.getConnection().getDB(); + } + }.run(null); + } + + @Override + public String debug(final DebugParams params) { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().debug(params); + } + }.run(null); + } + + @Override + public String configResetStat() { + return new JedisClusterCommand(connectionHandler, timeout, + maxRedirections) { + @Override + public String execute() { + return connectionHandler.getConnection().configResetStat(); + } + }.run(null); + } + + public Map getClusterNodes() { + return connectionHandler.getNodes(); + } + + @Override + public Long waitReplicas(int replicas, long timeout) { + // TODO Auto-generated method stub + return null; + } + + @Override + public ScanResult> hscan(final String key, + final int cursor) { + return new JedisClusterCommand>>( + connectionHandler, timeout, maxRedirections) { + @Override + public ScanResult> execute() { + return connectionHandler.getConnection().hscan(key, cursor); + } + }.run(null); + } + + @Override + public ScanResult sscan(final String key, final int cursor) { + return new JedisClusterCommand>(connectionHandler, + timeout, maxRedirections) { + @Override + public ScanResult execute() { + return connectionHandler.getConnection().sscan(key, cursor); + } + }.run(null); + } + + @Override + public ScanResult zscan(final String key, final int cursor) { + return new JedisClusterCommand>(connectionHandler, + timeout, maxRedirections) { + @Override + public ScanResult execute() { + return connectionHandler.getConnection().zscan(key, cursor); + } + }.run(null); + } +} diff --git a/src/main/java/redis/clients/jedis/JedisClusterCommand.java b/src/main/java/redis/clients/jedis/JedisClusterCommand.java new file mode 100644 index 0000000..8596971 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisClusterCommand.java @@ -0,0 +1,53 @@ +package redis.clients.jedis; + +import redis.clients.jedis.exceptions.JedisAskDataException; +import redis.clients.jedis.exceptions.JedisClusterException; +import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException; +import redis.clients.jedis.exceptions.JedisRedirectionException; +import redis.clients.util.JedisClusterCRC16; + +public abstract class JedisClusterCommand { + + private boolean asking = false; + + private JedisClusterConnectionHandler connectionHandler; + private int commandTimeout; + private int redirections; +// private boolean asking = false; + + public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int timeout, int maxRedirections) { + this.connectionHandler = connectionHandler; + this.commandTimeout = timeout; + this.redirections = maxRedirections; + } + + public abstract T execute(); + + public T run(String key) { + try { + + if (key == null) { + throw new JedisClusterException("No way to dispatch this command to Redis Cluster."); + } else if (redirections == 0) { + throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); + } + connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); + if (asking) { + //TODO: Pipeline asking with the original command to make it faster.... + connectionHandler.getConnection().asking(); + } + return execute(); + } catch (JedisRedirectionException jre) { + return handleRedirection(jre, key); + } + } + + private T handleRedirection(JedisRedirectionException jre, String key) { + if (jre instanceof JedisAskDataException) { + asking = true; + } + redirections--; + this.connectionHandler.assignSlotToNode(jre.getSlot(), jre.getTargetNode()); + return run(key); + } +} \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java new file mode 100644 index 0000000..765b9a2 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java @@ -0,0 +1,82 @@ +package redis.clients.jedis; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.Set; + + +public abstract class JedisClusterConnectionHandler { + + protected Map nodes = new HashMap(); + protected Map slots = new HashMap(); + + abstract Jedis getConnection(); + abstract Jedis getConnectionFromSlot(int slot); + + public JedisClusterConnectionHandler(Set nodes) { + initializeSlotsCache(nodes); + } + + public Map getNodes() { + return nodes; + } + + private void initializeSlotsCache(Set nodes) { + for (HostAndPort hostAndPort : nodes) { + JedisPool jp = new JedisPool(hostAndPort.getHost(), hostAndPort.getPort()); + this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp); + discoverClusterNodesAndSlots(jp); + } + + } + + private void discoverClusterNodesAndSlots(JedisPool jp) { + String localNodes = jp.getResource().clusterNodes(); + for (String nodeInfo : localNodes.split("\n")) { + HostAndPort node = getHostAndPortFromNodeLine(nodeInfo); + JedisPool nodePool = new JedisPool(node.getHost(), node.getPort()); + this.nodes.put(node.getHost() + node.getPort(), nodePool); + populateNodeSlots(nodeInfo, nodePool); + } + } + + private void populateNodeSlots(String nodeInfo, JedisPool nodePool) { + String[] nodeInfoArray = nodeInfo.split(" "); + if (nodeInfoArray.length > 7) { + for (int i = 8; i < nodeInfoArray.length; i++) { + processSlot(nodeInfoArray[i], nodePool); + } + } + } + + private void processSlot(String slot, JedisPool nodePool) { + if (slot.contains("-")) { + String[] slotRange = slot.split("-"); + for (int i = Integer.valueOf(slotRange[0]); i <= Integer.valueOf(slotRange[1]); i++) { + slots.put(i, nodePool); + } + } else { + slots.put(Integer.valueOf(slot), nodePool); + } + } + + private HostAndPort getHostAndPortFromNodeLine(String nodeInfo) { + String stringHostAndPort = nodeInfo.split(" ",3)[1]; + String[] arrayHostAndPort = stringHostAndPort.split(":"); + return new HostAndPort(arrayHostAndPort[0], Integer.valueOf(arrayHostAndPort[1])); + } + + public void assignSlotToNode(int slot, HostAndPort targetNode) { + JedisPool targetPool = nodes.get(targetNode.getHost() + targetNode.getPort()); + slots.put(slot, targetPool); + } + + + protected JedisPool getRandomConnection() { + Object[] nodeArray = nodes.values().toArray(); + return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]); + } + + +} diff --git a/src/main/java/redis/clients/jedis/JedisPool.java b/src/main/java/redis/clients/jedis/JedisPool.java index 58212ba..6511207 100644 --- a/src/main/java/redis/clients/jedis/JedisPool.java +++ b/src/main/java/redis/clients/jedis/JedisPool.java @@ -18,7 +18,7 @@ public class JedisPool extends Pool { this(new GenericObjectPoolConfig(), host, port, Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null); } - + public JedisPool(final String host) { URI uri = URI.create(host); if (uri.getScheme() != null && uri.getScheme().equals("redis")) { diff --git a/src/main/java/redis/clients/jedis/JedisRandomConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisRandomConnectionHandler.java new file mode 100644 index 0000000..40d3f04 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisRandomConnectionHandler.java @@ -0,0 +1,20 @@ +package redis.clients.jedis; + +import java.util.Set; + +public class JedisRandomConnectionHandler extends JedisClusterConnectionHandler { + + + public JedisRandomConnectionHandler(Set nodes) { + super(nodes); + } + + public Jedis getConnection() { + return getRandomConnection().getResource(); + } + + @Override + Jedis getConnectionFromSlot(int slot) { + return getRandomConnection().getResource(); + } +} diff --git a/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java new file mode 100644 index 0000000..4f3ea5d --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java @@ -0,0 +1,48 @@ +package redis.clients.jedis; + +import java.util.Set; + +public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler { + + private Jedis currentConnection; + + public JedisSlotBasedConnectionHandler(Set nodes) { + super(nodes); + } + + + public Jedis getConnection() { + return currentConnection != null ? currentConnection : getRandomConnection().getResource(); + } + + + + + private void returnCurrentConnection() { + if (currentConnection != null) { + nodes.get(currentConnection.getClient().getHost()+currentConnection.getClient().getPort()).returnResource(currentConnection); + } + + } + + + @Override + public void assignSlotToNode(int slot, HostAndPort targetNode) { + super.assignSlotToNode(slot, targetNode); + getConnectionFromSlot(slot); + } + + @Override + public Jedis getConnectionFromSlot(int slot) { + returnCurrentConnection(); + JedisPool connectionPool = slots.get(slot); + if (connectionPool == null) { + connectionPool = getRandomConnection(); + } + currentConnection = connectionPool.getResource(); + return connectionPool.getResource(); + } + + + +} diff --git a/src/main/java/redis/clients/jedis/MultiKeyPipelineBase.java b/src/main/java/redis/clients/jedis/MultiKeyPipelineBase.java index be9d89a..04bdc80 100644 --- a/src/main/java/redis/clients/jedis/MultiKeyPipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiKeyPipelineBase.java @@ -7,7 +7,8 @@ import java.util.Set; abstract class MultiKeyPipelineBase extends PipelineBase implements BasicRedisPipeline, MultiKeyBinaryRedisPipeline, - MultiKeyCommandsPipeline { + MultiKeyCommandsPipeline, + ClusterPipeline { protected Client client = null; @@ -398,4 +399,49 @@ abstract class MultiKeyPipelineBase extends PipelineBase implements client.bitop(op, destKey, srcKeys); return getResponse(BuilderFactory.LONG); } + + public Response clusterNodes() { + client.clusterNodes(); + return getResponse(BuilderFactory.STRING); + } + + public Response clusterMeet(final String ip, final int port) { + client.clusterMeet(ip, port); + return getResponse(BuilderFactory.STRING); + } + + public Response clusterAddSlots(final int... slots) { + client.clusterAddSlots(slots); + return getResponse(BuilderFactory.STRING); + } + + public Response clusterDelSlots(final int... slots) { + client.clusterDelSlots(slots); + return getResponse(BuilderFactory.STRING); + } + + public Response clusterInfo() { + client.clusterInfo(); + return getResponse(BuilderFactory.STRING); + } + + public Response> clusterGetKeysInSlot(final int slot, final int count) { + client.clusterGetKeysInSlot(slot, count); + return getResponse(BuilderFactory.STRING_LIST); + } + + public Response clusterSetSlotNode(final int slot, final String nodeId) { + client.clusterSetSlotNode(slot, nodeId); + return getResponse(BuilderFactory.STRING); + } + + public Response clusterSetSlotMigrating(final int slot, final String nodeId) { + client.clusterSetSlotMigrating(slot, nodeId); + return getResponse(BuilderFactory.STRING); + } + + public Response clusterSetSlotImporting(final int slot, final String nodeId) { + client.clusterSetSlotImporting(slot, nodeId); + return getResponse(BuilderFactory.STRING); + } } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 354d158..fb9eb8c 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -4,15 +4,19 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import redis.clients.jedis.exceptions.JedisAskDataException; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.util.RedisInputStream; import redis.clients.util.RedisOutputStream; import redis.clients.util.SafeEncoder; public final class Protocol { - public static final int DEFAULT_PORT = 6379; + private static final String ASK_RESPONSE = "ASK"; + private static final String MOVED_RESPONSE = "MOVED"; + public static final int DEFAULT_PORT = 6379; public static final int DEFAULT_SENTINEL_PORT = 26379; public static final int DEFAULT_TIMEOUT = 2000; public static final int DEFAULT_DATABASE = 0; @@ -30,6 +34,17 @@ public final class Protocol { public static final String SENTINEL_RESET = "reset"; public static final String SENTINEL_SLAVES = "slaves"; + public static final String CLUSTER_NODES = "nodes"; + public static final String CLUSTER_MEET = "meet"; + public static final String CLUSTER_ADDSLOTS = "addslots"; + public static final String CLUSTER_DELSLOTS = "delslots"; + public static final String CLUSTER_INFO = "info"; + public static final String CLUSTER_GETKEYSINSLOT = "getkeysinslot"; + public static final String CLUSTER_SETSLOT = "setslot"; + public static final String CLUSTER_SETSLOT_NODE = "node"; + public static final String CLUSTER_SETSLOT_MIGRATING = "migrating"; + public static final String CLUSTER_SETSLOT_IMPORTING = "importing"; + private Protocol() { // this prevent the class from instantiation } @@ -61,8 +76,28 @@ public final class Protocol { } private static void processError(final RedisInputStream is) { - String message = is.readLine(); - throw new JedisDataException(message); + String message = is.readLine(); + //TODO: I'm not sure if this is the best way to do this. + //Maybe Read only first 5 bytes instead? + // + if (message.contains(MOVED_RESPONSE)) { + String[] movedInfo = parseTargetHostAndSlot(message); + throw new JedisMovedDataException(message, new HostAndPort(movedInfo[1], Integer.valueOf(movedInfo[2])), Integer.valueOf(movedInfo[0])); + } else if (message.contains(ASK_RESPONSE)) { + String[] askInfo = parseTargetHostAndSlot(message); + throw new JedisAskDataException(message, new HostAndPort(askInfo[1], Integer.valueOf(askInfo[2])), Integer.valueOf(askInfo[0])); + } + throw new JedisDataException(message); + } + + private static String[] parseTargetHostAndSlot(String clusterRedirectResponse) { + String[] response = new String[3]; + String[] messageInfo = clusterRedirectResponse.split(" "); + String[] targetHostAndPort = messageInfo[2].split(":"); + response[0] = messageInfo[1]; + response[1] = targetHostAndPort[0]; + response[2] = targetHostAndPort[1]; + return response; } private static Object process(final RedisInputStream is) { @@ -157,8 +192,7 @@ public final class Protocol { } public static enum Command { - PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, - DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT; + PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING; public final byte[] raw; diff --git a/src/main/java/redis/clients/jedis/exceptions/JedisAskDataException.java b/src/main/java/redis/clients/jedis/exceptions/JedisAskDataException.java new file mode 100644 index 0000000..599a779 --- /dev/null +++ b/src/main/java/redis/clients/jedis/exceptions/JedisAskDataException.java @@ -0,0 +1,20 @@ +package redis.clients.jedis.exceptions; + +import redis.clients.jedis.HostAndPort; + +public class JedisAskDataException extends JedisRedirectionException { + private static final long serialVersionUID = 3878126572474819403L; + + public JedisAskDataException(Throwable cause, HostAndPort targetHost, int slot) { + super(cause, targetHost, slot); + } + + public JedisAskDataException(String message, Throwable cause, HostAndPort targetHost, int slot) { + super(message, cause, targetHost, slot); + } + + public JedisAskDataException(String message, HostAndPort targetHost, int slot) { + super(message, targetHost, slot); + } + +} diff --git a/src/main/java/redis/clients/jedis/exceptions/JedisClusterException.java b/src/main/java/redis/clients/jedis/exceptions/JedisClusterException.java new file mode 100644 index 0000000..e20d5a7 --- /dev/null +++ b/src/main/java/redis/clients/jedis/exceptions/JedisClusterException.java @@ -0,0 +1,18 @@ +package redis.clients.jedis.exceptions; + + +public class JedisClusterException extends JedisDataException { + private static final long serialVersionUID = 3878126572474819403L; + + public JedisClusterException(Throwable cause) { + super(cause); + } + + public JedisClusterException(String message, Throwable cause) { + super(message, cause); + } + + public JedisClusterException(String message) { + super(message); + } +} diff --git a/src/main/java/redis/clients/jedis/exceptions/JedisClusterMaxRedirectionsException.java b/src/main/java/redis/clients/jedis/exceptions/JedisClusterMaxRedirectionsException.java new file mode 100644 index 0000000..519188b --- /dev/null +++ b/src/main/java/redis/clients/jedis/exceptions/JedisClusterMaxRedirectionsException.java @@ -0,0 +1,18 @@ +package redis.clients.jedis.exceptions; + + +public class JedisClusterMaxRedirectionsException extends JedisDataException { + private static final long serialVersionUID = 3878126572474819403L; + + public JedisClusterMaxRedirectionsException(Throwable cause) { + super(cause); + } + + public JedisClusterMaxRedirectionsException(String message, Throwable cause) { + super(message, cause); + } + + public JedisClusterMaxRedirectionsException(String message) { + super(message); + } +} diff --git a/src/main/java/redis/clients/jedis/exceptions/JedisMovedDataException.java b/src/main/java/redis/clients/jedis/exceptions/JedisMovedDataException.java new file mode 100644 index 0000000..c7a0873 --- /dev/null +++ b/src/main/java/redis/clients/jedis/exceptions/JedisMovedDataException.java @@ -0,0 +1,21 @@ +package redis.clients.jedis.exceptions; + +import redis.clients.jedis.HostAndPort; + + +public class JedisMovedDataException extends JedisRedirectionException { + private static final long serialVersionUID = 3878126572474819403L; + + + public JedisMovedDataException(String message, HostAndPort targetNode, int slot) { + super(message, targetNode, slot); + } + + public JedisMovedDataException(Throwable cause, HostAndPort targetNode, int slot) { + super(cause, targetNode, slot); + } + + public JedisMovedDataException(String message, Throwable cause, HostAndPort targetNode, int slot) { + super(message, cause, targetNode, slot); + } +} diff --git a/src/main/java/redis/clients/jedis/exceptions/JedisRedirectionException.java b/src/main/java/redis/clients/jedis/exceptions/JedisRedirectionException.java new file mode 100644 index 0000000..65969f3 --- /dev/null +++ b/src/main/java/redis/clients/jedis/exceptions/JedisRedirectionException.java @@ -0,0 +1,37 @@ +package redis.clients.jedis.exceptions; + +import redis.clients.jedis.HostAndPort; + + +public class JedisRedirectionException extends JedisDataException { + private static final long serialVersionUID = 3878126572474819403L; + + private HostAndPort targetNode; + private int slot; + + public JedisRedirectionException(String message, HostAndPort targetNode, int slot) { + super(message); + this.targetNode = targetNode; + this.slot = slot; + } + + public JedisRedirectionException(Throwable cause, HostAndPort targetNode, int slot) { + super(cause); + this.targetNode = targetNode; + this.slot = slot; + } + + public JedisRedirectionException(String message, Throwable cause, HostAndPort targetNode, int slot) { + super(message, cause); + this.targetNode = targetNode; + this.slot = slot; + } + + public HostAndPort getTargetNode() { + return targetNode; + } + + public int getSlot() { + return slot; + } +} diff --git a/src/main/java/redis/clients/util/JedisClusterCRC16.java b/src/main/java/redis/clients/util/JedisClusterCRC16.java new file mode 100644 index 0000000..044e003 --- /dev/null +++ b/src/main/java/redis/clients/util/JedisClusterCRC16.java @@ -0,0 +1,21 @@ +package redis.clients.util; + +public class JedisClusterCRC16 { + public final static int polynomial = 0x1021; // Represents x^16+x^12+x^5+1 + static int crc; + + public static int getSlot(String key) { + crc = 0x0000; + for (byte b : key.getBytes()) { + for (int i = 0; i < 8; i++) { + boolean bit = ((b >> (7-i) & 1) == 1); + boolean c15 = ((crc >> 15 & 1) == 1); + crc <<= 1; + // If coefficient of bit and remainder polynomial = 1 xor crc with polynomial + if (c15 ^ bit) crc ^= polynomial; + } + } + + return crc &= 0xffff % 16384; + } +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java b/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java index be64c53..c7be599 100644 --- a/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java +++ b/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java @@ -9,83 +9,106 @@ import redis.clients.jedis.Protocol; public class HostAndPortUtil { private static List redisHostAndPortList = new ArrayList(); private static List sentinelHostAndPortList = new ArrayList(); + private static List clusterHostAndPortList = new ArrayList(); static { - - HostAndPort defaulthnp1 = new HostAndPort("localhost", Protocol.DEFAULT_PORT); - redisHostAndPortList.add(defaulthnp1); - HostAndPort defaulthnp2 = new HostAndPort("localhost", Protocol.DEFAULT_PORT + 1); - redisHostAndPortList.add(defaulthnp2); - - HostAndPort defaulthnp3 = new HostAndPort("localhost", Protocol.DEFAULT_PORT + 2); - redisHostAndPortList.add(defaulthnp3); - - HostAndPort defaulthnp4 = new HostAndPort("localhost", Protocol.DEFAULT_PORT + 3); - redisHostAndPortList.add(defaulthnp4); - - HostAndPort defaulthnp5 = new HostAndPort("localhost", Protocol.DEFAULT_PORT + 4); - redisHostAndPortList.add(defaulthnp5); - - HostAndPort defaulthnp6 = new HostAndPort("localhost", Protocol.DEFAULT_PORT + 5); - redisHostAndPortList.add(defaulthnp6); - - HostAndPort defaulthnp7 = new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT); - sentinelHostAndPortList.add(defaulthnp7); - - HostAndPort defaulthnp8 = new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 1); - sentinelHostAndPortList.add(defaulthnp8); - - HostAndPort defaulthnp9 = new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 2); - sentinelHostAndPortList.add(defaulthnp9); + HostAndPort defaulthnp1 = new HostAndPort("localhost", + Protocol.DEFAULT_PORT); + redisHostAndPortList.add(defaulthnp1); - String envRedisHosts = System.getProperty("redis-hosts"); - String envSentinelHosts = System.getProperty("sentinel-hosts"); - - redisHostAndPortList = parseHosts(envRedisHosts, redisHostAndPortList); - sentinelHostAndPortList = parseHosts(envSentinelHosts, sentinelHostAndPortList); + HostAndPort defaulthnp2 = new HostAndPort("localhost", + Protocol.DEFAULT_PORT + 1); + redisHostAndPortList.add(defaulthnp2); + + HostAndPort defaulthnp3 = new HostAndPort("localhost", + Protocol.DEFAULT_PORT + 2); + redisHostAndPortList.add(defaulthnp3); + + HostAndPort defaulthnp4 = new HostAndPort("localhost", + Protocol.DEFAULT_PORT + 3); + redisHostAndPortList.add(defaulthnp4); + + HostAndPort defaulthnp5 = new HostAndPort("localhost", + Protocol.DEFAULT_PORT + 4); + redisHostAndPortList.add(defaulthnp5); + + HostAndPort defaulthnp6 = new HostAndPort("localhost", + Protocol.DEFAULT_PORT + 5); + redisHostAndPortList.add(defaulthnp6); + + HostAndPort defaulthnp7 = new HostAndPort("localhost", + Protocol.DEFAULT_SENTINEL_PORT); + sentinelHostAndPortList.add(defaulthnp7); + + HostAndPort defaulthnp8 = new HostAndPort("localhost", + Protocol.DEFAULT_SENTINEL_PORT + 1); + sentinelHostAndPortList.add(defaulthnp8); + + HostAndPort defaulthnp9 = new HostAndPort("localhost", + Protocol.DEFAULT_SENTINEL_PORT + 2); + sentinelHostAndPortList.add(defaulthnp9); + + clusterHostAndPortList.add(new HostAndPort("localhost", 7379)); + clusterHostAndPortList.add(new HostAndPort("localhost", 7380)); + clusterHostAndPortList.add(new HostAndPort("localhost", 7381)); + + String envRedisHosts = System.getProperty("redis-hosts"); + String envSentinelHosts = System.getProperty("sentinel-hosts"); + String envClusterHosts = System.getProperty("cluster-hosts"); + + redisHostAndPortList = parseHosts(envRedisHosts, redisHostAndPortList); + sentinelHostAndPortList = parseHosts(envSentinelHosts, + sentinelHostAndPortList); + clusterHostAndPortList = parseHosts(envClusterHosts, + clusterHostAndPortList); } - public static List parseHosts(String envHosts, List existingHostsAndPorts) { - - if (null != envHosts && 0 < envHosts.length()) { - - String[] hostDefs = envHosts.split(","); - - if (null != hostDefs && 2 <= hostDefs.length) { - - List envHostsAndPorts = new ArrayList(hostDefs.length); - - for (String hostDef : hostDefs) { - - String[] hostAndPort = hostDef.split(":"); - - if (null != hostAndPort && 2 == hostAndPort.length) { - String host = hostAndPort[0]; - int port = Protocol.DEFAULT_PORT; - - try { - port = Integer.parseInt(hostAndPort[1]); - } catch (final NumberFormatException nfe) { - } - - envHostsAndPorts.add(new HostAndPort(host, port)); - } - } - - return envHostsAndPorts; - } - } - - return existingHostsAndPorts; + public static List parseHosts(String envHosts, + List existingHostsAndPorts) { + + if (null != envHosts && 0 < envHosts.length()) { + + String[] hostDefs = envHosts.split(","); + + if (null != hostDefs && 2 <= hostDefs.length) { + + List envHostsAndPorts = new ArrayList( + hostDefs.length); + + for (String hostDef : hostDefs) { + + String[] hostAndPort = hostDef.split(":"); + + if (null != hostAndPort && 2 == hostAndPort.length) { + String host = hostAndPort[0]; + int port = Protocol.DEFAULT_PORT; + + try { + port = Integer.parseInt(hostAndPort[1]); + } catch (final NumberFormatException nfe) { + } + + envHostsAndPorts.add(new HostAndPort(host, port)); + } + } + + return envHostsAndPorts; + } + } + + return existingHostsAndPorts; } - + public static List getRedisServers() { - return redisHostAndPortList; - } - - public static List getSentinelServers() { - return sentinelHostAndPortList; + return redisHostAndPortList; } + public static List getSentinelServers() { + return sentinelHostAndPortList; + } + + public static List getClusterServers() { + return clusterHostAndPortList; + } } diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java new file mode 100644 index 0000000..ef48a21 --- /dev/null +++ b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java @@ -0,0 +1,196 @@ +package redis.clients.jedis.tests; + +import java.util.HashSet; +import java.util.Set; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.exceptions.JedisAskDataException; +import redis.clients.jedis.exceptions.JedisClusterException; +import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException; +import redis.clients.jedis.exceptions.JedisMovedDataException; +import redis.clients.util.JedisClusterCRC16; + +public class JedisClusterTest extends Assert { + private Jedis node1; + private Jedis node2; + private Jedis node3; + + private HostAndPort nodeInfo1 = HostAndPortUtil.getClusterServers().get(0); + private HostAndPort nodeInfo2 = HostAndPortUtil.getClusterServers().get(1); + private HostAndPort nodeInfo3 = HostAndPortUtil.getClusterServers().get(2); + + @Before + public void setUp() throws InterruptedException { + node1 = new Jedis(nodeInfo1.getHost(), nodeInfo1.getPort()); + node1.connect(); + node1.flushAll(); + + node2 = new Jedis(nodeInfo2.getHost(), nodeInfo2.getPort()); + node2.connect(); + node2.flushAll(); + + node3 = new Jedis(nodeInfo3.getHost(), nodeInfo3.getPort()); + node3.connect(); + node3.flushAll(); + + // ---- configure cluster + + // add nodes to cluster + node1.clusterMeet("127.0.0.1", nodeInfo1.getPort()); + node1.clusterMeet("127.0.0.1", nodeInfo2.getPort()); + node1.clusterMeet("127.0.0.1", nodeInfo3.getPort()); + + // split available slots across the three nodes + int slotsPerNode = JedisCluster.HASHSLOTS / 3; + Pipeline pipeline1 = node1.pipelined(); + Pipeline pipeline2 = node2.pipelined(); + Pipeline pipeline3 = node3.pipelined(); + for (int i = 0; i < JedisCluster.HASHSLOTS; i++) { + if (i < slotsPerNode) { + pipeline1.clusterAddSlots(i); + } else if (i > slotsPerNode * 2) { + pipeline3.clusterAddSlots(i); + } else { + pipeline2.clusterAddSlots(i); + } + } + pipeline1.sync(); + pipeline2.sync(); + pipeline3.sync(); + + + waitForClusterReady(); + } + + + @After + public void tearDown() { + // clear all slots + int[] slotsToDelete = new int[JedisCluster.HASHSLOTS]; + for (int i = 0; i < JedisCluster.HASHSLOTS; i++) { + slotsToDelete[i] = i; + } + node1.clusterDelSlots(slotsToDelete); + node2.clusterDelSlots(slotsToDelete); + node3.clusterDelSlots(slotsToDelete); + } + + @Test(expected=JedisMovedDataException.class) + public void testThrowMovedException() { + node1.set("foo", "bar"); + } + + @Test + public void testMovedExceptionParameters() { + try { + node1.set("foo", "bar"); + } catch (JedisMovedDataException jme) { + assertEquals(12182, jme.getSlot()); + assertEquals(new HostAndPort("127.0.0.1", 7381), jme.getTargetNode()); + } + fail(); + } + + @Test(expected=JedisAskDataException.class) + public void testThrowAskException() { + int keySlot = JedisClusterCRC16.getSlot("test"); + String node3Id = getNodeId(node3.clusterNodes()); + node2.clusterSetSlotMigrating(keySlot, node3Id); + node2.get("test"); + } + + @Test + public void testDiscoverNodesAutomatically() { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + JedisCluster jc = new JedisCluster(jedisClusterNode); + assertEquals(jc.getClusterNodes().size(), 3); + } + + @Test + public void testCalculateConnectionPerSlot() { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + JedisCluster jc = new JedisCluster(jedisClusterNode); + jc.set("foo", "bar"); + jc.set("test", "test"); + assertEquals("bar",node3.get("foo")); + assertEquals("test",node2.get("test")); + } + + @Test + public void testRecalculateSlotsWhenMoved() throws InterruptedException { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + JedisCluster jc = new JedisCluster(jedisClusterNode); + int slot51 = JedisClusterCRC16.getSlot("51"); + node2.clusterDelSlots(slot51); + node3.clusterDelSlots(slot51); + node3.clusterAddSlots(slot51); + + waitForClusterReady(); + jc.set("51", "foo"); + assertEquals("foo", jc.get("51")); + } + + @Test + public void testAskResponse() throws InterruptedException { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + JedisCluster jc = new JedisCluster(jedisClusterNode); + int slot51 = JedisClusterCRC16.getSlot("51"); + node3.clusterSetSlotImporting(slot51, getNodeId(node2.clusterNodes())); + node2.clusterSetSlotMigrating(slot51, getNodeId(node3.clusterNodes())); + jc.set("51", "foo"); + assertEquals("foo", jc.get("51")); + } + + @Test(expected=JedisClusterException.class) + public void testThrowExceptionWithoutKey() { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + JedisCluster jc = new JedisCluster(jedisClusterNode); + jc.ping(); + } + + @Test(expected=JedisClusterMaxRedirectionsException.class) + public void testRedisClusterMaxRedirections() { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + JedisCluster jc = new JedisCluster(jedisClusterNode); + int slot51 = JedisClusterCRC16.getSlot("51"); + //This will cause an infinite redirection loop + node2.clusterSetSlotMigrating(slot51, getNodeId(node3.clusterNodes())); + jc.set("51", "foo"); + } + + private String getNodeId(String infoOutput) { + for (String infoLine : infoOutput.split("\n")) { + if (infoLine.contains("myself")) { + return infoLine.split(" ")[0]; + } + } + return ""; + } + + private void waitForClusterReady() throws InterruptedException { + boolean clusterOk = false; + while (!clusterOk) { + if (node1.clusterInfo().split("\n")[0].contains("ok") && + node2.clusterInfo().split("\n")[0].contains("ok") && + node3.clusterInfo().split("\n")[0].contains("ok") ) { + clusterOk = true; + } + Thread.sleep(50); + } + } + +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/commands/ClusterCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/ClusterCommandsTest.java new file mode 100644 index 0000000..a7045a6 --- /dev/null +++ b/src/test/java/redis/clients/jedis/tests/commands/ClusterCommandsTest.java @@ -0,0 +1,138 @@ +package redis.clients.jedis.tests.commands; + +import java.util.List; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.tests.HostAndPortUtil; +import redis.clients.jedis.tests.JedisTestBase; + +public class ClusterCommandsTest extends JedisTestBase { + private static Jedis node1; + private static Jedis node2; + + private HostAndPort nodeInfo1 = HostAndPortUtil.getClusterServers().get(0); + private HostAndPort nodeInfo2 = HostAndPortUtil.getClusterServers().get(1); + + @Before + public void setUp() throws Exception { + + node1 = new Jedis(nodeInfo1.getHost(), nodeInfo1.getPort()); + node1.connect(); + node1.flushAll(); + + node2 = new Jedis(nodeInfo2.getHost(), nodeInfo2.getPort()); + node2.connect(); + node2.flushAll(); + } + + @After + public void tearDown() { + node1.disconnect(); + node2.disconnect(); + } + + @AfterClass + public static void removeSlots() throws InterruptedException { + //This is to wait for gossip to replicate data. + waitForEqualClusterSize(); + String[] nodes = node1.clusterNodes().split("\n"); + String node1Id = nodes[0].split(" ")[0]; + node1.clusterDelSlots(1,2,3,4,5,500); + node1.clusterSetSlotNode(5000, node1Id); + node1.clusterDelSlots(5000, 10000); + node1.clusterDelSlots(6000); + node2.clusterDelSlots(6000,1,2,3,4,5,500,5000); + try { + node2.clusterDelSlots(10000); + } catch (JedisDataException jde) { + //Do nothing, slot may or may not be assigned depending on gossip + } + } + + private static void waitForEqualClusterSize() throws InterruptedException { + boolean notEqualSize = true; + while (notEqualSize) { + notEqualSize = getClusterAttribute(node1.clusterInfo(), "cluster_known_nodes") == getClusterAttribute(node2.clusterInfo(), "cluster_size") ? false : true; + } + } + + private static int getClusterAttribute(String clusterInfo, String attributeName) { + for (String infoElement: clusterInfo.split("\n")) { + if (infoElement.contains(attributeName)) { + return Integer.valueOf(infoElement.split(":")[1].trim()); + } + } + return 0; + } + + @Test + public void clusterNodes() { + String nodes = node1.clusterNodes(); + assertTrue(nodes.split("\n").length > 0); + } + + @Test + public void clusterMeet() { + String status = node1.clusterMeet("127.0.0.1", nodeInfo2.getPort()); + assertEquals("OK", status); + } + + @Test + public void clusterAddSlots() { + String status = node1.clusterAddSlots(1, 2, 3, 4, 5); + assertEquals("OK", status); + } + + @Test + public void clusterDelSlots() { + node1.clusterAddSlots(900); + String status = node1.clusterDelSlots(900); + assertEquals("OK", status); + } + + @Test + public void clusterInfo() { + String info = node1.clusterInfo(); + assertNotNull(info); + } + + @Test + public void clusterGetKeysInSlot() { + node1.clusterAddSlots(500); + List keys = node1.clusterGetKeysInSlot(500, 1); + assertEquals(0, keys.size()); + } + + @Test + public void clusterSetSlotNode() { + String[] nodes = node1.clusterNodes().split("\n"); + String nodeId = nodes[0].split(" ")[0]; + String status = node1.clusterSetSlotNode(10000, nodeId); + assertEquals("OK", status); + } + + @Test + public void clusterSetSlotMigrating() { + node1.clusterAddSlots(5000); + String[] nodes = node1.clusterNodes().split("\n"); + String nodeId = nodes[0].split(" ")[0]; + String status = node1.clusterSetSlotMigrating(5000, nodeId); + assertEquals("OK", status); + } + + @Test + public void clusterSetSlotImporting() { + node2.clusterAddSlots(6000); + String[] nodes = node1.clusterNodes().split("\n"); + String nodeId = nodes[0].split(" ")[0]; + String status = node1.clusterSetSlotImporting(6000, nodeId); + assertEquals("OK", status); + } +} \ No newline at end of file