Add slot based connection which routes requests according to key slot

This commit is contained in:
Marcos Nils
2013-12-28 00:59:35 -03:00
parent 9f767a0848
commit b2d22e2060
6 changed files with 186 additions and 136 deletions

View File

@@ -1,7 +1,5 @@
package redis.clients.jedis;
import java.util.HashSet;
import java.util.IllegalFormatException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -17,7 +15,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
private JedisClusterConnectionHandler connectionHandler;
public JedisCluster(Set<HostAndPort> nodes, int timeout) {
connectionHandler = new JedisRandomConnectionHandler(nodes);
connectionHandler = new JedisSlotBasedConnectionHandler(nodes);
}
@@ -31,7 +29,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().set(key, value);
return connectionHandler.getConnection(key).set(key, value);
}
}.run();
}
@@ -41,7 +39,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().get(key);
return connectionHandler.getConnection(key).get(key);
}
}.run();
}
@@ -51,7 +49,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Boolean>(connectionHandler) {
@Override
public Boolean execute() {
return connectionHandler.getConnection().exists(key);
return connectionHandler.getConnection(key).exists(key);
}
}.run();
}
@@ -61,7 +59,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().persist(key);
return connectionHandler.getConnection(key).persist(key);
}
}.run();
}
@@ -71,7 +69,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().type(key);
return connectionHandler.getConnection(key).type(key);
}
}.run();
}
@@ -81,7 +79,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().expire(key, seconds);
return connectionHandler.getConnection(key).expire(key, seconds);
}
}.run();
}
@@ -91,7 +89,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().expireAt(key, unixTime);
return connectionHandler.getConnection(key).expireAt(key, unixTime);
}
}.run();
}
@@ -101,7 +99,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().ttl(key);
return connectionHandler.getConnection(key).ttl(key);
}
}.run();
}
@@ -111,7 +109,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Boolean>(connectionHandler) {
@Override
public Boolean execute() {
return connectionHandler.getConnection().setbit(key, offset, value);
return connectionHandler.getConnection(key).setbit(key, offset, value);
}
}.run();
}
@@ -121,7 +119,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Boolean>(connectionHandler) {
@Override
public Boolean execute() {
return connectionHandler.getConnection().setbit(key, offset, value);
return connectionHandler.getConnection(key).setbit(key, offset, value);
}
}.run();
}
@@ -131,7 +129,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Boolean>(connectionHandler) {
@Override
public Boolean execute() {
return connectionHandler.getConnection().getbit(key, offset);
return connectionHandler.getConnection(key).getbit(key, offset);
}
}.run();
}
@@ -141,7 +139,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().setrange(key, offset, value);
return connectionHandler.getConnection(key).setrange(key, offset, value);
}
}.run();
}
@@ -151,7 +149,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().getrange(key, startOffset, endOffset);
return connectionHandler.getConnection(key).getrange(key, startOffset, endOffset);
}
}.run();
}
@@ -161,7 +159,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().getSet(key, value);
return connectionHandler.getConnection(key).getSet(key, value);
}
}.run();
}
@@ -171,7 +169,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().setnx(key, value);
return connectionHandler.getConnection(key).setnx(key, value);
}
}.run();
}
@@ -181,7 +179,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().setex(key, seconds, value);
return connectionHandler.getConnection(key).setex(key, seconds, value);
}
}.run();
}
@@ -191,7 +189,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().decrBy(key, integer);
return connectionHandler.getConnection(key).decrBy(key, integer);
}
}.run();
}
@@ -201,7 +199,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().decr(key);
return connectionHandler.getConnection(key).decr(key);
}
}.run();
}
@@ -211,7 +209,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().incrBy(key, integer);
return connectionHandler.getConnection(key).incrBy(key, integer);
}
}.run();
}
@@ -221,7 +219,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().incr(key);
return connectionHandler.getConnection(key).incr(key);
}
}.run();
}
@@ -231,7 +229,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().append(key, value);
return connectionHandler.getConnection(key).append(key, value);
}
}.run();
}
@@ -241,7 +239,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().substr(key, start, end);
return connectionHandler.getConnection(key).substr(key, start, end);
}
}.run();
}
@@ -251,7 +249,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().hset(key, field, value);
return connectionHandler.getConnection(key).hset(key, field, value);
}
}.run();
}
@@ -261,7 +259,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().hget(key, field);
return connectionHandler.getConnection(key).hget(key, field);
}
}.run();
}
@@ -271,7 +269,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().hsetnx(key, field, value);
return connectionHandler.getConnection(key).hsetnx(key, field, value);
}
}.run();
}
@@ -281,7 +279,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().hmset(key, hash);
return connectionHandler.getConnection(key).hmset(key, hash);
}
}.run();
}
@@ -291,7 +289,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<List<String>>(connectionHandler) {
@Override
public List<String> execute() {
return connectionHandler.getConnection().hmget(key, fields);
return connectionHandler.getConnection(key).hmget(key, fields);
}
}.run();
}
@@ -301,7 +299,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().hincrBy(key, field, value);
return connectionHandler.getConnection(key).hincrBy(key, field, value);
}
}.run();
}
@@ -311,7 +309,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Boolean>(connectionHandler) {
@Override
public Boolean execute() {
return connectionHandler.getConnection().hexists(key, field);
return connectionHandler.getConnection(key).hexists(key, field);
}
}.run();
}
@@ -321,7 +319,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().hdel(key, field);
return connectionHandler.getConnection(key).hdel(key, field);
}
}.run();
}
@@ -331,7 +329,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().hdel(key);
return connectionHandler.getConnection(key).hdel(key);
}
}.run();
}
@@ -341,7 +339,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().hkeys(key);
return connectionHandler.getConnection(key).hkeys(key);
}
}.run();
}
@@ -351,7 +349,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<List<String>>(connectionHandler) {
@Override
public List<String> execute() {
return connectionHandler.getConnection().hvals(key);
return connectionHandler.getConnection(key).hvals(key);
}
}.run();
}
@@ -361,7 +359,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Map<String, String>>(connectionHandler) {
@Override
public Map<String, String> execute() {
return connectionHandler.getConnection().hgetAll(key);
return connectionHandler.getConnection(key).hgetAll(key);
}
}.run();
}
@@ -371,7 +369,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().rpush(key, string);
return connectionHandler.getConnection(key).rpush(key, string);
}
}.run();
}
@@ -381,7 +379,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().lpush(key, string);
return connectionHandler.getConnection(key).lpush(key, string);
}
}.run();
}
@@ -391,7 +389,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().llen(key);
return connectionHandler.getConnection(key).llen(key);
}
}.run();
}
@@ -401,7 +399,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<List<String>>(connectionHandler) {
@Override
public List<String> execute() {
return connectionHandler.getConnection().lrange(key, start, end);
return connectionHandler.getConnection(key).lrange(key, start, end);
}
}.run();
}
@@ -411,7 +409,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().ltrim(key, start, end);
return connectionHandler.getConnection(key).ltrim(key, start, end);
}
}.run();
}
@@ -421,7 +419,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().lindex(key, index);
return connectionHandler.getConnection(key).lindex(key, index);
}
}.run();
}
@@ -431,7 +429,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().lset(key, index, value);
return connectionHandler.getConnection(key).lset(key, index, value);
}
}.run();
}
@@ -441,7 +439,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().lrem(key, count, value);
return connectionHandler.getConnection(key).lrem(key, count, value);
}
}.run();
}
@@ -451,7 +449,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().lpop(key);
return connectionHandler.getConnection(key).lpop(key);
}
}.run();
}
@@ -461,7 +459,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().rpop(key);
return connectionHandler.getConnection(key).rpop(key);
}
}.run();
}
@@ -471,7 +469,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().sadd(key, member);
return connectionHandler.getConnection(key).sadd(key, member);
}
}.run();
}
@@ -481,7 +479,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().smembers(key);
return connectionHandler.getConnection(key).smembers(key);
}
}.run();
}
@@ -491,7 +489,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().srem(key, member);
return connectionHandler.getConnection(key).srem(key, member);
}
}.run();
}
@@ -501,7 +499,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().spop(key);
return connectionHandler.getConnection(key).spop(key);
}
}.run();
}
@@ -511,7 +509,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().scard(key);
return connectionHandler.getConnection(key).scard(key);
}
}.run();
}
@@ -521,7 +519,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Boolean>(connectionHandler) {
@Override
public Boolean execute() {
return connectionHandler.getConnection().sismember(key, member);
return connectionHandler.getConnection(key).sismember(key, member);
}
}.run();
}
@@ -531,7 +529,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().srandmember(key);
return connectionHandler.getConnection(key).srandmember(key);
}
}.run();
}
@@ -541,7 +539,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().strlen(key);
return connectionHandler.getConnection(key).strlen(key);
}
}.run();
}
@@ -551,7 +549,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().zadd(key, score, member);
return connectionHandler.getConnection(key).zadd(key, score, member);
}
}.run();
}
@@ -561,7 +559,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().zadd(key, scoreMembers);
return connectionHandler.getConnection(key).zadd(key, scoreMembers);
}
}.run();
}
@@ -571,7 +569,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().zrange(key, start, end);
return connectionHandler.getConnection(key).zrange(key, start, end);
}
}.run();
}
@@ -581,7 +579,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().zrem(key, member);
return connectionHandler.getConnection(key).zrem(key, member);
}
}.run();
}
@@ -591,7 +589,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Double>(connectionHandler) {
@Override
public Double execute() {
return connectionHandler.getConnection().zincrby(key, score, member);
return connectionHandler.getConnection(key).zincrby(key, score, member);
}
}.run();
}
@@ -601,7 +599,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().zrank(key, member);
return connectionHandler.getConnection(key).zrank(key, member);
}
}.run();
}
@@ -611,7 +609,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().zrevrank(key, member);
return connectionHandler.getConnection(key).zrevrank(key, member);
}
}.run();
}
@@ -621,7 +619,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().zrevrange(key, start, end);
return connectionHandler.getConnection(key).zrevrange(key, start, end);
}
}.run();
}
@@ -631,7 +629,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<Tuple>>(connectionHandler) {
@Override
public Set<Tuple> execute() {
return connectionHandler.getConnection().zrangeWithScores(key, start, end);
return connectionHandler.getConnection(key).zrangeWithScores(key, start, end);
}
}.run();
}
@@ -641,7 +639,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<Tuple>>(connectionHandler) {
@Override
public Set<Tuple> execute() {
return connectionHandler.getConnection().zrevrangeWithScores(key, start, end);
return connectionHandler.getConnection(key).zrevrangeWithScores(key, start, end);
}
}.run();
}
@@ -651,7 +649,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().zcard(key);
return connectionHandler.getConnection(key).zcard(key);
}
}.run();
}
@@ -661,7 +659,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Double>(connectionHandler) {
@Override
public Double execute() {
return connectionHandler.getConnection().zscore(key, member);
return connectionHandler.getConnection(key).zscore(key, member);
}
}.run();
}
@@ -671,7 +669,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<List<String>>(connectionHandler) {
@Override
public List<String> execute() {
return connectionHandler.getConnection().sort(key);
return connectionHandler.getConnection(key).sort(key);
}
}.run();
}
@@ -681,7 +679,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<List<String>>(connectionHandler) {
@Override
public List<String> execute() {
return connectionHandler.getConnection().sort(key, sortingParameters);
return connectionHandler.getConnection(key).sort(key, sortingParameters);
}
}.run();
}
@@ -691,7 +689,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().zcount(key, min, max);
return connectionHandler.getConnection(key).zcount(key, min, max);
}
}.run();
}
@@ -701,7 +699,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().zcount(key, min, max);
return connectionHandler.getConnection(key).zcount(key, min, max);
}
}.run();
}
@@ -711,7 +709,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().zrangeByScore(key, min, max);
return connectionHandler.getConnection(key).zrangeByScore(key, min, max);
}
}.run();
}
@@ -721,7 +719,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().zrangeByScore(key, min, max);
return connectionHandler.getConnection(key).zrangeByScore(key, min, max);
}
}.run();
}
@@ -731,7 +729,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().zrevrangeByScore(key, min, max);
return connectionHandler.getConnection(key).zrevrangeByScore(key, min, max);
}
}.run();
}
@@ -742,7 +740,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().zrangeByScore(key, min, max, offset, count);
return connectionHandler.getConnection(key).zrangeByScore(key, min, max, offset, count);
}
}.run();
}
@@ -752,7 +750,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().zrevrangeByScore(key, min, max);
return connectionHandler.getConnection(key).zrevrangeByScore(key, min, max);
}
}.run();
}
@@ -763,7 +761,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().zrangeByScore(key, min, max, offset, count);
return connectionHandler.getConnection(key).zrangeByScore(key, min, max, offset, count);
}
}.run();
}
@@ -774,7 +772,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().zrevrangeByScore(key, min, max, offset, count);
return connectionHandler.getConnection(key).zrevrangeByScore(key, min, max, offset, count);
}
}.run();
}
@@ -784,7 +782,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<Tuple>>(connectionHandler) {
@Override
public Set<Tuple> execute() {
return connectionHandler.getConnection().zrangeByScoreWithScores(key, min, max);
return connectionHandler.getConnection(key).zrangeByScoreWithScores(key, min, max);
}
}.run();
}
@@ -794,7 +792,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<Tuple>>(connectionHandler) {
@Override
public Set<Tuple> execute() {
return connectionHandler.getConnection().zrevrangeByScoreWithScores(key, min, max);
return connectionHandler.getConnection(key).zrevrangeByScoreWithScores(key, min, max);
}
}.run();
}
@@ -804,7 +802,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<Tuple>>(connectionHandler) {
@Override
public Set<Tuple> execute() {
return connectionHandler.getConnection().zrangeByScoreWithScores(key, min, max, offset, count);
return connectionHandler.getConnection(key).zrangeByScoreWithScores(key, min, max, offset, count);
}
}.run();
}
@@ -815,7 +813,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<String>>(connectionHandler) {
@Override
public Set<String> execute() {
return connectionHandler.getConnection().zrevrangeByScore(key, min, max, offset, count);
return connectionHandler.getConnection(key).zrevrangeByScore(key, min, max, offset, count);
}
}.run();
}
@@ -825,7 +823,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<Tuple>>(connectionHandler) {
@Override
public Set<Tuple> execute() {
return connectionHandler.getConnection().zrangeByScoreWithScores(key, min, max);
return connectionHandler.getConnection(key).zrangeByScoreWithScores(key, min, max);
}
}.run();
}
@@ -836,7 +834,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<Tuple>>(connectionHandler) {
@Override
public Set<Tuple> execute() {
return connectionHandler.getConnection().zrevrangeByScoreWithScores(key, min, max);
return connectionHandler.getConnection(key).zrevrangeByScoreWithScores(key, min, max);
}
}.run();
}
@@ -847,7 +845,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<Tuple>>(connectionHandler) {
@Override
public Set<Tuple> execute() {
return connectionHandler.getConnection().zrangeByScoreWithScores(key, min, max, offset, count);
return connectionHandler.getConnection(key).zrangeByScoreWithScores(key, min, max, offset, count);
}
}.run();
}
@@ -858,7 +856,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<Tuple>>(connectionHandler) {
@Override
public Set<Tuple> execute() {
return connectionHandler.getConnection().zrevrangeByScoreWithScores(key, max, min, offset, count);
return connectionHandler.getConnection(key).zrevrangeByScoreWithScores(key, max, min, offset, count);
}
}.run();
}
@@ -869,7 +867,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Set<Tuple>>(connectionHandler) {
@Override
public Set<Tuple> execute() {
return connectionHandler.getConnection().zrevrangeByScoreWithScores(key, max, min, offset, count);
return connectionHandler.getConnection(key).zrevrangeByScoreWithScores(key, max, min, offset, count);
}
}.run();
}
@@ -879,7 +877,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().zremrangeByRank(key, start, end);
return connectionHandler.getConnection(key).zremrangeByRank(key, start, end);
}
}.run();
}
@@ -889,7 +887,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().zremrangeByScore(key, start, end);
return connectionHandler.getConnection(key).zremrangeByScore(key, start, end);
}
}.run();
}
@@ -899,7 +897,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().zremrangeByScore(key, start, end);
return connectionHandler.getConnection(key).zremrangeByScore(key, start, end);
}
}.run();
}
@@ -910,7 +908,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().linsert(key, where, pivot, value);
return connectionHandler.getConnection(key).linsert(key, where, pivot, value);
}
}.run();
}
@@ -920,7 +918,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().lpushx(key, string);
return connectionHandler.getConnection(key).lpushx(key, string);
}
}.run();
}
@@ -930,7 +928,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().rpushx(key, string);
return connectionHandler.getConnection(key).rpushx(key, string);
}
}.run();
}
@@ -940,7 +938,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<List<String>>(connectionHandler) {
@Override
public List<String> execute() {
return connectionHandler.getConnection().blpop(arg);
return connectionHandler.getConnection(null).blpop(arg);
}
}.run();
}
@@ -950,7 +948,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<List<String>>(connectionHandler) {
@Override
public List<String> execute() {
return connectionHandler.getConnection().brpop(arg);
return connectionHandler.getConnection(null).brpop(arg);
}
}.run();
}
@@ -960,7 +958,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().del(key);
return connectionHandler.getConnection(key).del(key);
}
}.run();
}
@@ -970,7 +968,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().echo(string);
return connectionHandler.getConnection(null).echo(string);
}
}.run();
}
@@ -980,7 +978,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().move(key, dbIndex);
return connectionHandler.getConnection(key).move(key, dbIndex);
}
}.run();
}
@@ -990,7 +988,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().bitcount(key);
return connectionHandler.getConnection(key).bitcount(key);
}
}.run();
}
@@ -1000,7 +998,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().bitcount(key, start, end);
return connectionHandler.getConnection(key).bitcount(key, start, end);
}
}.run();
}
@@ -1010,7 +1008,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().ping();
return connectionHandler.getConnection(null).ping();
}
}.run();
}
@@ -1020,7 +1018,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().quit();
return connectionHandler.getConnection(null).quit();
}
}.run();
}
@@ -1030,7 +1028,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().flushDB();
return connectionHandler.getConnection(null).flushDB();
}
}.run();
}
@@ -1040,7 +1038,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().dbSize();
return connectionHandler.getConnection(null).dbSize();
}
}.run();
}
@@ -1050,7 +1048,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().select(index);
return connectionHandler.getConnection(null).select(index);
}
}.run();
}
@@ -1060,7 +1058,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().flushAll();
return connectionHandler.getConnection(null).flushAll();
}
}.run();
}
@@ -1070,7 +1068,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().auth(password);
return connectionHandler.getConnection(null).auth(password);
}
}.run();
}
@@ -1080,7 +1078,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().save();
return connectionHandler.getConnection(null).save();
}
}.run();
}
@@ -1090,7 +1088,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().bgsave();
return connectionHandler.getConnection(null).bgsave();
}
}.run();
}
@@ -1100,7 +1098,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().bgrewriteaof();
return connectionHandler.getConnection(null).bgrewriteaof();
}
}.run();
}
@@ -1110,7 +1108,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().lastsave();
return connectionHandler.getConnection(null).lastsave();
}
}.run();
}
@@ -1120,7 +1118,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().shutdown();
return connectionHandler.getConnection(null).shutdown();
}
}.run();
}
@@ -1130,7 +1128,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().info();
return connectionHandler.getConnection(null).info();
}
}.run();
}
@@ -1140,7 +1138,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().info(section);
return connectionHandler.getConnection(null).info(section);
}
}.run();
}
@@ -1150,7 +1148,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().slaveof(host, port);
return connectionHandler.getConnection(null).slaveof(host, port);
}
}.run();
}
@@ -1160,7 +1158,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().slaveofNoOne();
return connectionHandler.getConnection(null).slaveofNoOne();
}
}.run();
}
@@ -1170,7 +1168,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<Long>(connectionHandler) {
@Override
public Long execute() {
return connectionHandler.getConnection().getDB();
return connectionHandler.getConnection(null).getDB();
}
}.run();
}
@@ -1180,7 +1178,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().debug(params);
return connectionHandler.getConnection(null).debug(params);
}
}.run();
}
@@ -1190,7 +1188,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return new JedisClusterCommand<String>(connectionHandler) {
@Override
public String execute() {
return connectionHandler.getConnection().configResetStat();
return connectionHandler.getConnection(null).configResetStat();
}
}.run();
}

