Merge branch 'cluster' of github.com:marcosnils/jedis into marcosnils-cluster

Conflicts:
	src/main/java/redis/clients/jedis/BinaryClient.java
	src/main/java/redis/clients/jedis/Client.java
	src/main/java/redis/clients/jedis/Jedis.java
	src/main/java/redis/clients/jedis/Protocol.java
This commit is contained in:
Jonathan Leibiusky
2014-01-16 16:35:04 -05:00
25 changed files with 2513 additions and 80 deletions

View File

@@ -65,6 +65,7 @@ appendonly no
slaveof localhost 6379
endef
# SENTINELS
define REDIS_SENTINEL1
port 26379
daemonize yes
@@ -101,6 +102,40 @@ pidfile /tmp/sentinel3.pid
logfile /tmp/sentinel3.log
endef
# CLUSTER REDIS NODES
define REDIS_CLUSTER_NODE1_CONF
daemonize yes
port 7379
pidfile /tmp/redis_cluster_node1.pid
logfile /tmp/redis_cluster_node1.log
save ""
appendonly no
cluster-enabled yes
cluster-config-file /tmp/redis_cluster_node1.conf
endef
define REDIS_CLUSTER_NODE2_CONF
daemonize yes
port 7380
pidfile /tmp/redis_cluster_node2.pid
logfile /tmp/redis_cluster_node2.log
save ""
appendonly no
cluster-enabled yes
cluster-config-file /tmp/redis_cluster_node2.conf
endef
define REDIS_CLUSTER_NODE3_CONF
daemonize yes
port 7381
pidfile /tmp/redis_cluster_node3.pid
logfile /tmp/redis_cluster_node3.log
save ""
appendonly no
cluster-enabled yes
cluster-config-file /tmp/redis_cluster_node3.conf
endef
export REDIS1_CONF
export REDIS2_CONF
export REDIS3_CONF
@@ -110,8 +145,11 @@ export REDIS6_CONF
export REDIS_SENTINEL1
export REDIS_SENTINEL2
export REDIS_SENTINEL3
export REDIS_CLUSTER_NODE1_CONF
export REDIS_CLUSTER_NODE2_CONF
export REDIS_CLUSTER_NODE3_CONF
start:
start: cleanup
echo "$$REDIS1_CONF" | redis-server -
echo "$$REDIS2_CONF" | redis-server -
echo "$$REDIS3_CONF" | redis-server -
@@ -123,6 +161,12 @@ start:
echo "$$REDIS_SENTINEL2" > /tmp/sentinel2.conf && redis-server /tmp/sentinel2.conf --sentinel
@sleep 0.5
echo "$$REDIS_SENTINEL3" > /tmp/sentinel3.conf && redis-server /tmp/sentinel3.conf --sentinel
echo "$$REDIS_CLUSTER_NODE1_CONF" | redis-server -
echo "$$REDIS_CLUSTER_NODE2_CONF" | redis-server -
echo "$$REDIS_CLUSTER_NODE3_CONF" | redis-server -
cleanup:
rm -vf /tmp/redis_cluster_node*.conf
stop:
kill `cat /tmp/redis1.pid`
@@ -135,10 +179,16 @@ stop:
kill `cat /tmp/sentinel1.pid`
kill `cat /tmp/sentinel2.pid`
kill `cat /tmp/sentinel3.pid`
kill `cat /tmp/redis_cluster_node1.pid` || true
kill `cat /tmp/redis_cluster_node2.pid` || true
kill `cat /tmp/redis_cluster_node3.pid` || true
rm -f /tmp/redis_cluster_node1.conf
rm -f /tmp/redis_cluster_node2.conf
rm -f /tmp/redis_cluster_node3.conf
test:
make start
mvn clean compile test
mvn -Dtest=${TEST} clean compile test
make stop
deploy:

View File

@@ -47,6 +47,7 @@
<properties>
<redis-hosts>localhost:6379,localhost:6380,localhost:6381,localhost:6382,localhost:6383,localhost:6384</redis-hosts>
<sentinel-hosts>localhost:26379,localhost:26380,localhost:26381</sentinel-hosts>
<cluster-hosts>localhost:7379,localhost:7380,localhost:7381</cluster-hosts>
<github.global.server>github</github.global.server>
</properties>

View File

@@ -1135,4 +1135,11 @@ public class BinaryClient extends Connection {
public void waitReplicas(int replicas, long timeout) {
sendCommand(WAIT, toByteArray(replicas), toByteArray(timeout));
}
public void cluster(final byte[]... args) {
sendCommand(CLUSTER, args);
}
public void asking() {
sendCommand(Command.ASKING);
}
}

View File

