Several changes have been added to this commit:

- Add asking to cluster commands
- Make jedis cluster return connection to original pool
- Add tests for MOVED and ASK cluster responses
- Refactor connection handler to recalculate connections based on slots

This commit makes the first usable version of Jedis along with Redis Cluster
This commit is contained in:
Marcos Nils
2014-01-03 16:42:21 -03:00
parent 1b26815799
commit dd0bbdaf91
10 changed files with 349 additions and 265 deletions

View File

@@ -1104,5 +1104,8 @@ public class BinaryClient extends Connection {
public void cluster(final byte[]... args) {
sendCommand(CLUSTER, args);
}
public void asking() {
sendCommand(Command.ASKING);
}
}

View File

@@ -3130,4 +3130,10 @@ public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommand
client.clusterSetSlotImporting(slot, nodeId);
return client.getStatusCodeReply();
}
public String asking() {
checkIsInMulti();
client.asking();
return client.getStatusCodeReply();
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,27 +1,41 @@
package redis.clients.jedis;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.tests.utils.JedisClusterCRC16;
public abstract class JedisClusterCommand<T> {
private JedisClusterConnectionHandler connectionHandler;
private boolean asking = false;
private JedisClusterConnectionHandler connectionHandler;
// private boolean asking = false;
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler) {
this.connectionHandler = connectionHandler;
}
public abstract T execute();
public T run() {
public T run(String key) {
try {
if (key == null) {
throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
}
connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
if (asking) {
//TODO: Pipeline asking with the original command to make it faster....
connectionHandler.getConnection().asking();
}
return execute();
} catch (JedisMovedDataException jme) {
this.connectionHandler.assignSlotToNode(jme.getSlot(), jme.getTargetNode());
return execute();
return run(key);
} catch (JedisAskDataException jae) {
throw jae;
asking = true;
this.connectionHandler.assignSlotToNode(jae.getSlot(), jae.getTargetNode());
return run(key);
}
}
}

View File

@@ -2,6 +2,7 @@ package redis.clients.jedis;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -10,7 +11,8 @@ public abstract class JedisClusterConnectionHandler {
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
abstract Jedis getConnection(String key);
abstract Jedis getConnection();
abstract Jedis getConnectionFromSlot(int slot);
public JedisClusterConnectionHandler(Set<HostAndPort> nodes) {
initializeSlotsCache(nodes);
@@ -69,4 +71,12 @@ public abstract class JedisClusterConnectionHandler {
JedisPool targetPool = nodes.get(targetNode.getHost() + targetNode.getPort());
slots.put(slot, targetPool);
}
protected JedisPool getRandomConnection() {
Object[] nodeArray = nodes.values().toArray();
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
}
}

View File

@@ -1,10 +1,7 @@
package redis.clients.jedis;
import java.util.Random;
import java.util.Set;
import redis.clients.util.Pool;
public class JedisRandomConnectionHandler extends JedisClusterConnectionHandler {
@@ -12,11 +9,12 @@ public class JedisRandomConnectionHandler extends JedisClusterConnectionHandler
super(nodes);
}
@SuppressWarnings("unchecked")
public Jedis getConnection(String key) {
Object[] nodeArray = nodes.values().toArray();
return ((Pool<Jedis>) nodeArray[new Random().nextInt(nodeArray.length)]).getResource();
public Jedis getConnection() {
return getRandomConnection().getResource();
}
@Override
Jedis getConnectionFromSlot(int slot) {
return getRandomConnection().getResource();
}
}

View File

@@ -1,30 +1,48 @@
package redis.clients.jedis;
import java.util.Random;
import java.util.Set;
import redis.clients.jedis.tests.utils.JedisClusterCRC16;
public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
private Jedis currentConnection;
public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes) {
super(nodes);
}
public Jedis getConnection(String key) {
int keySlot = JedisClusterCRC16.getSlot(key);
JedisPool connectionPool = slots.get(keySlot);
public Jedis getConnection() {
return currentConnection != null ? currentConnection : getRandomConnection().getResource();
}
private void returnCurrentConnection() {
if (currentConnection != null) {
nodes.get(currentConnection.getClient().getHost()+currentConnection.getClient().getPort()).returnResource(currentConnection);
}
}
@Override
public void assignSlotToNode(int slot, HostAndPort targetNode) {
super.assignSlotToNode(slot, targetNode);
getConnectionFromSlot(slot);
}
@Override
public Jedis getConnectionFromSlot(int slot) {
returnCurrentConnection();
JedisPool connectionPool = slots.get(slot);
if (connectionPool == null) {
connectionPool = getRandomConnection();
}
currentConnection = connectionPool.getResource();
return connectionPool.getResource();
}
private JedisPool getRandomConnection() {
Object[] nodeArray = nodes.values().toArray();
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
}
}

View File

@@ -79,15 +79,26 @@ public final class Protocol {
String message = is.readLine();
//TODO: I'm not sure if this is the best way to do this.
//Maybe Read only first 5 bytes instead?
//
if (message.contains(MOVED_RESPONSE)) {
String[] movedInfo = message.split(" ");
String[] targetHostAndPort = movedInfo[2].split(":");
throw new JedisMovedDataException(message, new HostAndPort(targetHostAndPort[0], Integer.valueOf(targetHostAndPort[1])), Integer.valueOf(movedInfo[1]));
String[] movedInfo = parseTargetHostAndSlot(message);
throw new JedisMovedDataException(message, new HostAndPort(movedInfo[1], Integer.valueOf(movedInfo[2])), Integer.valueOf(movedInfo[0]));
} else if (message.contains(ASK_RESPONSE)) {
throw new JedisAskDataException(message);
String[] askInfo = parseTargetHostAndSlot(message);
throw new JedisAskDataException(message, new HostAndPort(askInfo[1], Integer.valueOf(askInfo[2])), Integer.valueOf(askInfo[0]));
}
throw new JedisDataException(message);
}
private static String[] parseTargetHostAndSlot(String clusterRedirectResponse) {
String[] response = new String[3];
String[] messageInfo = clusterRedirectResponse.split(" ");
String[] targetHostAndPort = messageInfo[2].split(":");
response[0] = messageInfo[1];
response[1] = targetHostAndPort[0];
response[2] = targetHostAndPort[1];
return response;
}
private static Object process(final RedisInputStream is) {
try {
@@ -179,7 +190,7 @@ public final class Protocol {
public static enum Command {
PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL,
DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, CLUSTER;
DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, CLUSTER, ASKING;
public final byte[] raw;

View File

@@ -1,11 +1,12 @@
package redis.clients.jedis.exceptions;
import redis.clients.jedis.HostAndPort;
public class JedisAskDataException extends JedisDataException {
private static final long serialVersionUID = 3878126572474819403L;
public JedisAskDataException(String message) {
super(message);
}
private HostAndPort targetNode;
private int slot;
public JedisAskDataException(Throwable cause) {
super(cause);
@@ -14,4 +15,18 @@ public class JedisAskDataException extends JedisDataException {
public JedisAskDataException(String message, Throwable cause) {
super(message, cause);
}
public JedisAskDataException(String message, HostAndPort targetHost, int slot) {
super(message);
this.targetNode = targetHost;
this.slot = slot;
}
public HostAndPort getTargetNode() {
return targetNode;
}
public int getSlot() {
return slot;
}
}

View File

@@ -13,6 +13,7 @@ import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.tests.utils.JedisClusterCRC16;
@@ -39,7 +40,7 @@ public class JedisClusterTest extends Assert {
node3.connect();
node3.flushAll();
// ---- configure cluster
// ---- configure cluster
// add nodes to cluster
node1.clusterMeet("127.0.0.1", nodeInfo1.getPort());
@@ -148,6 +149,14 @@ public class JedisClusterTest extends Assert {
assertEquals("foo", jc.get("51"));
}
@Test(expected=JedisClusterException.class)
public void testThrowExceptionWithoutKey() {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
jc.ping();
}
private String getNodeId(String infoOutput) {
for (String infoLine : infoOutput.split("\n")) {
if (infoLine.contains("myself")) {