View File

@@ -8,8 +8,9 @@ import java.util.Set;
public abstract class JedisClusterConnectionHandler {
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
abstract Jedis getConnection();
abstract Jedis getConnection(String key);
public JedisClusterConnectionHandler(Set<HostAndPort> nodes) {
initializeSlotsCache(nodes);
@@ -23,20 +24,39 @@ public abstract class JedisClusterConnectionHandler {
for (HostAndPort hostAndPort : nodes) {
JedisPool jp = new JedisPool(hostAndPort.getHost(), hostAndPort.getPort());
this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp);
this.nodes.putAll(discoverClusterNodes(jp));
discoverClusterNodesAndSlots(jp);
}
}
private Map<? extends String, ? extends JedisPool> discoverClusterNodes(JedisPool jp) {
Map<String, JedisPool> discoveredNodes = new HashMap<String, JedisPool>();
private void discoverClusterNodesAndSlots(JedisPool jp) {
String localNodes = jp.getResource().clusterNodes();
for (String nodeInfo : localNodes.split("\n")) {
HostAndPort node = getHostAndPortFromNodeLine(nodeInfo);
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
discoveredNodes.put(node.getHost() + node.getPort(), nodePool);
this.nodes.put(node.getHost() + node.getPort(), nodePool);
populateNodeSlots(nodeInfo, nodePool);
}
}
private void populateNodeSlots(String nodeInfo, JedisPool nodePool) {
String[] nodeInfoArray = nodeInfo.split(" ");
if (nodeInfoArray.length > 7) {
for (int i = 8; i < nodeInfoArray.length; i++) {
processSlot(nodeInfoArray[i], nodePool);
}
}
}
private void processSlot(String slot, JedisPool nodePool) {
if (slot.contains("-")) {
String[] slotRange = slot.split("-");
for (int i = Integer.valueOf(slotRange[0]); i <= Integer.valueOf(slotRange[1]); i++) {
slots.put(i, nodePool);
}
} else {
slots.put(Integer.valueOf(slot), nodePool);
}
return discoveredNodes;
}
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo) {

View File

@@ -14,7 +14,7 @@ public class JedisRandomConnectionHandler extends JedisClusterConnectionHandler
@SuppressWarnings("unchecked")
public Jedis getConnection() {
public Jedis getConnection(String key) {
Object[] nodeArray = nodes.values().toArray();
return ((Pool<Jedis>) nodeArray[new Random().nextInt(nodeArray.length)]).getResource();
}

View File

@@ -0,0 +1,30 @@
package redis.clients.jedis;
import java.util.Random;
import java.util.Set;
import redis.clients.jedis.tests.utils.JedisClusterCRC16;
public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes) {
super(nodes);
}
public Jedis getConnection(String key) {
int keySlot = JedisClusterCRC16.getSlot(key);
JedisPool connectionPool = slots.get(keySlot);
if (connectionPool == null) {
connectionPool = getRandomConnection();
}
return connectionPool.getResource();
}
private JedisPool getRandomConnection() {
Object[] nodeArray = nodes.values().toArray();
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
}
}

View File

@@ -89,7 +89,6 @@ public class JedisClusterTest extends Assert {
@Test(expected=JedisMovedDataException.class)
public void testThrowMovedException() {
node1.set("foo", "bar");
node2.get("foo");
}
@Test(expected=JedisAskDataException.class)
@@ -108,6 +107,17 @@ public class JedisClusterTest extends Assert {
assertEquals(jc.getClusterNodes().size(), 3);
}
@Test
public void testCalculateConnectionPerSlot() {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
jc.set("foo", "bar");
jc.set("test", "test");
assertEquals("bar",node3.get("foo"));
assertEquals("test",node2.get("test"));
}
private String getNodeId(String infoOutput) {
for (String infoLine : infoOutput.split("\n")) {

View File

@@ -4,11 +4,8 @@ public class JedisClusterCRC16 {
public final static int polynomial = 0x1021; // Represents x^16+x^12+x^5+1
static int crc;
public JedisClusterCRC16(){
crc = 0x0000;
}
public static int getSlot(String key) {
crc = 0x0000;
for (byte b : key.getBytes()) {
for (int i = 0; i < 8; i++) {
boolean bit = ((b >> (7-i) & 1) == 1);
@@ -21,9 +18,4 @@ public class JedisClusterCRC16 {
return crc &= 0xffff % 16384;
}
public static void main(String[] args) {
System.out.println(getSlot("test"));
}
}