@@ -1697,7 +1697,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKey
protected void checkIsInMulti() {
if (client.isInMulti()) {
throw new JedisDataException(
throw new JedisDataException(
"Cannot use Jedis when in Multi. Please use JedisTransaction instead.");
}
}

View File

@@ -820,4 +820,65 @@ public class Client extends BinaryClient implements Commands {
public void zscan(final String key, int cursor, final ScanParams params) {
zscan(SafeEncoder.encode(key), cursor, params);
}
public void cluster(final String subcommand, final int... args) {
final byte[][] arg = new byte[args.length+1][];
for (int i = 1; i < arg.length; i++) {
arg[i] = toByteArray(args[i-1]);
}
arg[0] = SafeEncoder.encode(subcommand);
cluster(arg);
}
public void cluster(final String subcommand, final String... args) {
final byte[][] arg = new byte[args.length+1][];
for (int i = 1; i < arg.length; i++) {
arg[i] = SafeEncoder.encode(args[i-1]);
}
arg[0] = SafeEncoder.encode(subcommand);
cluster(arg);
}
public void cluster(final String subcommand) {
final byte[][] arg = new byte[1][];
arg[0] = SafeEncoder.encode(subcommand);
cluster(arg);
}
public void clusterNodes() {
cluster(Protocol.CLUSTER_NODES);
}
public void clusterMeet(final String ip, final int port) {
cluster(Protocol.CLUSTER_MEET, ip, String.valueOf(port));
}
public void clusterAddSlots(final int ...slots) {
cluster(Protocol.CLUSTER_ADDSLOTS, slots);
}
public void clusterDelSlots(final int ...slots) {
cluster(Protocol.CLUSTER_DELSLOTS, slots);
}
public void clusterInfo() {
cluster(Protocol.CLUSTER_INFO);
}
public void clusterGetKeysInSlot(final int slot, final int count) {
final int[] args = new int[]{ slot, count };
cluster(Protocol.CLUSTER_GETKEYSINSLOT, args);
}
public void clusterSetSlotNode(final int slot, final String nodeId) {
cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot), Protocol.CLUSTER_SETSLOT_NODE, nodeId);
}
public void clusterSetSlotMigrating(final int slot, final String nodeId) {
cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot), Protocol.CLUSTER_SETSLOT_MIGRATING, nodeId);
}
public void clusterSetSlotImporting(final int slot, final String nodeId) {
cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot), Protocol.CLUSTER_SETSLOT_IMPORTING, nodeId);
}
}

View File

@@ -0,0 +1,23 @@
package redis.clients.jedis;
import java.util.List;
public interface ClusterCommands {
String clusterNodes();
String clusterMeet(final String ip, final int port);
String clusterAddSlots(final int... slots);
String clusterDelSlots(final int... slots);
String clusterInfo();
List<String> clusterGetKeysInSlot(final int slot, final int count);
String clusterSetSlotNode(final int slot, final String nodeId);
String clusterSetSlotMigrating(final int slot, final String nodeId);
String clusterSetSlotImporting(final int slot, final String nodeId);
}

View File

@@ -0,0 +1,23 @@
package redis.clients.jedis;
import java.util.List;
public interface ClusterPipeline {
Response<String> clusterNodes();
Response<String> clusterMeet(final String ip, final int port);
Response<String> clusterAddSlots(final int... slots);
Response<String> clusterDelSlots(final int... slots);
Response<String> clusterInfo();
Response<List<String>> clusterGetKeysInSlot(final int slot, final int count);
Response<String> clusterSetSlotNode(final int slot, final String nodeId);
Response<String> clusterSetSlotMigrating(final int slot, final String nodeId);
Response<String> clusterSetSlotImporting(final int slot, final String nodeId);
}

View File

@@ -13,8 +13,10 @@ import java.util.Set;
import redis.clients.jedis.BinaryClient.LIST_POSITION;
import redis.clients.util.SafeEncoder;
import redis.clients.util.Slowlog;
import java.net.URI;
import java.util.*;
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, BasicCommands {
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands {
public Jedis(final String host) {
super(host);
}
@@ -3153,4 +3155,63 @@ public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommand
}
return new ScanResult<Tuple>(newcursor, results);
}
public String clusterNodes() {
checkIsInMulti();
client.clusterNodes();
return client.getBulkReply();
}
public String clusterMeet(final String ip, final int port) {
checkIsInMulti();
client.clusterMeet(ip, port);
return client.getStatusCodeReply();
}
public String clusterAddSlots(final int ...slots) {
checkIsInMulti();
client.clusterAddSlots(slots);
return client.getStatusCodeReply();
}
public String clusterDelSlots(final int ...slots) {
checkIsInMulti();
client.clusterDelSlots(slots);
return client.getStatusCodeReply();
}
public String clusterInfo() {
checkIsInMulti();
client.clusterInfo();
return client.getStatusCodeReply();
}
public List<String> clusterGetKeysInSlot(final int slot, final int count) {
checkIsInMulti();
client.clusterGetKeysInSlot(slot, count);
return client.getMultiBulkReply();
}
public String clusterSetSlotNode(final int slot, final String nodeId) {
checkIsInMulti();
client.clusterSetSlotNode(slot, nodeId);
return client.getStatusCodeReply();
}
public String clusterSetSlotMigrating(final int slot, final String nodeId) {
checkIsInMulti();
client.clusterSetSlotMigrating(slot, nodeId);
return client.getStatusCodeReply();
}
public String clusterSetSlotImporting(final int slot, final String nodeId) {
checkIsInMulti();
client.clusterSetSlotImporting(slot, nodeId);
return client.getStatusCodeReply();
}
public String asking() {
checkIsInMulti();
client.asking();
return client.getStatusCodeReply();
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,53 @@
package redis.clients.jedis;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
public abstract class JedisClusterCommand<T> {
private boolean asking = false;
private JedisClusterConnectionHandler connectionHandler;
private int commandTimeout;
private int redirections;
// private boolean asking = false;
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int timeout, int maxRedirections) {
this.connectionHandler = connectionHandler;
this.commandTimeout = timeout;
this.redirections = maxRedirections;
}
public abstract T execute();
public T run(String key) {
try {
if (key == null) {
throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
} else if (redirections == 0) {
throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
}
connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
if (asking) {
//TODO: Pipeline asking with the original command to make it faster....
connectionHandler.getConnection().asking();
}
return execute();
} catch (JedisRedirectionException jre) {
return handleRedirection(jre, key);
}
}
private T handleRedirection(JedisRedirectionException jre, String key) {
if (jre instanceof JedisAskDataException) {
asking = true;
}
redirections--;
this.connectionHandler.assignSlotToNode(jre.getSlot(), jre.getTargetNode());
return run(key);
}
}

View File

@@ -0,0 +1,82 @@
package redis.clients.jedis;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
public abstract class JedisClusterConnectionHandler {
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
abstract Jedis getConnection();
abstract Jedis getConnectionFromSlot(int slot);
public JedisClusterConnectionHandler(Set<HostAndPort> nodes) {
initializeSlotsCache(nodes);
}
public Map<String, JedisPool> getNodes() {
return nodes;
}
private void initializeSlotsCache(Set<HostAndPort> nodes) {
for (HostAndPort hostAndPort : nodes) {
JedisPool jp = new JedisPool(hostAndPort.getHost(), hostAndPort.getPort());
this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp);
discoverClusterNodesAndSlots(jp);
}
}
private void discoverClusterNodesAndSlots(JedisPool jp) {
String localNodes = jp.getResource().clusterNodes();
for (String nodeInfo : localNodes.split("\n")) {
HostAndPort node = getHostAndPortFromNodeLine(nodeInfo);
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
this.nodes.put(node.getHost() + node.getPort(), nodePool);
populateNodeSlots(nodeInfo, nodePool);
}
}
private void populateNodeSlots(String nodeInfo, JedisPool nodePool) {
String[] nodeInfoArray = nodeInfo.split(" ");
if (nodeInfoArray.length > 7) {
for (int i = 8; i < nodeInfoArray.length; i++) {
processSlot(nodeInfoArray[i], nodePool);
}
}
}
private void processSlot(String slot, JedisPool nodePool) {
if (slot.contains("-")) {
String[] slotRange = slot.split("-");
for (int i = Integer.valueOf(slotRange[0]); i <= Integer.valueOf(slotRange[1]); i++) {
slots.put(i, nodePool);
}
} else {
slots.put(Integer.valueOf(slot), nodePool);
}
}
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo) {
String stringHostAndPort = nodeInfo.split(" ",3)[1];
String[] arrayHostAndPort = stringHostAndPort.split(":");
return new HostAndPort(arrayHostAndPort[0], Integer.valueOf(arrayHostAndPort[1]));
}
public void assignSlotToNode(int slot, HostAndPort targetNode) {
JedisPool targetPool = nodes.get(targetNode.getHost() + targetNode.getPort());
slots.put(slot, targetPool);
}
protected JedisPool getRandomConnection() {
Object[] nodeArray = nodes.values().toArray();
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
}
}

View File

@@ -18,7 +18,7 @@ public class JedisPool extends Pool<Jedis> {
this(new GenericObjectPoolConfig(), host, port,
Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null);
}
public JedisPool(final String host) {
URI uri = URI.create(host);
if (uri.getScheme() != null && uri.getScheme().equals("redis")) {

View File

@@ -0,0 +1,20 @@
package redis.clients.jedis;
import java.util.Set;
public class JedisRandomConnectionHandler extends JedisClusterConnectionHandler {
public JedisRandomConnectionHandler(Set<HostAndPort> nodes) {
super(nodes);
}
public Jedis getConnection() {
return getRandomConnection().getResource();
}
@Override
Jedis getConnectionFromSlot(int slot) {
return getRandomConnection().getResource();
}
}

View File

@@ -0,0 +1,48 @@
package redis.clients.jedis;
import java.util.Set;
public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
private Jedis currentConnection;
public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes) {
super(nodes);
}
public Jedis getConnection() {
return currentConnection != null ? currentConnection : getRandomConnection().getResource();
}
private void returnCurrentConnection() {
if (currentConnection != null) {
nodes.get(currentConnection.getClient().getHost()+currentConnection.getClient().getPort()).returnResource(currentConnection);
}
}
@Override
public void assignSlotToNode(int slot, HostAndPort targetNode) {
super.assignSlotToNode(slot, targetNode);
getConnectionFromSlot(slot);
}
@Override
public Jedis getConnectionFromSlot(int slot) {
returnCurrentConnection();
JedisPool connectionPool = slots.get(slot);
if (connectionPool == null) {
connectionPool = getRandomConnection();
}
currentConnection = connectionPool.getResource();
return connectionPool.getResource();
}
}

View File

@@ -7,7 +7,8 @@ import java.util.Set;
abstract class MultiKeyPipelineBase extends PipelineBase implements
BasicRedisPipeline,
MultiKeyBinaryRedisPipeline,
MultiKeyCommandsPipeline {
MultiKeyCommandsPipeline,
ClusterPipeline {
protected Client client = null;
@@ -398,4 +399,49 @@ abstract class MultiKeyPipelineBase extends PipelineBase implements
client.bitop(op, destKey, srcKeys);
return getResponse(BuilderFactory.LONG);
}
public Response<String> clusterNodes() {
client.clusterNodes();
return getResponse(BuilderFactory.STRING);
}
public Response<String> clusterMeet(final String ip, final int port) {
client.clusterMeet(ip, port);
return getResponse(BuilderFactory.STRING);
}
public Response<String> clusterAddSlots(final int... slots) {
client.clusterAddSlots(slots);
return getResponse(BuilderFactory.STRING);
}
public Response<String> clusterDelSlots(final int... slots) {
client.clusterDelSlots(slots);
return getResponse(BuilderFactory.STRING);
}
public Response<String> clusterInfo() {
client.clusterInfo();
return getResponse(BuilderFactory.STRING);
}
public Response<List<String>> clusterGetKeysInSlot(final int slot, final int count) {
client.clusterGetKeysInSlot(slot, count);
return getResponse(BuilderFactory.STRING_LIST);
}
public Response<String> clusterSetSlotNode(final int slot, final String nodeId) {
client.clusterSetSlotNode(slot, nodeId);
return getResponse(BuilderFactory.STRING);
}
public Response<String> clusterSetSlotMigrating(final int slot, final String nodeId) {
client.clusterSetSlotMigrating(slot, nodeId);
return getResponse(BuilderFactory.STRING);
}
public Response<String> clusterSetSlotImporting(final int slot, final String nodeId) {
client.clusterSetSlotImporting(slot, nodeId);
return getResponse(BuilderFactory.STRING);
}
}

View File

@@ -4,15 +4,19 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.util.RedisInputStream;
import redis.clients.util.RedisOutputStream;
import redis.clients.util.SafeEncoder;
public final class Protocol {
public static final int DEFAULT_PORT = 6379;
private static final String ASK_RESPONSE = "ASK";
private static final String MOVED_RESPONSE = "MOVED";
public static final int DEFAULT_PORT = 6379;
public static final int DEFAULT_SENTINEL_PORT = 26379;
public static final int DEFAULT_TIMEOUT = 2000;
public static final int DEFAULT_DATABASE = 0;
@@ -30,6 +34,17 @@ public final class Protocol {
public static final String SENTINEL_RESET = "reset";
public static final String SENTINEL_SLAVES = "slaves";
public static final String CLUSTER_NODES = "nodes";
public static final String CLUSTER_MEET = "meet";
public static final String CLUSTER_ADDSLOTS = "addslots";
public static final String CLUSTER_DELSLOTS = "delslots";
public static final String CLUSTER_INFO = "info";
public static final String CLUSTER_GETKEYSINSLOT = "getkeysinslot";
public static final String CLUSTER_SETSLOT = "setslot";
public static final String CLUSTER_SETSLOT_NODE = "node";
public static final String CLUSTER_SETSLOT_MIGRATING = "migrating";
public static final String CLUSTER_SETSLOT_IMPORTING = "importing";
private Protocol() {
// this prevent the class from instantiation
}
@@ -61,8 +76,28 @@ public final class Protocol {
}
private static void processError(final RedisInputStream is) {
String message = is.readLine();
throw new JedisDataException(message);
String message = is.readLine();
//TODO: I'm not sure if this is the best way to do this.
//Maybe Read only first 5 bytes instead?
//
if (message.contains(MOVED_RESPONSE)) {
String[] movedInfo = parseTargetHostAndSlot(message);
throw new JedisMovedDataException(message, new HostAndPort(movedInfo[1], Integer.valueOf(movedInfo[2])), Integer.valueOf(movedInfo[0]));
} else if (message.contains(ASK_RESPONSE)) {
String[] askInfo = parseTargetHostAndSlot(message);
throw new JedisAskDataException(message, new HostAndPort(askInfo[1], Integer.valueOf(askInfo[2])), Integer.valueOf(askInfo[0]));
}
throw new JedisDataException(message);
}
private static String[] parseTargetHostAndSlot(String clusterRedirectResponse) {
String[] response = new String[3];
String[] messageInfo = clusterRedirectResponse.split(" ");
String[] targetHostAndPort = messageInfo[2].split(":");
response[0] = messageInfo[1];
response[1] = targetHostAndPort[0];
response[2] = targetHostAndPort[1];
return response;
}
private static Object process(final RedisInputStream is) {
@@ -157,8 +192,7 @@ public final class Protocol {
}
public static enum Command {
PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL,
DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT;
PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING;
public final byte[] raw;

View File

@@ -0,0 +1,20 @@
package redis.clients.jedis.exceptions;
import redis.clients.jedis.HostAndPort;
public class JedisAskDataException extends JedisRedirectionException {
private static final long serialVersionUID = 3878126572474819403L;
public JedisAskDataException(Throwable cause, HostAndPort targetHost, int slot) {
super(cause, targetHost, slot);
}
public JedisAskDataException(String message, Throwable cause, HostAndPort targetHost, int slot) {
super(message, cause, targetHost, slot);
}
public JedisAskDataException(String message, HostAndPort targetHost, int slot) {
super(message, targetHost, slot);
}
}

View File

@@ -0,0 +1,18 @@
package redis.clients.jedis.exceptions;
public class JedisClusterException extends JedisDataException {
private static final long serialVersionUID = 3878126572474819403L;
public JedisClusterException(Throwable cause) {
super(cause);
}
public JedisClusterException(String message, Throwable cause) {
super(message, cause);
}
public JedisClusterException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,18 @@
package redis.clients.jedis.exceptions;
public class JedisClusterMaxRedirectionsException extends JedisDataException {
private static final long serialVersionUID = 3878126572474819403L;
public JedisClusterMaxRedirectionsException(Throwable cause) {
super(cause);
}
public JedisClusterMaxRedirectionsException(String message, Throwable cause) {
super(message, cause);
}
public JedisClusterMaxRedirectionsException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,21 @@
package redis.clients.jedis.exceptions;
import redis.clients.jedis.HostAndPort;
public class JedisMovedDataException extends JedisRedirectionException {
private static final long serialVersionUID = 3878126572474819403L;
public JedisMovedDataException(String message, HostAndPort targetNode, int slot) {
super(message, targetNode, slot);
}
public JedisMovedDataException(Throwable cause, HostAndPort targetNode, int slot) {
super(cause, targetNode, slot);
}
public JedisMovedDataException(String message, Throwable cause, HostAndPort targetNode, int slot) {
super(message, cause, targetNode, slot);
}
}

View File

@@ -0,0 +1,37 @@
package redis.clients.jedis.exceptions;
import redis.clients.jedis.HostAndPort;
public class JedisRedirectionException extends JedisDataException {
private static final long serialVersionUID = 3878126572474819403L;
private HostAndPort targetNode;
private int slot;
public JedisRedirectionException(String message, HostAndPort targetNode, int slot) {
super(message);
this.targetNode = targetNode;
this.slot = slot;
}
public JedisRedirectionException(Throwable cause, HostAndPort targetNode, int slot) {
super(cause);
this.targetNode = targetNode;
this.slot = slot;
}
public JedisRedirectionException(String message, Throwable cause, HostAndPort targetNode, int slot) {
super(message, cause);
this.targetNode = targetNode;
this.slot = slot;
}
public HostAndPort getTargetNode() {
return targetNode;
}
public int getSlot() {
return slot;
}
}

View File

@@ -0,0 +1,21 @@
package redis.clients.util;
public class JedisClusterCRC16 {
public final static int polynomial = 0x1021; // Represents x^16+x^12+x^5+1
static int crc;
public static int getSlot(String key) {
crc = 0x0000;
for (byte b : key.getBytes()) {
for (int i = 0; i < 8; i++) {
boolean bit = ((b >> (7-i) & 1) == 1);
boolean c15 = ((crc >> 15 & 1) == 1);
crc <<= 1;
// If coefficient of bit and remainder polynomial = 1 xor crc with polynomial
if (c15 ^ bit) crc ^= polynomial;
}
}
return crc &= 0xffff % 16384;
}
}

View File

@@ -9,83 +9,106 @@ import redis.clients.jedis.Protocol;
public class HostAndPortUtil {
private static List<HostAndPort> redisHostAndPortList = new ArrayList<HostAndPort>();
private static List<HostAndPort> sentinelHostAndPortList = new ArrayList<HostAndPort>();
private static List<HostAndPort> clusterHostAndPortList = new ArrayList<HostAndPort>();
static {
HostAndPort defaulthnp1 = new HostAndPort("localhost", Protocol.DEFAULT_PORT);
redisHostAndPortList.add(defaulthnp1);
HostAndPort defaulthnp2 = new HostAndPort("localhost", Protocol.DEFAULT_PORT + 1);
redisHostAndPortList.add(defaulthnp2);
HostAndPort defaulthnp3 = new HostAndPort("localhost", Protocol.DEFAULT_PORT + 2);
redisHostAndPortList.add(defaulthnp3);
HostAndPort defaulthnp4 = new HostAndPort("localhost", Protocol.DEFAULT_PORT + 3);
redisHostAndPortList.add(defaulthnp4);
HostAndPort defaulthnp5 = new HostAndPort("localhost", Protocol.DEFAULT_PORT + 4);
redisHostAndPortList.add(defaulthnp5);
HostAndPort defaulthnp6 = new HostAndPort("localhost", Protocol.DEFAULT_PORT + 5);
redisHostAndPortList.add(defaulthnp6);
HostAndPort defaulthnp7 = new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT);
sentinelHostAndPortList.add(defaulthnp7);
HostAndPort defaulthnp8 = new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 1);
sentinelHostAndPortList.add(defaulthnp8);
HostAndPort defaulthnp9 = new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 2);
sentinelHostAndPortList.add(defaulthnp9);
HostAndPort defaulthnp1 = new HostAndPort("localhost",
Protocol.DEFAULT_PORT);
redisHostAndPortList.add(defaulthnp1);
String envRedisHosts = System.getProperty("redis-hosts");
String envSentinelHosts = System.getProperty("sentinel-hosts");
redisHostAndPortList = parseHosts(envRedisHosts, redisHostAndPortList);
sentinelHostAndPortList = parseHosts(envSentinelHosts, sentinelHostAndPortList);
HostAndPort defaulthnp2 = new HostAndPort("localhost",
Protocol.DEFAULT_PORT + 1);
redisHostAndPortList.add(defaulthnp2);
HostAndPort defaulthnp3 = new HostAndPort("localhost",
Protocol.DEFAULT_PORT + 2);
redisHostAndPortList.add(defaulthnp3);
HostAndPort defaulthnp4 = new HostAndPort("localhost",
Protocol.DEFAULT_PORT + 3);
redisHostAndPortList.add(defaulthnp4);
HostAndPort defaulthnp5 = new HostAndPort("localhost",
Protocol.DEFAULT_PORT + 4);
redisHostAndPortList.add(defaulthnp5);
HostAndPort defaulthnp6 = new HostAndPort("localhost",
Protocol.DEFAULT_PORT + 5);
redisHostAndPortList.add(defaulthnp6);
HostAndPort defaulthnp7 = new HostAndPort("localhost",
Protocol.DEFAULT_SENTINEL_PORT);
sentinelHostAndPortList.add(defaulthnp7);
HostAndPort defaulthnp8 = new HostAndPort("localhost",
Protocol.DEFAULT_SENTINEL_PORT + 1);
sentinelHostAndPortList.add(defaulthnp8);
HostAndPort defaulthnp9 = new HostAndPort("localhost",
Protocol.DEFAULT_SENTINEL_PORT + 2);
sentinelHostAndPortList.add(defaulthnp9);
clusterHostAndPortList.add(new HostAndPort("localhost", 7379));
clusterHostAndPortList.add(new HostAndPort("localhost", 7380));
clusterHostAndPortList.add(new HostAndPort("localhost", 7381));
String envRedisHosts = System.getProperty("redis-hosts");
String envSentinelHosts = System.getProperty("sentinel-hosts");
String envClusterHosts = System.getProperty("cluster-hosts");
redisHostAndPortList = parseHosts(envRedisHosts, redisHostAndPortList);
sentinelHostAndPortList = parseHosts(envSentinelHosts,
sentinelHostAndPortList);
clusterHostAndPortList = parseHosts(envClusterHosts,
clusterHostAndPortList);
}
public static List<HostAndPort> parseHosts(String envHosts, List<HostAndPort> existingHostsAndPorts) {
if (null != envHosts && 0 < envHosts.length()) {
String[] hostDefs = envHosts.split(",");
if (null != hostDefs && 2 <= hostDefs.length) {
List<HostAndPort> envHostsAndPorts = new ArrayList<HostAndPort>(hostDefs.length);
for (String hostDef : hostDefs) {
String[] hostAndPort = hostDef.split(":");
if (null != hostAndPort && 2 == hostAndPort.length) {
String host = hostAndPort[0];
int port = Protocol.DEFAULT_PORT;
try {
port = Integer.parseInt(hostAndPort[1]);
} catch (final NumberFormatException nfe) {
}
envHostsAndPorts.add(new HostAndPort(host, port));
}
}
return envHostsAndPorts;
}
}
return existingHostsAndPorts;
public static List<HostAndPort> parseHosts(String envHosts,
List<HostAndPort> existingHostsAndPorts) {
if (null != envHosts && 0 < envHosts.length()) {
String[] hostDefs = envHosts.split(",");
if (null != hostDefs && 2 <= hostDefs.length) {
List<HostAndPort> envHostsAndPorts = new ArrayList<HostAndPort>(
hostDefs.length);
for (String hostDef : hostDefs) {
String[] hostAndPort = hostDef.split(":");
if (null != hostAndPort && 2 == hostAndPort.length) {
String host = hostAndPort[0];
int port = Protocol.DEFAULT_PORT;
try {
port = Integer.parseInt(hostAndPort[1]);
} catch (final NumberFormatException nfe) {
}
envHostsAndPorts.add(new HostAndPort(host, port));
}
}
return envHostsAndPorts;
}
}
return existingHostsAndPorts;
}
public static List<HostAndPort> getRedisServers() {
return redisHostAndPortList;
}
public static List<HostAndPort> getSentinelServers() {
return sentinelHostAndPortList;
return redisHostAndPortList;
}
public static List<HostAndPort> getSentinelServers() {
return sentinelHostAndPortList;
}
public static List<HostAndPort> getClusterServers() {
return clusterHostAndPortList;
}
}

View File

@@ -0,0 +1,196 @@
package redis.clients.jedis.tests;
import java.util.HashSet;
import java.util.Set;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.util.JedisClusterCRC16;
public class JedisClusterTest extends Assert {
private Jedis node1;
private Jedis node2;
private Jedis node3;
private HostAndPort nodeInfo1 = HostAndPortUtil.getClusterServers().get(0);
private HostAndPort nodeInfo2 = HostAndPortUtil.getClusterServers().get(1);
private HostAndPort nodeInfo3 = HostAndPortUtil.getClusterServers().get(2);
@Before
public void setUp() throws InterruptedException {
node1 = new Jedis(nodeInfo1.getHost(), nodeInfo1.getPort());
node1.connect();
node1.flushAll();
node2 = new Jedis(nodeInfo2.getHost(), nodeInfo2.getPort());
node2.connect();
node2.flushAll();
node3 = new Jedis(nodeInfo3.getHost(), nodeInfo3.getPort());
node3.connect();
node3.flushAll();
// ---- configure cluster
// add nodes to cluster
node1.clusterMeet("127.0.0.1", nodeInfo1.getPort());
node1.clusterMeet("127.0.0.1", nodeInfo2.getPort());
node1.clusterMeet("127.0.0.1", nodeInfo3.getPort());
// split available slots across the three nodes
int slotsPerNode = JedisCluster.HASHSLOTS / 3;
Pipeline pipeline1 = node1.pipelined();
Pipeline pipeline2 = node2.pipelined();
Pipeline pipeline3 = node3.pipelined();
for (int i = 0; i < JedisCluster.HASHSLOTS; i++) {
if (i < slotsPerNode) {
pipeline1.clusterAddSlots(i);
} else if (i > slotsPerNode * 2) {
pipeline3.clusterAddSlots(i);
} else {
pipeline2.clusterAddSlots(i);
}
}
pipeline1.sync();
pipeline2.sync();
pipeline3.sync();
waitForClusterReady();
}
@After
public void tearDown() {
// clear all slots
int[] slotsToDelete = new int[JedisCluster.HASHSLOTS];
for (int i = 0; i < JedisCluster.HASHSLOTS; i++) {
slotsToDelete[i] = i;
}
node1.clusterDelSlots(slotsToDelete);
node2.clusterDelSlots(slotsToDelete);
node3.clusterDelSlots(slotsToDelete);
}
@Test(expected=JedisMovedDataException.class)
public void testThrowMovedException() {
node1.set("foo", "bar");
}
@Test
public void testMovedExceptionParameters() {
try {
node1.set("foo", "bar");
} catch (JedisMovedDataException jme) {
assertEquals(12182, jme.getSlot());
assertEquals(new HostAndPort("127.0.0.1", 7381), jme.getTargetNode());
}
fail();
}
@Test(expected=JedisAskDataException.class)
public void testThrowAskException() {
int keySlot = JedisClusterCRC16.getSlot("test");
String node3Id = getNodeId(node3.clusterNodes());
node2.clusterSetSlotMigrating(keySlot, node3Id);
node2.get("test");
}
@Test
public void testDiscoverNodesAutomatically() {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
assertEquals(jc.getClusterNodes().size(), 3);
}
@Test
public void testCalculateConnectionPerSlot() {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
jc.set("foo", "bar");
jc.set("test", "test");
assertEquals("bar",node3.get("foo"));
assertEquals("test",node2.get("test"));
}
@Test
public void testRecalculateSlotsWhenMoved() throws InterruptedException {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
int slot51 = JedisClusterCRC16.getSlot("51");
node2.clusterDelSlots(slot51);
node3.clusterDelSlots(slot51);
node3.clusterAddSlots(slot51);
waitForClusterReady();
jc.set("51", "foo");
assertEquals("foo", jc.get("51"));
}
@Test
public void testAskResponse() throws InterruptedException {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
int slot51 = JedisClusterCRC16.getSlot("51");
node3.clusterSetSlotImporting(slot51, getNodeId(node2.clusterNodes()));
node2.clusterSetSlotMigrating(slot51, getNodeId(node3.clusterNodes()));
jc.set("51", "foo");
assertEquals("foo", jc.get("51"));
}
@Test(expected=JedisClusterException.class)
public void testThrowExceptionWithoutKey() {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
jc.ping();
}
@Test(expected=JedisClusterMaxRedirectionsException.class)
public void testRedisClusterMaxRedirections() {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
int slot51 = JedisClusterCRC16.getSlot("51");
//This will cause an infinite redirection loop
node2.clusterSetSlotMigrating(slot51, getNodeId(node3.clusterNodes()));
jc.set("51", "foo");
}
private String getNodeId(String infoOutput) {
for (String infoLine : infoOutput.split("\n")) {
if (infoLine.contains("myself")) {
return infoLine.split(" ")[0];
}
}
return "";
}
private void waitForClusterReady() throws InterruptedException {
boolean clusterOk = false;
while (!clusterOk) {
if (node1.clusterInfo().split("\n")[0].contains("ok") &&
node2.clusterInfo().split("\n")[0].contains("ok") &&
node3.clusterInfo().split("\n")[0].contains("ok") ) {
clusterOk = true;
}
Thread.sleep(50);
}
}
}

View File

@@ -0,0 +1,138 @@
package redis.clients.jedis.tests.commands;
import java.util.List;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.tests.HostAndPortUtil;
import redis.clients.jedis.tests.JedisTestBase;
public class ClusterCommandsTest extends JedisTestBase {
private static Jedis node1;
private static Jedis node2;
private HostAndPort nodeInfo1 = HostAndPortUtil.getClusterServers().get(0);
private HostAndPort nodeInfo2 = HostAndPortUtil.getClusterServers().get(1);
@Before
public void setUp() throws Exception {
node1 = new Jedis(nodeInfo1.getHost(), nodeInfo1.getPort());
node1.connect();
node1.flushAll();
node2 = new Jedis(nodeInfo2.getHost(), nodeInfo2.getPort());
node2.connect();
node2.flushAll();
}
@After
public void tearDown() {
node1.disconnect();
node2.disconnect();
}
@AfterClass
public static void removeSlots() throws InterruptedException {
//This is to wait for gossip to replicate data.
waitForEqualClusterSize();
String[] nodes = node1.clusterNodes().split("\n");
String node1Id = nodes[0].split(" ")[0];
node1.clusterDelSlots(1,2,3,4,5,500);
node1.clusterSetSlotNode(5000, node1Id);
node1.clusterDelSlots(5000, 10000);
node1.clusterDelSlots(6000);
node2.clusterDelSlots(6000,1,2,3,4,5,500,5000);
try {
node2.clusterDelSlots(10000);
} catch (JedisDataException jde) {
//Do nothing, slot may or may not be assigned depending on gossip
}
}
private static void waitForEqualClusterSize() throws InterruptedException {
boolean notEqualSize = true;
while (notEqualSize) {
notEqualSize = getClusterAttribute(node1.clusterInfo(), "cluster_known_nodes") == getClusterAttribute(node2.clusterInfo(), "cluster_size") ? false : true;
}
}
private static int getClusterAttribute(String clusterInfo, String attributeName) {
for (String infoElement: clusterInfo.split("\n")) {
if (infoElement.contains(attributeName)) {
return Integer.valueOf(infoElement.split(":")[1].trim());
}
}
return 0;
}
@Test
public void clusterNodes() {
String nodes = node1.clusterNodes();
assertTrue(nodes.split("\n").length > 0);
}
@Test
public void clusterMeet() {
String status = node1.clusterMeet("127.0.0.1", nodeInfo2.getPort());
assertEquals("OK", status);
}
@Test
public void clusterAddSlots() {
String status = node1.clusterAddSlots(1, 2, 3, 4, 5);
assertEquals("OK", status);
}
@Test
public void clusterDelSlots() {
node1.clusterAddSlots(900);
String status = node1.clusterDelSlots(900);
assertEquals("OK", status);
}
@Test
public void clusterInfo() {
String info = node1.clusterInfo();
assertNotNull(info);
}
@Test
public void clusterGetKeysInSlot() {
node1.clusterAddSlots(500);
List<String> keys = node1.clusterGetKeysInSlot(500, 1);
assertEquals(0, keys.size());
}
@Test
public void clusterSetSlotNode() {
String[] nodes = node1.clusterNodes().split("\n");
String nodeId = nodes[0].split(" ")[0];
String status = node1.clusterSetSlotNode(10000, nodeId);
assertEquals("OK", status);
}
@Test
public void clusterSetSlotMigrating() {
node1.clusterAddSlots(5000);
String[] nodes = node1.clusterNodes().split("\n");
String nodeId = nodes[0].split(" ")[0];
String status = node1.clusterSetSlotMigrating(5000, nodeId);
assertEquals("OK", status);
}
@Test
public void clusterSetSlotImporting() {
node2.clusterAddSlots(6000);
String[] nodes = node1.clusterNodes().split("\n");
String nodeId = nodes[0].split(" ")[0];
String status = node1.clusterSetSlotImporting(6000, nodeId);
assertEquals("OK", status);
}
}