Merge branch 'master' into implements-closeable-to-pooled-jedis-shardedjedis
Conflicts: src/main/java/redis/clients/jedis/Jedis.java src/main/java/redis/clients/jedis/JedisSentinelPool.java src/main/java/redis/clients/jedis/ShardedJedis.java src/test/java/redis/clients/jedis/tests/JedisPoolTest.java
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Pipelined responses for all of the low level, non key related commands
|
||||
*/
|
||||
@@ -24,6 +26,8 @@ public interface BasicRedisPipeline {
|
||||
Response<String> flushAll();
|
||||
|
||||
Response<String> info();
|
||||
|
||||
Response<List<String>> time();
|
||||
|
||||
Response<Long> dbSize();
|
||||
|
||||
|
||||
@@ -1,26 +1,22 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import static redis.clients.jedis.Protocol.toByteArray;
|
||||
import static redis.clients.jedis.Protocol.Command.*;
|
||||
import static redis.clients.jedis.Protocol.Keyword.ENCODING;
|
||||
import static redis.clients.jedis.Protocol.Keyword.IDLETIME;
|
||||
import static redis.clients.jedis.Protocol.Keyword.LEN;
|
||||
import static redis.clients.jedis.Protocol.Keyword.LIMIT;
|
||||
import static redis.clients.jedis.Protocol.Keyword.NO;
|
||||
import static redis.clients.jedis.Protocol.Keyword.ONE;
|
||||
import static redis.clients.jedis.Protocol.Keyword.REFCOUNT;
|
||||
import static redis.clients.jedis.Protocol.Keyword.RESET;
|
||||
import static redis.clients.jedis.Protocol.Keyword.STORE;
|
||||
import static redis.clients.jedis.Protocol.Keyword.WITHSCORES;
|
||||
import redis.clients.jedis.Protocol.Command;
|
||||
import redis.clients.jedis.Protocol.Keyword;
|
||||
import redis.clients.util.SafeEncoder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import redis.clients.jedis.Protocol.Command;
|
||||
import redis.clients.jedis.Protocol.Keyword;
|
||||
import redis.clients.util.SafeEncoder;
|
||||
import static redis.clients.jedis.Protocol.Command.*;
|
||||
import static redis.clients.jedis.Protocol.Command.EXISTS;
|
||||
import static redis.clients.jedis.Protocol.Command.PSUBSCRIBE;
|
||||
import static redis.clients.jedis.Protocol.Command.PUNSUBSCRIBE;
|
||||
import static redis.clients.jedis.Protocol.Command.SUBSCRIBE;
|
||||
import static redis.clients.jedis.Protocol.Command.UNSUBSCRIBE;
|
||||
import static redis.clients.jedis.Protocol.Keyword.*;
|
||||
import static redis.clients.jedis.Protocol.toByteArray;
|
||||
|
||||
public class BinaryClient extends Connection {
|
||||
public enum LIST_POSITION {
|
||||
@@ -203,6 +199,10 @@ public class BinaryClient extends Connection {
|
||||
sendCommand(INCRBY, key, toByteArray(integer));
|
||||
}
|
||||
|
||||
public void incrByFloat(final byte[] key, final double value) {
|
||||
sendCommand(INCRBYFLOAT, key, toByteArray(value));
|
||||
}
|
||||
|
||||
public void incr(final byte[] key) {
|
||||
sendCommand(INCR, key);
|
||||
}
|
||||
@@ -320,7 +320,7 @@ public class BinaryClient extends Connection {
|
||||
public void sadd(final byte[] key, final byte[]... members) {
|
||||
sendCommand(SADD, joinParameters(key, members));
|
||||
}
|
||||
|
||||
|
||||
public void smembers(final byte[] key) {
|
||||
sendCommand(SMEMBERS, key);
|
||||
}
|
||||
@@ -929,7 +929,15 @@ public class BinaryClient extends Connection {
|
||||
public void getbit(byte[] key, long offset) {
|
||||
sendCommand(GETBIT, key, toByteArray(offset));
|
||||
}
|
||||
|
||||
|
||||
public void bitpos(final byte[] key, final boolean value, final BitPosParams params) {
|
||||
final List<byte[]> args = new ArrayList<byte[]>();
|
||||
args.add(key);
|
||||
args.add(toByteArray(value));
|
||||
args.addAll(params.getParams());
|
||||
sendCommand(BITPOS, args.toArray(new byte[args.size()][]));
|
||||
}
|
||||
|
||||
public void setrange(byte[] key, long offset, byte[] value) {
|
||||
sendCommand(SETRANGE, key, toByteArray(offset), value);
|
||||
}
|
||||
@@ -1091,7 +1099,12 @@ public class BinaryClient extends Connection {
|
||||
sendCommand(RESTORE, key, toByteArray(ttl), serializedValue);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void pexpire(final byte[] key, final int milliseconds) {
|
||||
pexpire(key, (long) milliseconds);
|
||||
}
|
||||
|
||||
public void pexpire(final byte[] key, final long milliseconds) {
|
||||
sendCommand(PEXPIRE, key, toByteArray(milliseconds));
|
||||
}
|
||||
|
||||
@@ -1103,10 +1116,6 @@ public class BinaryClient extends Connection {
|
||||
sendCommand(PTTL, key);
|
||||
}
|
||||
|
||||
public void incrByFloat(final byte[] key, final double increment) {
|
||||
sendCommand(INCRBYFLOAT, key, toByteArray(increment));
|
||||
}
|
||||
|
||||
public void psetex(final byte[] key, final int milliseconds,
|
||||
final byte[] value) {
|
||||
sendCommand(PSETEX, key, toByteArray(milliseconds), value);
|
||||
@@ -1253,4 +1262,20 @@ public class BinaryClient extends Connection {
|
||||
public void asking() {
|
||||
sendCommand(Command.ASKING);
|
||||
}
|
||||
|
||||
public void pfadd(final byte[] key, final byte[]... elements) {
|
||||
sendCommand(PFADD, joinParameters(key, elements));
|
||||
}
|
||||
|
||||
public void pfcount(final byte[] key) {
|
||||
sendCommand(PFCOUNT, key);
|
||||
}
|
||||
|
||||
public void pfcount(final byte[]...keys) {
|
||||
sendCommand(PFCOUNT, keys);
|
||||
}
|
||||
|
||||
public void pfmerge(final byte[] destkey, final byte[]... sourcekeys) {
|
||||
sendCommand(PFMERGE, joinParameters(destkey, sourcekeys));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import static redis.clients.jedis.Protocol.toByteArray;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.net.URI;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
@@ -633,6 +634,37 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
/**
|
||||
* INCRBYFLOAT work just like {@link #incrBy(byte[]) INCRBY} but increments
|
||||
* by floats instead of integers.
|
||||
* <p>
|
||||
* INCRBYFLOAT commands are limited to double precision floating point
|
||||
* values.
|
||||
* <p>
|
||||
* Note: this is actually a string operation, that is, in Redis there are
|
||||
* not "double" types. Simply the string stored at the key is parsed as a
|
||||
* base double precision floating point value, incremented, and then
|
||||
* converted back as a string. There is no DECRYBYFLOAT but providing a
|
||||
* negative value will work as expected.
|
||||
* <p>
|
||||
* Time complexity: O(1)
|
||||
*
|
||||
* @see #incr(byte[])
|
||||
* @see #decr(byte[])
|
||||
* @see #decrBy(byte[], long)
|
||||
*
|
||||
* @param key
|
||||
* @param integer
|
||||
* @return Integer reply, this commands will reply with the new value of key
|
||||
* after the increment.
|
||||
*/
|
||||
public Double incrByFloat(final byte[] key, final double integer) {
|
||||
checkIsInMulti();
|
||||
client.incrByFloat(key, integer);
|
||||
String dval = client.getBulkReply();
|
||||
return (dval != null ? new Double(dval) : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the number stored at key by one. If the key does not exist or
|
||||
* contains a value of a wrong type, set the key to the value of "0" before
|
||||
@@ -825,6 +857,33 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the number stored at field in the hash at key by a double
|
||||
* precision floating point value. If key does not exist, a new key holding
|
||||
* a hash is created. If field does not exist or holds a string, the value
|
||||
* is set to 0 before applying the operation. Since the value argument is
|
||||
* signed you can use this command to perform both increments and
|
||||
* decrements.
|
||||
* <p>
|
||||
* The range of values supported by HINCRBYFLOAT is limited to double
|
||||
* precision floating point values.
|
||||
* <p>
|
||||
* <b>Time complexity:</b> O(1)
|
||||
*
|
||||
* @param key
|
||||
* @param field
|
||||
* @param value
|
||||
* @return Double precision floating point reply The new value at field
|
||||
* after the increment operation.
|
||||
*/
|
||||
public Double hincrByFloat(final byte[] key, final byte[] field,
|
||||
final double value) {
|
||||
checkIsInMulti();
|
||||
client.hincrByFloat(key, field, value);
|
||||
final String dval = client.getBulkReply();
|
||||
return (dval != null ? new Double(dval) : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for existence of a specified field in a hash.
|
||||
*
|
||||
@@ -3128,6 +3187,16 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
|
||||
return client.getIntegerReply() == 1;
|
||||
}
|
||||
|
||||
public Long bitpos(final byte[] key, final boolean value) {
|
||||
return bitpos(key, value, new BitPosParams());
|
||||
}
|
||||
|
||||
public Long bitpos(final byte[] key, final boolean value,
|
||||
final BitPosParams params) {
|
||||
client.bitpos(key, value, params);
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
public Long setrange(byte[] key, long offset, byte[] value) {
|
||||
client.setrange(key, offset, value);
|
||||
return client.getIntegerReply();
|
||||
@@ -3316,7 +3385,12 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public Long pexpire(final byte[] key, final int milliseconds) {
|
||||
return pexpire(key, (long) milliseconds);
|
||||
}
|
||||
|
||||
public Long pexpire(final byte[] key, final long milliseconds) {
|
||||
checkIsInMulti();
|
||||
client.pexpire(key, milliseconds);
|
||||
return client.getIntegerReply();
|
||||
@@ -3334,13 +3408,6 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
public Double incrByFloat(final byte[] key, final double increment) {
|
||||
checkIsInMulti();
|
||||
client.incrByFloat(key, increment);
|
||||
String relpy = client.getBulkReply();
|
||||
return (relpy != null ? new Double(relpy) : null);
|
||||
}
|
||||
|
||||
public String psetex(final byte[] key, final int milliseconds,
|
||||
final byte[] value) {
|
||||
checkIsInMulti();
|
||||
@@ -3398,14 +3465,6 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
public Double hincrByFloat(final byte[] key, final byte[] field,
|
||||
double increment) {
|
||||
checkIsInMulti();
|
||||
client.hincrByFloat(key, field, increment);
|
||||
String relpy = client.getBulkReply();
|
||||
return (relpy != null ? new Double(relpy) : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Syncrhonous replication of Redis as described here:
|
||||
* http://antirez.com/news/66
|
||||
@@ -3419,4 +3478,99 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long pfadd(final byte[] key, final byte[]... elements) {
|
||||
checkIsInMulti();
|
||||
client.pfadd(key, elements);
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long pfcount(final byte[] key) {
|
||||
checkIsInMulti();
|
||||
client.pfcount(key);
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String pfmerge(final byte[] destkey, final byte[]... sourcekeys) {
|
||||
checkIsInMulti();
|
||||
client.pfmerge(destkey, sourcekeys);
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long pfcount(byte[]... keys) {
|
||||
checkIsInMulti();
|
||||
client.pfcount(keys);
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
public ScanResult<byte[]> scan(final byte[] cursor) {
|
||||
return scan(cursor, new ScanParams());
|
||||
}
|
||||
|
||||
public ScanResult<byte[]> scan(final byte[] cursor, final ScanParams params) {
|
||||
checkIsInMulti();
|
||||
client.scan(cursor, params);
|
||||
List<Object> result = client.getObjectMultiBulkReply();
|
||||
byte[] newcursor = (byte[]) result.get(0);
|
||||
List<byte[]> rawResults = (List<byte[]>) result.get(1);
|
||||
return new ScanResult<byte[]>(newcursor, rawResults);
|
||||
}
|
||||
|
||||
public ScanResult<Map.Entry<byte[], byte[]>> hscan(final byte[] key,
|
||||
final byte[] cursor) {
|
||||
return hscan(key, cursor, new ScanParams());
|
||||
}
|
||||
|
||||
public ScanResult<Map.Entry<byte[], byte[]>> hscan(final byte[] key,
|
||||
final byte[] cursor, final ScanParams params) {
|
||||
checkIsInMulti();
|
||||
client.hscan(key, cursor, params);
|
||||
List<Object> result = client.getObjectMultiBulkReply();
|
||||
byte[] newcursor = (byte[]) result.get(0);
|
||||
List<Map.Entry<byte[], byte[]>> results = new ArrayList<Map.Entry<byte[], byte[]>>();
|
||||
List<byte[]> rawResults = (List<byte[]>) result.get(1);
|
||||
Iterator<byte[]> iterator = rawResults.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
results.add(new AbstractMap.SimpleEntry<byte[], byte[]>(iterator
|
||||
.next(), iterator.next()));
|
||||
}
|
||||
return new ScanResult<Map.Entry<byte[], byte[]>>(newcursor, results);
|
||||
}
|
||||
|
||||
public ScanResult<byte[]> sscan(final byte[] key, final byte[] cursor) {
|
||||
return sscan(key, cursor, new ScanParams());
|
||||
}
|
||||
|
||||
public ScanResult<byte[]> sscan(final byte[] key, final byte[] cursor,
|
||||
final ScanParams params) {
|
||||
checkIsInMulti();
|
||||
client.sscan(key, cursor, params);
|
||||
List<Object> result = client.getObjectMultiBulkReply();
|
||||
byte[] newcursor = (byte[]) result.get(0);
|
||||
List<byte[]> rawResults = (List<byte[]>) result.get(1);
|
||||
return new ScanResult<byte[]>(newcursor, rawResults);
|
||||
}
|
||||
|
||||
public ScanResult<Tuple> zscan(final byte[] key, final byte[] cursor) {
|
||||
return zscan(key, cursor, new ScanParams());
|
||||
}
|
||||
|
||||
public ScanResult<Tuple> zscan(final byte[] key, final byte[] cursor,
|
||||
final ScanParams params) {
|
||||
checkIsInMulti();
|
||||
client.zscan(key, cursor, params);
|
||||
List<Object> result = client.getObjectMultiBulkReply();
|
||||
byte[] newcursor = (byte[]) result.get(0);
|
||||
List<Tuple> results = new ArrayList<Tuple>();
|
||||
List<byte[]> rawResults = (List<byte[]>) result.get(1);
|
||||
Iterator<byte[]> iterator = rawResults.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
results.add(new Tuple(iterator.next(), Double.valueOf(SafeEncoder
|
||||
.encode(iterator.next()))));
|
||||
}
|
||||
return new ScanResult<Tuple>(newcursor, results);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,6 +47,8 @@ public interface BinaryJedisCommands {
|
||||
|
||||
Long incrBy(byte[] key, long integer);
|
||||
|
||||
Double incrByFloat(byte[] key, double value);
|
||||
|
||||
Long incr(byte[] key);
|
||||
|
||||
Long append(byte[] key, byte[] value);
|
||||
@@ -65,6 +67,8 @@ public interface BinaryJedisCommands {
|
||||
|
||||
Long hincrBy(byte[] key, byte[] field, long value);
|
||||
|
||||
Double hincrByFloat(byte[] key, byte[] field, double value);
|
||||
|
||||
Boolean hexists(byte[] key, byte[] field);
|
||||
|
||||
Long hdel(byte[] key, byte[]... field);
|
||||
@@ -211,4 +215,8 @@ public interface BinaryJedisCommands {
|
||||
Long bitcount(final byte[] key);
|
||||
|
||||
Long bitcount(final byte[] key, long start, long end);
|
||||
|
||||
Long pfadd(final byte[] key, final byte[]... elements);
|
||||
|
||||
long pfcount(final byte[] key);
|
||||
}
|
||||
|
||||
@@ -210,4 +210,8 @@ public interface BinaryRedisPipeline {
|
||||
Response<Long> bitcount(byte[] key);
|
||||
|
||||
Response<Long> bitcount(byte[] key, long start, long end);
|
||||
|
||||
Response<Long> pfadd(final byte[] key, final byte[]... elements);
|
||||
|
||||
Response<Long> pfcount(final byte[] key);
|
||||
}
|
||||
|
||||
@@ -110,6 +110,11 @@ public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo>
|
||||
return j.incrBy(key, integer);
|
||||
}
|
||||
|
||||
public Double incrByFloat(byte[] key, double integer) {
|
||||
Jedis j = getShard(key);
|
||||
return j.incrByFloat(key, integer);
|
||||
}
|
||||
|
||||
public Long incr(byte[] key) {
|
||||
Jedis j = getShard(key);
|
||||
return j.incr(key);
|
||||
@@ -155,6 +160,11 @@ public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo>
|
||||
return j.hincrBy(key, field, value);
|
||||
}
|
||||
|
||||
public Double hincrByFloat(byte[] key, byte[] field, double value) {
|
||||
Jedis j = getShard(key);
|
||||
return j.hincrByFloat(key, field, value);
|
||||
}
|
||||
|
||||
public Boolean hexists(byte[] key, byte[] field) {
|
||||
Jedis j = getShard(key);
|
||||
return j.hexists(key, field);
|
||||
@@ -569,4 +579,17 @@ public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo>
|
||||
Jedis j = getShard(key);
|
||||
return j.bitcount(key, start, end);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long pfadd(final byte[] key, final byte[]... elements) {
|
||||
Jedis j = getShard(key);
|
||||
return j.pfadd(key, elements);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long pfcount(final byte[] key) {
|
||||
Jedis j = getShard(key);
|
||||
return j.pfcount(key);
|
||||
}
|
||||
|
||||
}
|
||||
27
src/main/java/redis/clients/jedis/BitPosParams.java
Normal file
27
src/main/java/redis/clients/jedis/BitPosParams.java
Normal file
@@ -0,0 +1,27 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class BitPosParams {
|
||||
private List<byte[]> params = new ArrayList<byte[]>();
|
||||
|
||||
protected BitPosParams() {
|
||||
}
|
||||
|
||||
public BitPosParams(long start) {
|
||||
params.add(Protocol.toByteArray(start));
|
||||
}
|
||||
|
||||
public BitPosParams(long start, long end) {
|
||||
this(start);
|
||||
|
||||
params.add(Protocol.toByteArray(end));
|
||||
}
|
||||
|
||||
public Collection<byte[]> getParams() {
|
||||
return Collections.unmodifiableCollection(params);
|
||||
}
|
||||
}
|
||||
@@ -629,6 +629,9 @@ public class Client extends BinaryClient implements Commands {
|
||||
getbit(SafeEncoder.encode(key), offset);
|
||||
}
|
||||
|
||||
public void bitpos(final String key, final boolean value, final BitPosParams params) {
|
||||
bitpos(SafeEncoder.encode(key), value, params);
|
||||
}
|
||||
public void setrange(String key, long offset, String value) {
|
||||
setrange(SafeEncoder.encode(key), offset, SafeEncoder.encode(value));
|
||||
}
|
||||
@@ -777,7 +780,12 @@ public class Client extends BinaryClient implements Commands {
|
||||
restore(SafeEncoder.encode(key), ttl, serializedValue);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void pexpire(final String key, final int milliseconds) {
|
||||
pexpire(key, (long) milliseconds);
|
||||
}
|
||||
|
||||
public void pexpire(final String key, final long milliseconds) {
|
||||
pexpire(SafeEncoder.encode(key), milliseconds);
|
||||
}
|
||||
|
||||
@@ -951,4 +959,56 @@ public class Client extends BinaryClient implements Commands {
|
||||
cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot),
|
||||
Protocol.CLUSTER_SETSLOT_IMPORTING, nodeId);
|
||||
}
|
||||
|
||||
public void pfadd(String key, final String... elements) {
|
||||
pfadd(SafeEncoder.encode(key), SafeEncoder.encodeMany(elements));
|
||||
}
|
||||
|
||||
public void pfcount(final String key) {
|
||||
pfcount(SafeEncoder.encode(key));
|
||||
}
|
||||
|
||||
public void pfcount(final String...keys) {
|
||||
pfcount(SafeEncoder.encodeMany(keys));
|
||||
}
|
||||
|
||||
public void pfmerge(final String destkey, final String... sourcekeys) {
|
||||
pfmerge(SafeEncoder.encode(destkey), SafeEncoder.encodeMany(sourcekeys));
|
||||
}
|
||||
public void clusterSetSlotStable(final int slot) {
|
||||
cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot),
|
||||
Protocol.CLUSTER_SETSLOT_STABLE);
|
||||
}
|
||||
|
||||
public void clusterForget(final String nodeId) {
|
||||
cluster(Protocol.CLUSTER_FORGET, nodeId);
|
||||
}
|
||||
|
||||
public void clusterFlushSlots() {
|
||||
cluster(Protocol.CLUSTER_FLUSHSLOT);
|
||||
}
|
||||
|
||||
public void clusterKeySlot(final String key) {
|
||||
cluster(Protocol.CLUSTER_KEYSLOT, key);
|
||||
}
|
||||
|
||||
public void clusterCountKeysInSlot(final int slot) {
|
||||
cluster(Protocol.CLUSTER_COUNTKEYINSLOT, String.valueOf(slot));
|
||||
}
|
||||
|
||||
public void clusterSaveConfig() {
|
||||
cluster(Protocol.CLUSTER_SAVECONFIG);
|
||||
}
|
||||
|
||||
public void clusterReplicate(final String nodeId) {
|
||||
cluster(Protocol.CLUSTER_REPLICATE, nodeId);
|
||||
}
|
||||
|
||||
public void clusterSlaves(final String nodeId) {
|
||||
cluster(Protocol.CLUSTER_SLAVES, nodeId);
|
||||
}
|
||||
|
||||
public void clusterFailover() {
|
||||
cluster(Protocol.CLUSTER_FAILOVER);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,4 +20,22 @@ public interface ClusterCommands {
|
||||
String clusterSetSlotMigrating(final int slot, final String nodeId);
|
||||
|
||||
String clusterSetSlotImporting(final int slot, final String nodeId);
|
||||
|
||||
String clusterSetSlotStable(final int slot);
|
||||
|
||||
String clusterForget(final String nodeId);
|
||||
|
||||
String clusterFlushSlots();
|
||||
|
||||
Long clusterKeySlot(final String key);
|
||||
|
||||
Long clusterCountKeysInSlot(final int slot);
|
||||
|
||||
String clusterSaveConfig();
|
||||
|
||||
String clusterReplicate(final String nodeId);
|
||||
|
||||
List<String> clusterSlaves(final String nodeId);
|
||||
|
||||
String clusterFailover();
|
||||
}
|
||||
|
||||
@@ -61,6 +61,8 @@ public interface Commands {
|
||||
|
||||
public void incrBy(final String key, final long integer);
|
||||
|
||||
public void incrByFloat(final String key, final double value);
|
||||
|
||||
public void incr(final String key);
|
||||
|
||||
public void append(final String key, final String value);
|
||||
@@ -79,6 +81,8 @@ public interface Commands {
|
||||
|
||||
public void hincrBy(final String key, final String field, final long value);
|
||||
|
||||
public void hincrByFloat(final String key, final String field, final double value);
|
||||
|
||||
public void hexists(final String key, final String field);
|
||||
|
||||
public void hdel(final String key, final String... fields);
|
||||
|
||||
@@ -1,21 +1,14 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import redis.clients.jedis.BinaryClient.LIST_POSITION;
|
||||
import redis.clients.util.Pool;
|
||||
import redis.clients.util.SafeEncoder;
|
||||
import redis.clients.util.Slowlog;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.*;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
public class Jedis extends BinaryJedis implements JedisCommands,
|
||||
MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands,
|
||||
BasicCommands, ClusterCommands {
|
||||
@@ -551,6 +544,31 @@ public class Jedis extends BinaryJedis implements JedisCommands,
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
/**
|
||||
* INCRBYFLOAT
|
||||
* <p>
|
||||
* INCRBYFLOAT commands are limited to double precision floating point values.
|
||||
* <p>
|
||||
* Note: this is actually a string operation, that is, in Redis there are
|
||||
* not "double" types. Simply the string stored at the key is parsed as a
|
||||
* base double precision floating point value, incremented, and then
|
||||
* converted back as a string. There is no DECRYBYFLOAT but providing a
|
||||
* negative value will work as expected.
|
||||
* <p>
|
||||
* Time complexity: O(1)
|
||||
*
|
||||
* @param key
|
||||
* @param value
|
||||
* @return Double reply, this commands will reply with the new value of key
|
||||
* after the increment.
|
||||
*/
|
||||
public Double incrByFloat(final String key, final double value) {
|
||||
checkIsInMulti();
|
||||
client.incrByFloat(key, value);
|
||||
String dval = client.getBulkReply();
|
||||
return (dval != null ? new Double(dval) : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the number stored at key by one. If the key does not exist or
|
||||
* contains a value of a wrong type, set the key to the value of "0" before
|
||||
@@ -743,6 +761,32 @@ public class Jedis extends BinaryJedis implements JedisCommands,
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the number stored at field in the hash at key by a double
|
||||
* precision floating point value. If key does not exist,
|
||||
* a new key holding a hash is created. If field does not
|
||||
* exist or holds a string, the value is set to 0 before applying the
|
||||
* operation. Since the value argument is signed you can use this command to
|
||||
* perform both increments and decrements.
|
||||
* <p>
|
||||
* The range of values supported by HINCRBYFLOAT is limited to
|
||||
* double precision floating point values.
|
||||
* <p>
|
||||
* <b>Time complexity:</b> O(1)
|
||||
*
|
||||
* @param key
|
||||
* @param field
|
||||
* @param value
|
||||
* @return Double precision floating point reply The new value at field after the increment
|
||||
* operation.
|
||||
*/
|
||||
public Double hincrByFloat(final String key, final String field, final double value) {
|
||||
checkIsInMulti();
|
||||
client.hincrByFloat(key, field, value);
|
||||
final String dval = client.getBulkReply();
|
||||
return (dval != null ? new Double(dval) : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for existence of a specified field in a hash.
|
||||
*
|
||||
@@ -2663,6 +2707,15 @@ public class Jedis extends BinaryJedis implements JedisCommands,
|
||||
client.getrange(key, startOffset, endOffset);
|
||||
return client.getBulkReply();
|
||||
}
|
||||
|
||||
public Long bitpos(final String key, final boolean value) {
|
||||
return bitpos(key, value, new BitPosParams());
|
||||
}
|
||||
|
||||
public Long bitpos(final String key, final boolean value, final BitPosParams params) {
|
||||
client.bitpos(key, value, params);
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the configuration of a running Redis server. Not all the
|
||||
@@ -3064,7 +3117,12 @@ public class Jedis extends BinaryJedis implements JedisCommands,
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public Long pexpire(final String key, final int milliseconds) {
|
||||
return pexpire(key, (long) milliseconds);
|
||||
}
|
||||
|
||||
public Long pexpire(final String key, final long milliseconds) {
|
||||
checkIsInMulti();
|
||||
client.pexpire(key, milliseconds);
|
||||
return client.getIntegerReply();
|
||||
@@ -3082,12 +3140,6 @@ public class Jedis extends BinaryJedis implements JedisCommands,
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
public Double incrByFloat(final String key, final double increment) {
|
||||
checkIsInMulti();
|
||||
client.incrByFloat(key, increment);
|
||||
String relpy = client.getBulkReply();
|
||||
return (relpy != null ? new Double(relpy) : null);
|
||||
}
|
||||
|
||||
public String psetex(final String key, final int milliseconds,
|
||||
final String value) {
|
||||
@@ -3128,14 +3180,6 @@ public class Jedis extends BinaryJedis implements JedisCommands,
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
public Double hincrByFloat(final String key, final String field,
|
||||
double increment) {
|
||||
checkIsInMulti();
|
||||
client.hincrByFloat(key, field, increment);
|
||||
String relpy = client.getBulkReply();
|
||||
return (relpy != null ? new Double(relpy) : null);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
/**
|
||||
* This method is deprecated due to bug (scan cursor should be unsigned long)
|
||||
@@ -3391,6 +3435,60 @@ public class Jedis extends BinaryJedis implements JedisCommands,
|
||||
client.clusterSetSlotImporting(slot, nodeId);
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
public String clusterSetSlotStable(final int slot) {
|
||||
checkIsInMulti();
|
||||
client.clusterSetSlotStable(slot);
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
public String clusterForget(final String nodeId) {
|
||||
checkIsInMulti();
|
||||
client.clusterForget(nodeId);
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
public String clusterFlushSlots() {
|
||||
checkIsInMulti();
|
||||
client.clusterFlushSlots();
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
public Long clusterKeySlot(final String key) {
|
||||
checkIsInMulti();
|
||||
client.clusterKeySlot(key);
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
public Long clusterCountKeysInSlot(final int slot) {
|
||||
checkIsInMulti();
|
||||
client.clusterCountKeysInSlot(slot);
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
public String clusterSaveConfig() {
|
||||
checkIsInMulti();
|
||||
client.clusterSaveConfig();
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
public String clusterReplicate(final String nodeId) {
|
||||
checkIsInMulti();
|
||||
client.clusterReplicate(nodeId);
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
public List<String> clusterSlaves(final String nodeId) {
|
||||
checkIsInMulti();
|
||||
client.clusterSlaves(nodeId);
|
||||
return client.getMultiBulkReply();
|
||||
}
|
||||
|
||||
public String clusterFailover() {
|
||||
checkIsInMulti();
|
||||
client.clusterFailover();
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
|
||||
public String asking() {
|
||||
checkIsInMulti();
|
||||
@@ -3431,6 +3529,31 @@ public class Jedis extends BinaryJedis implements JedisCommands,
|
||||
}
|
||||
|
||||
public void setDataSource(Pool<Jedis> jedisPool) {
|
||||
this.dataSource = jedisPool;
|
||||
this.dataSource = jedisPool;
|
||||
}
|
||||
|
||||
public Long pfadd(final String key, final String... elements) {
|
||||
checkIsInMulti();
|
||||
client.pfadd(key, elements);
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
public long pfcount(final String key) {
|
||||
checkIsInMulti();
|
||||
client.pfcount(key);
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long pfcount(String... keys) {
|
||||
checkIsInMulti();
|
||||
client.pfcount(keys);
|
||||
return client.getIntegerReply();
|
||||
}
|
||||
|
||||
public String pfmerge(final String destkey, final String... sourcekeys) {
|
||||
checkIsInMulti();
|
||||
client.pfmerge(destkey, sourcekeys);
|
||||
return client.getStatusCodeReply();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import redis.clients.jedis.BinaryClient.LIST_POSITION;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import redis.clients.jedis.BinaryClient.LIST_POSITION;
|
||||
|
||||
public class JedisCluster implements JedisCommands, BasicCommands {
|
||||
public static final short HASHSLOTS = 16384;
|
||||
private static final int DEFAULT_TIMEOUT = 1;
|
||||
@@ -44,6 +44,18 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
||||
}.run(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String set(final String key, final String value, final String nxxx,
|
||||
final String expx, final long time) {
|
||||
return new JedisClusterCommand<String>(connectionHandler, timeout,
|
||||
maxRedirections) {
|
||||
@Override
|
||||
public String execute(Jedis connection) {
|
||||
return connection.set(key, value, nxxx, expx, time);
|
||||
}
|
||||
}.run(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get(final String key) {
|
||||
return new JedisClusterCommand<String>(connectionHandler, timeout,
|
||||
@@ -1129,7 +1141,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
||||
public Long execute(Jedis connection) {
|
||||
return connection.del(key);
|
||||
}
|
||||
}.run(null);
|
||||
}.run(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -1481,4 +1493,26 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
||||
}
|
||||
}.run(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long pfadd(final String key, final String... elements) {
|
||||
return new JedisClusterCommand<Long>(connectionHandler,
|
||||
timeout, maxRedirections) {
|
||||
@Override
|
||||
public Long execute(Jedis connection) {
|
||||
return connection.pfadd(key, elements);
|
||||
}
|
||||
}.run(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long pfcount(final String key) {
|
||||
return new JedisClusterCommand<Long>(connectionHandler,
|
||||
timeout, maxRedirections) {
|
||||
@Override
|
||||
public Long execute(Jedis connection) {
|
||||
return connection.pfcount(key);
|
||||
}
|
||||
}.run(key);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,19 +3,17 @@ package redis.clients.jedis;
|
||||
import redis.clients.jedis.exceptions.JedisAskDataException;
|
||||
import redis.clients.jedis.exceptions.JedisClusterException;
|
||||
import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException;
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
import redis.clients.jedis.exceptions.JedisMovedDataException;
|
||||
import redis.clients.jedis.exceptions.JedisRedirectionException;
|
||||
import redis.clients.util.JedisClusterCRC16;
|
||||
|
||||
public abstract class JedisClusterCommand<T> {
|
||||
|
||||
private boolean asking = false;
|
||||
|
||||
private JedisClusterConnectionHandler connectionHandler;
|
||||
private int commandTimeout;
|
||||
private int redirections;
|
||||
|
||||
// private boolean asking = false;
|
||||
|
||||
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler,
|
||||
int timeout, int maxRedirections) {
|
||||
this.connectionHandler = connectionHandler;
|
||||
@@ -26,40 +24,80 @@ public abstract class JedisClusterCommand<T> {
|
||||
public abstract T execute(Jedis connection);
|
||||
|
||||
public T run(String key) {
|
||||
if (key == null) {
|
||||
throw new JedisClusterException(
|
||||
"No way to dispatch this command to Redis Cluster.");
|
||||
}
|
||||
|
||||
return runWithRetries(key, this.redirections, false, false);
|
||||
}
|
||||
|
||||
private T runWithRetries(String key, int redirections,
|
||||
boolean tryRandomNode, boolean asking) {
|
||||
if (redirections <= 0) {
|
||||
throw new JedisClusterMaxRedirectionsException(
|
||||
"Too many Cluster redirections?");
|
||||
}
|
||||
|
||||
Jedis connection = null;
|
||||
try {
|
||||
|
||||
if (key == null) {
|
||||
throw new JedisClusterException(
|
||||
"No way to dispatch this command to Redis Cluster.");
|
||||
} else if (redirections == 0) {
|
||||
throw new JedisClusterMaxRedirectionsException(
|
||||
"Too many Cluster redirections?");
|
||||
if (tryRandomNode) {
|
||||
connection = connectionHandler.getConnection();
|
||||
} else {
|
||||
connection = connectionHandler
|
||||
.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
|
||||
}
|
||||
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16
|
||||
.getSlot(key));
|
||||
|
||||
if (asking) {
|
||||
// TODO: Pipeline asking with the original command to make it
|
||||
// faster....
|
||||
connection.asking();
|
||||
|
||||
// if asking success, reset asking flag
|
||||
asking = false;
|
||||
}
|
||||
|
||||
return execute(connection);
|
||||
} catch (JedisConnectionException jce) {
|
||||
if (tryRandomNode) {
|
||||
// maybe all connection is down
|
||||
throw jce;
|
||||
}
|
||||
|
||||
releaseConnection(connection, true);
|
||||
connection = null;
|
||||
|
||||
// retry with random connection
|
||||
return runWithRetries(key, redirections--, true, asking);
|
||||
} catch (JedisRedirectionException jre) {
|
||||
return handleRedirection(jre, key);
|
||||
if (jre instanceof JedisAskDataException) {
|
||||
asking = true;
|
||||
} else if (jre instanceof JedisMovedDataException) {
|
||||
// TODO : In antirez's redis-rb-cluster implementation,
|
||||
// it rebuilds cluster's slot and node cache
|
||||
}
|
||||
|
||||
this.connectionHandler.assignSlotToNode(jre.getSlot(),
|
||||
jre.getTargetNode());
|
||||
|
||||
releaseConnection(connection, false);
|
||||
connection = null;
|
||||
|
||||
return runWithRetries(key, redirections - 1, false, asking);
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
releaseConnection(connection, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void releaseConnection(Jedis connection, boolean broken) {
|
||||
if (connection != null) {
|
||||
if (broken) {
|
||||
connectionHandler.returnBrokenConnection(connection);
|
||||
} else {
|
||||
connectionHandler.returnConnection(connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private T handleRedirection(JedisRedirectionException jre, String key) {
|
||||
if (jre instanceof JedisAskDataException) {
|
||||
asking = true;
|
||||
}
|
||||
redirections--;
|
||||
this.connectionHandler.assignSlotToNode(jre.getSlot(),
|
||||
jre.getTargetNode());
|
||||
return run(key);
|
||||
}
|
||||
}
|
||||
@@ -1,21 +1,26 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
import redis.clients.util.ClusterNodeInformation;
|
||||
import redis.clients.util.ClusterNodeInformationParser;
|
||||
|
||||
public abstract class JedisClusterConnectionHandler {
|
||||
public static ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser();
|
||||
|
||||
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
||||
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
|
||||
|
||||
abstract Jedis getConnection();
|
||||
|
||||
|
||||
protected void returnConnection(Jedis connection) {
|
||||
nodes.get(
|
||||
connection.getClient().getHost()
|
||||
+ connection.getClient().getPort()).returnResource(
|
||||
nodes.get(getNodeKey(connection.getClient()))
|
||||
.returnResource(connection);
|
||||
}
|
||||
|
||||
public void returnBrokenConnection(Jedis connection) {
|
||||
nodes.get(getNodeKey(connection.getClient())).returnBrokenResource(
|
||||
connection);
|
||||
}
|
||||
|
||||
@@ -29,71 +34,94 @@ public abstract class JedisClusterConnectionHandler {
|
||||
return nodes;
|
||||
}
|
||||
|
||||
private void initializeSlotsCache(Set<HostAndPort> nodes) {
|
||||
for (HostAndPort hostAndPort : nodes) {
|
||||
private void initializeSlotsCache(Set<HostAndPort> startNodes) {
|
||||
for (HostAndPort hostAndPort : startNodes) {
|
||||
JedisPool jp = new JedisPool(hostAndPort.getHost(),
|
||||
hostAndPort.getPort());
|
||||
this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp);
|
||||
Jedis jedis = jp.getResource();
|
||||
|
||||
this.nodes.clear();
|
||||
this.slots.clear();
|
||||
|
||||
Jedis jedis = null;
|
||||
try {
|
||||
jedis = jp.getResource();
|
||||
discoverClusterNodesAndSlots(jedis);
|
||||
break;
|
||||
} catch (JedisConnectionException e) {
|
||||
if (jedis != null) {
|
||||
jp.returnBrokenResource(jedis);
|
||||
jedis = null;
|
||||
}
|
||||
|
||||
// try next nodes
|
||||
} finally {
|
||||
jp.returnResource(jedis);
|
||||
if (jedis != null) {
|
||||
jp.returnResource(jedis);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (HostAndPort node : startNodes) {
|
||||
setNodeIfNotExist(node);
|
||||
}
|
||||
}
|
||||
|
||||
private void discoverClusterNodesAndSlots(Jedis jedis) {
|
||||
String localNodes = jedis.clusterNodes();
|
||||
for (String nodeInfo : localNodes.split("\n")) {
|
||||
HostAndPort node = getHostAndPortFromNodeLine(nodeInfo, jedis);
|
||||
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
||||
this.nodes.put(node.getHost() + node.getPort(), nodePool);
|
||||
populateNodeSlots(nodeInfo, nodePool);
|
||||
}
|
||||
}
|
||||
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(
|
||||
nodeInfo, new HostAndPort(jedis.getClient().getHost(),
|
||||
jedis.getClient().getPort()));
|
||||
|
||||
private void populateNodeSlots(String nodeInfo, JedisPool nodePool) {
|
||||
String[] nodeInfoArray = nodeInfo.split(" ");
|
||||
if (nodeInfoArray.length > 7) {
|
||||
for (int i = 8; i < nodeInfoArray.length; i++) {
|
||||
processSlot(nodeInfoArray[i], nodePool);
|
||||
}
|
||||
HostAndPort targetNode = clusterNodeInfo.getNode();
|
||||
setNodeIfNotExist(targetNode);
|
||||
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
|
||||
}
|
||||
}
|
||||
|
||||
private void processSlot(String slot, JedisPool nodePool) {
|
||||
if (slot.contains("-")) {
|
||||
String[] slotRange = slot.split("-");
|
||||
for (int i = Integer.valueOf(slotRange[0]); i <= Integer
|
||||
.valueOf(slotRange[1]); i++) {
|
||||
slots.put(i, nodePool);
|
||||
}
|
||||
} else {
|
||||
slots.put(Integer.valueOf(slot), nodePool);
|
||||
}
|
||||
}
|
||||
|
||||
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo, Jedis currentConnection) {
|
||||
String stringHostAndPort = nodeInfo.split(" ", 3)[1];
|
||||
if (":0".equals(stringHostAndPort)) {
|
||||
return new HostAndPort(currentConnection.getClient().getHost(),
|
||||
currentConnection.getClient().getPort());
|
||||
}
|
||||
String[] arrayHostAndPort = stringHostAndPort.split(":");
|
||||
return new HostAndPort(arrayHostAndPort[0],
|
||||
Integer.valueOf(arrayHostAndPort[1]));
|
||||
}
|
||||
|
||||
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
||||
JedisPool targetPool = nodes.get(targetNode.getHost()
|
||||
+ targetNode.getPort());
|
||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||
|
||||
if (targetPool == null) {
|
||||
setNodeIfNotExist(targetNode);
|
||||
targetPool = nodes.get(getNodeKey(targetNode));
|
||||
}
|
||||
slots.put(slot, targetPool);
|
||||
}
|
||||
|
||||
public void assignSlotsToNode(List<Integer> targetSlots,
|
||||
HostAndPort targetNode) {
|
||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||
|
||||
if (targetPool == null) {
|
||||
setNodeIfNotExist(targetNode);
|
||||
targetPool = nodes.get(getNodeKey(targetNode));
|
||||
}
|
||||
|
||||
for (Integer slot : targetSlots) {
|
||||
slots.put(slot, targetPool);
|
||||
}
|
||||
}
|
||||
|
||||
protected JedisPool getRandomConnection() {
|
||||
Object[] nodeArray = nodes.values().toArray();
|
||||
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
|
||||
}
|
||||
|
||||
protected String getNodeKey(HostAndPort hnp) {
|
||||
return hnp.getHost() + ":" + hnp.getPort();
|
||||
}
|
||||
|
||||
protected String getNodeKey(Client client) {
|
||||
return client.getHost() + ":" + client.getPort();
|
||||
}
|
||||
|
||||
private void setNodeIfNotExist(HostAndPort node) {
|
||||
String nodeKey = getNodeKey(node);
|
||||
if (nodes.containsKey(nodeKey))
|
||||
return;
|
||||
|
||||
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
||||
nodes.put(nodeKey, nodePool);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,9 @@ import java.util.Set;
|
||||
public interface JedisCommands {
|
||||
String set(String key, String value);
|
||||
|
||||
String set(String key, String value, String nxxx,
|
||||
String expx, long time);
|
||||
|
||||
String get(String key);
|
||||
|
||||
Boolean exists(String key);
|
||||
@@ -240,4 +243,9 @@ public interface JedisCommands {
|
||||
ScanResult<String> sscan(final String key, final String cursor);
|
||||
|
||||
ScanResult<Tuple> zscan(final String key, final String cursor);
|
||||
|
||||
Long pfadd(final String key, final String... elements);
|
||||
|
||||
long pfcount(final String key);
|
||||
|
||||
}
|
||||
|
||||
@@ -87,11 +87,15 @@ public class JedisPool extends Pool<Jedis> {
|
||||
}
|
||||
|
||||
public void returnBrokenResource(final Jedis resource) {
|
||||
returnBrokenResourceObject(resource);
|
||||
if (resource != null) {
|
||||
returnBrokenResourceObject(resource);
|
||||
}
|
||||
}
|
||||
|
||||
public void returnResource(final Jedis resource) {
|
||||
resource.resetState();
|
||||
returnResourceObject(resource);
|
||||
if (resource != null) {
|
||||
resource.resetState();
|
||||
returnResourceObject(resource);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -169,15 +169,17 @@ public class JedisSentinelPool extends Pool<Jedis> {
|
||||
return jedis;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void returnBrokenResource(final Jedis resource) {
|
||||
returnBrokenResourceObject(resource);
|
||||
if (resource != null) {
|
||||
returnBrokenResourceObject(resource);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void returnResource(final Jedis resource) {
|
||||
resource.resetState();
|
||||
returnResourceObject(resource);
|
||||
if (resource != null) {
|
||||
resource.resetState();
|
||||
returnResourceObject(resource);
|
||||
}
|
||||
}
|
||||
|
||||
protected class JedisPubSubAdapter extends JedisPubSub {
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
|
||||
public class JedisSlotBasedConnectionHandler extends
|
||||
JedisClusterConnectionHandler {
|
||||
|
||||
@@ -10,7 +15,35 @@ public class JedisSlotBasedConnectionHandler extends
|
||||
}
|
||||
|
||||
public Jedis getConnection() {
|
||||
return getRandomConnection().getResource();
|
||||
// In antirez's redis-rb-cluster implementation,
|
||||
// getRandomConnection always return valid connection (able to ping-pong)
|
||||
// or exception if all connections are invalid
|
||||
|
||||
List<JedisPool> pools = getShuffledNodesPool();
|
||||
|
||||
for (JedisPool pool : pools) {
|
||||
Jedis jedis = null;
|
||||
try {
|
||||
jedis = pool.getResource();
|
||||
|
||||
if (jedis == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String result = jedis.ping();
|
||||
|
||||
if (result.equalsIgnoreCase("pong"))
|
||||
return jedis;
|
||||
|
||||
pool.returnBrokenResource(jedis);
|
||||
} catch (JedisConnectionException ex) {
|
||||
if (jedis != null) {
|
||||
pool.returnBrokenResource(jedis);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
throw new JedisConnectionException("no reachable node in cluster");
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -21,10 +54,19 @@ public class JedisSlotBasedConnectionHandler extends
|
||||
@Override
|
||||
public Jedis getConnectionFromSlot(int slot) {
|
||||
JedisPool connectionPool = slots.get(slot);
|
||||
if (connectionPool == null) {
|
||||
connectionPool = getRandomConnection();
|
||||
if (connectionPool != null) {
|
||||
// It can't guaranteed to get valid connection because of node assignment
|
||||
return connectionPool.getResource();
|
||||
} else {
|
||||
return getConnection();
|
||||
}
|
||||
return connectionPool.getResource();
|
||||
}
|
||||
|
||||
private List<JedisPool> getShuffledNodesPool() {
|
||||
List<JedisPool> pools = new ArrayList<JedisPool>();
|
||||
pools.addAll(nodes.values());
|
||||
Collections.shuffle(pools);
|
||||
return pools;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -69,4 +69,8 @@ public interface MultiKeyBinaryCommands {
|
||||
byte[] randomBinaryKey();
|
||||
|
||||
Long bitop(BitOP op, final byte[] destKey, byte[]... srcKeys);
|
||||
|
||||
String pfmerge(final byte[] destkey, final byte[]... sourcekeys);
|
||||
|
||||
Long pfcount(byte[]... keys);
|
||||
}
|
||||
|
||||
@@ -65,4 +65,8 @@ public interface MultiKeyBinaryRedisPipeline {
|
||||
Response<byte[]> randomKeyBinary();
|
||||
|
||||
Response<Long> bitop(BitOP op, final byte[] destKey, byte[]... srcKeys);
|
||||
|
||||
Response<String> pfmerge(final byte[] destkey, final byte[]... sourcekeys);
|
||||
|
||||
Response<Long> pfcount(final byte[] ... keys);
|
||||
}
|
||||
|
||||
@@ -79,4 +79,8 @@ public interface MultiKeyCommands {
|
||||
ScanResult<String> scan(int cursor);
|
||||
|
||||
ScanResult<String> scan(final String cursor);
|
||||
|
||||
String pfmerge(final String destkey, final String... sourcekeys);
|
||||
|
||||
long pfcount(final String...keys);
|
||||
}
|
||||
|
||||
@@ -64,4 +64,8 @@ public interface MultiKeyCommandsPipeline {
|
||||
Response<String> randomKey();
|
||||
|
||||
Response<Long> bitop(BitOP op, final String destKey, String... srcKeys);
|
||||
|
||||
Response<String> pfmerge(final String destkey, final String... sourcekeys);
|
||||
|
||||
Response<Long> pfcount(final String...keys);
|
||||
}
|
||||
|
||||
@@ -367,6 +367,11 @@ abstract class MultiKeyPipelineBase extends PipelineBase implements
|
||||
client.info();
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
public Response<List<String>> time() {
|
||||
client.time();
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
}
|
||||
|
||||
public Response<Long> dbSize() {
|
||||
client.dbSize();
|
||||
@@ -446,4 +451,28 @@ abstract class MultiKeyPipelineBase extends PipelineBase implements
|
||||
client.clusterSetSlotImporting(slot, nodeId);
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response<String> pfmerge(byte[] destkey, byte[]... sourcekeys) {
|
||||
client.pfmerge(destkey, sourcekeys);
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response<String> pfmerge(String destkey, String... sourcekeys) {
|
||||
client.pfmerge(destkey, sourcekeys);
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response<Long> pfcount(String...keys) {
|
||||
client.pfcount(keys);
|
||||
return getResponse(BuilderFactory.LONG);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response<Long> pfcount(final byte[] ... keys) {
|
||||
client.pfcount(keys);
|
||||
return getResponse(BuilderFactory.LONG);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,12 @@ public class Pipeline extends MultiKeyPipelineBase {
|
||||
return values;
|
||||
}
|
||||
|
||||
public void setResponseDependency(Response<?> dependency) {
|
||||
for (Response<?> response : responses) {
|
||||
response.setDependency(dependency);
|
||||
}
|
||||
}
|
||||
|
||||
public void addResponse(Response<?> response) {
|
||||
responses.add(response);
|
||||
}
|
||||
@@ -98,24 +104,34 @@ public class Pipeline extends MultiKeyPipelineBase {
|
||||
}
|
||||
|
||||
public Response<String> discard() {
|
||||
if (currentMulti == null)
|
||||
throw new JedisDataException("DISCARD without MULTI");
|
||||
|
||||
client.discard();
|
||||
currentMulti = null;
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
public Response<List<Object>> exec() {
|
||||
if (currentMulti == null)
|
||||
throw new JedisDataException("EXEC without MULTI");
|
||||
|
||||
client.exec();
|
||||
Response<List<Object>> response = super.getResponse(currentMulti);
|
||||
currentMulti.setResponseDependency(response);
|
||||
currentMulti = null;
|
||||
return response;
|
||||
}
|
||||
|
||||
public Response<String> multi() {
|
||||
if (currentMulti != null)
|
||||
throw new JedisDataException("MULTI calls can not be nested");
|
||||
|
||||
client.multi();
|
||||
Response<String> response = getResponse(BuilderFactory.STRING); // Expecting
|
||||
// OK
|
||||
currentMulti = new MultiResponseBuilder();
|
||||
return response;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -142,7 +142,25 @@ abstract class PipelineBase extends Queable implements BinaryRedisPipeline,
|
||||
getClient(key).getbit(key, offset);
|
||||
return getResponse(BuilderFactory.BOOLEAN);
|
||||
}
|
||||
|
||||
|
||||
public Response<Long> bitpos(final String key, final boolean value) {
|
||||
return bitpos(key, value, new BitPosParams());
|
||||
}
|
||||
|
||||
public Response<Long> bitpos(final String key, final boolean value, final BitPosParams params) {
|
||||
getClient(key).bitpos(key, value, params);
|
||||
return getResponse(BuilderFactory.LONG);
|
||||
}
|
||||
|
||||
public Response<Long> bitpos(final byte[] key, final boolean value) {
|
||||
return bitpos(key, value, new BitPosParams());
|
||||
}
|
||||
|
||||
public Response<Long> bitpos(final byte[] key, final boolean value, final BitPosParams params) {
|
||||
getClient(key).bitpos(key, value, params);
|
||||
return getResponse(BuilderFactory.LONG);
|
||||
}
|
||||
|
||||
public Response<String> getrange(String key, long startOffset,
|
||||
long endOffset) {
|
||||
getClient(key).getrange(key, startOffset, endOffset);
|
||||
@@ -1080,12 +1098,22 @@ abstract class PipelineBase extends Queable implements BinaryRedisPipeline,
|
||||
return getResponse(BuilderFactory.LONG);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public Response<Long> pexpire(String key, int milliseconds) {
|
||||
return pexpire(key, (long) milliseconds);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public Response<Long> pexpire(byte[] key, int milliseconds) {
|
||||
return pexpire(key, (long) milliseconds);
|
||||
}
|
||||
|
||||
public Response<Long> pexpire(String key, long milliseconds) {
|
||||
getClient(key).pexpire(key, milliseconds);
|
||||
return getResponse(BuilderFactory.LONG);
|
||||
}
|
||||
|
||||
public Response<Long> pexpire(byte[] key, int milliseconds) {
|
||||
public Response<Long> pexpire(byte[] key, long milliseconds) {
|
||||
getClient(key).pexpire(key, milliseconds);
|
||||
return getResponse(BuilderFactory.LONG);
|
||||
}
|
||||
@@ -1204,4 +1232,28 @@ abstract class PipelineBase extends Queable implements BinaryRedisPipeline,
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response<Long> pfadd(byte[] key, byte[]... elements) {
|
||||
getClient(key).pfadd(key, elements);
|
||||
return getResponse(BuilderFactory.LONG);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response<Long> pfcount(byte[] key) {
|
||||
getClient(key).pfcount(key);
|
||||
return getResponse(BuilderFactory.LONG);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response<Long> pfadd(String key, String... elements) {
|
||||
getClient(key).pfadd(key, elements);
|
||||
return getResponse(BuilderFactory.LONG);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response<Long> pfcount(String key) {
|
||||
getClient(key).pfcount(key);
|
||||
return getResponse(BuilderFactory.LONG);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import redis.clients.jedis.exceptions.JedisAskDataException;
|
||||
import redis.clients.jedis.exceptions.JedisClusterException;
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
import redis.clients.jedis.exceptions.JedisDataException;
|
||||
import redis.clients.jedis.exceptions.JedisMovedDataException;
|
||||
@@ -16,6 +17,7 @@ public final class Protocol {
|
||||
|
||||
private static final String ASK_RESPONSE = "ASK";
|
||||
private static final String MOVED_RESPONSE = "MOVED";
|
||||
private static final String CLUSTERDOWN_RESPONSE = "CLUSTERDOWN";
|
||||
public static final int DEFAULT_PORT = 6379;
|
||||
public static final int DEFAULT_SENTINEL_PORT = 26379;
|
||||
public static final int DEFAULT_TIMEOUT = 2000;
|
||||
@@ -48,6 +50,15 @@ public final class Protocol {
|
||||
public static final String CLUSTER_SETSLOT_NODE = "node";
|
||||
public static final String CLUSTER_SETSLOT_MIGRATING = "migrating";
|
||||
public static final String CLUSTER_SETSLOT_IMPORTING = "importing";
|
||||
public static final String CLUSTER_SETSLOT_STABLE = "stable";
|
||||
public static final String CLUSTER_FORGET = "forget";
|
||||
public static final String CLUSTER_FLUSHSLOT = "flushslots";
|
||||
public static final String CLUSTER_KEYSLOT = "keyslot";
|
||||
public static final String CLUSTER_COUNTKEYINSLOT = "countkeysinslot";
|
||||
public static final String CLUSTER_SAVECONFIG = "saveconfig";
|
||||
public static final String CLUSTER_REPLICATE = "replicate";
|
||||
public static final String CLUSTER_SLAVES = "slaves";
|
||||
public static final String CLUSTER_FAILOVER = "failover";
|
||||
public static final String PUBSUB_CHANNELS= "channels";
|
||||
public static final String PUBSUB_NUMSUB = "numsub";
|
||||
public static final String PUBSUB_NUM_PAT = "numpat";
|
||||
@@ -96,6 +107,8 @@ public final class Protocol {
|
||||
throw new JedisAskDataException(message, new HostAndPort(
|
||||
askInfo[1], Integer.valueOf(askInfo[2])),
|
||||
Integer.valueOf(askInfo[0]));
|
||||
} else if (message.startsWith(CLUSTERDOWN_RESPONSE)) {
|
||||
throw new JedisClusterException(message);
|
||||
}
|
||||
throw new JedisDataException(message);
|
||||
}
|
||||
@@ -204,7 +217,7 @@ public final class Protocol {
|
||||
}
|
||||
|
||||
public static enum Command {
|
||||
PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING;
|
||||
PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, BITPOS, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING, PFADD, PFCOUNT, PFMERGE;
|
||||
|
||||
public final byte[] raw;
|
||||
|
||||
|
||||
@@ -188,4 +188,8 @@ public interface RedisPipeline {
|
||||
Response<Long> bitcount(String key);
|
||||
|
||||
Response<Long> bitcount(String key, long start, long end);
|
||||
|
||||
Response<Long> pfadd(final String key, final String... elements);
|
||||
|
||||
Response<Long> pfcount(final String key);
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@ public class Response<T> {
|
||||
private boolean set = false;
|
||||
private Builder<T> builder;
|
||||
private Object data;
|
||||
private Response<?> dependency = null;
|
||||
private boolean requestDependencyBuild = false;
|
||||
|
||||
public Response(Builder<T> b) {
|
||||
this.builder = b;
|
||||
@@ -19,23 +21,39 @@ public class Response<T> {
|
||||
}
|
||||
|
||||
public T get() {
|
||||
// if response has dependency response and dependency is not built,
|
||||
// build it first and no more!!
|
||||
if (!requestDependencyBuild && dependency != null && dependency.set
|
||||
&& !dependency.built) {
|
||||
requestDependencyBuild = true;
|
||||
dependency.build();
|
||||
}
|
||||
if (!set) {
|
||||
throw new JedisDataException(
|
||||
"Please close pipeline or multi block before calling this method.");
|
||||
}
|
||||
if (!built) {
|
||||
if (data != null) {
|
||||
if (data instanceof JedisDataException) {
|
||||
throw new JedisDataException((JedisDataException) data);
|
||||
}
|
||||
response = builder.build(data);
|
||||
}
|
||||
this.data = null;
|
||||
built = true;
|
||||
build();
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
public void setDependency(Response<?> dependency) {
|
||||
this.dependency = dependency;
|
||||
this.requestDependencyBuild = false;
|
||||
}
|
||||
|
||||
private void build() {
|
||||
if (data != null) {
|
||||
if (data instanceof JedisDataException) {
|
||||
throw new JedisDataException((JedisDataException) data);
|
||||
}
|
||||
response = builder.build(data);
|
||||
}
|
||||
data = null;
|
||||
built = true;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "Response " + builder.toString();
|
||||
}
|
||||
|
||||
@@ -13,15 +13,24 @@ import redis.clients.util.SafeEncoder;
|
||||
public class ScanParams {
|
||||
private List<byte[]> params = new ArrayList<byte[]>();
|
||||
public final static String SCAN_POINTER_START = String.valueOf(0);
|
||||
public final static byte[] SCAN_POINTER_START_BINARY = SafeEncoder.encode(SCAN_POINTER_START);
|
||||
|
||||
public void match(final String pattern) {
|
||||
public ScanParams match(final byte[] pattern) {
|
||||
params.add(MATCH.raw);
|
||||
params.add(pattern);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ScanParams match(final String pattern) {
|
||||
params.add(MATCH.raw);
|
||||
params.add(SafeEncoder.encode(pattern));
|
||||
return this;
|
||||
}
|
||||
|
||||
public void count(final int count) {
|
||||
public ScanParams count(final int count) {
|
||||
params.add(COUNT.raw);
|
||||
params.add(Protocol.toByteArray(count));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Collection<byte[]> getParams() {
|
||||
|
||||
@@ -2,8 +2,10 @@ package redis.clients.jedis;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import redis.clients.util.SafeEncoder;
|
||||
|
||||
public class ScanResult<T> {
|
||||
private String cursor;
|
||||
private byte[] cursor;
|
||||
private List<T> results;
|
||||
|
||||
@Deprecated
|
||||
@@ -13,15 +15,18 @@ public class ScanResult<T> {
|
||||
* @see https://github.com/xetorthio/jedis/issues/531
|
||||
*/
|
||||
public ScanResult(int cursor, List<T> results) {
|
||||
this.cursor = String.valueOf(cursor);
|
||||
this.results = results;
|
||||
this(Protocol.toByteArray(cursor), results);
|
||||
}
|
||||
|
||||
public ScanResult(String cursor, List<T> results) {
|
||||
this(SafeEncoder.encode(cursor), results);
|
||||
}
|
||||
|
||||
public ScanResult(byte[] cursor, List<T> results) {
|
||||
this.cursor = cursor;
|
||||
this.results = results;
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
/**
|
||||
* This method is deprecated due to bug (scan cursor should be unsigned long)
|
||||
@@ -30,13 +35,17 @@ public class ScanResult<T> {
|
||||
* @return int(currently), but will be changed to String, so be careful to prepare!
|
||||
*/
|
||||
public int getCursor() {
|
||||
return Integer.parseInt(cursor);
|
||||
return Integer.parseInt(getStringCursor());
|
||||
}
|
||||
|
||||
/**
|
||||
* FIXME: This method should be changed to getCursor() on next major release
|
||||
*/
|
||||
public String getStringCursor() {
|
||||
return SafeEncoder.encode(cursor);
|
||||
}
|
||||
|
||||
public byte[] getCursorAsBytes() {
|
||||
return cursor;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.io.Closeable;
|
||||
import redis.clients.jedis.BinaryClient.LIST_POSITION;
|
||||
import redis.clients.util.Hashing;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import redis.clients.jedis.BinaryClient.LIST_POSITION;
|
||||
import redis.clients.util.Hashing;
|
||||
import redis.clients.util.Pool;
|
||||
|
||||
public class ShardedJedis extends BinaryShardedJedis implements JedisCommands,
|
||||
@@ -38,6 +39,13 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands,
|
||||
return j.set(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String set(String key, String value, String nxxx, String expx,
|
||||
long time) {
|
||||
Jedis j = getShard(key);
|
||||
return j.set(key, value, nxxx, expx, time);
|
||||
}
|
||||
|
||||
public String get(String key) {
|
||||
Jedis j = getShard(key);
|
||||
return j.get(key);
|
||||
@@ -138,6 +146,11 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands,
|
||||
return j.incrBy(key, integer);
|
||||
}
|
||||
|
||||
public Double incrByFloat(String key, double integer) {
|
||||
Jedis j = getShard(key);
|
||||
return j.incrByFloat(key, integer);
|
||||
}
|
||||
|
||||
public Long incr(String key) {
|
||||
Jedis j = getShard(key);
|
||||
return j.incr(key);
|
||||
@@ -183,6 +196,11 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands,
|
||||
return j.hincrBy(key, field, value);
|
||||
}
|
||||
|
||||
public Double hincrByFloat(String key, String field, double value) {
|
||||
Jedis j = getShard(key);
|
||||
return j.hincrByFloat(key, field, value);
|
||||
}
|
||||
|
||||
public Boolean hexists(String key, String field) {
|
||||
Jedis j = getShard(key);
|
||||
return j.hexists(key, field);
|
||||
@@ -610,4 +628,15 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands,
|
||||
jedis.resetState();
|
||||
}
|
||||
}
|
||||
|
||||
public Long pfadd(String key, String... elements) {
|
||||
Jedis j = getShard(key);
|
||||
return j.pfadd(key, elements);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long pfcount(String key) {
|
||||
Jedis j = getShard(key);
|
||||
return j.pfcount(key);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,26 +31,29 @@ public class ShardedJedisPool extends Pool<ShardedJedis> {
|
||||
List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
|
||||
super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ShardedJedis getResource() {
|
||||
ShardedJedis jedis = super.getResource();
|
||||
jedis.setDataSource(this);
|
||||
return jedis;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void returnBrokenResource(final ShardedJedis resource) {
|
||||
returnBrokenResourceObject(resource);
|
||||
if (resource != null) {
|
||||
returnBrokenResourceObject(resource);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void returnResource(final ShardedJedis resource) {
|
||||
resource.resetState();
|
||||
returnResourceObject(resource);
|
||||
if (resource != null) {
|
||||
resource.resetState();
|
||||
returnResourceObject(resource);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* PoolableObjectFactory custom impl.
|
||||
*/
|
||||
|
||||
48
src/main/java/redis/clients/util/ClusterNodeInformation.java
Normal file
48
src/main/java/redis/clients/util/ClusterNodeInformation.java
Normal file
@@ -0,0 +1,48 @@
|
||||
package redis.clients.util;
|
||||
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class ClusterNodeInformation {
|
||||
private HostAndPort node;
|
||||
private List<Integer> availableSlots;
|
||||
private List<Integer> slotsBeingImported;
|
||||
private List<Integer> slotsBeingMigrated;
|
||||
|
||||
public ClusterNodeInformation(HostAndPort node) {
|
||||
this.node = node;
|
||||
this.availableSlots = new ArrayList<Integer>();
|
||||
this.slotsBeingImported = new ArrayList<Integer>();
|
||||
this.slotsBeingMigrated = new ArrayList<Integer>();
|
||||
}
|
||||
|
||||
public void addAvailableSlot(int slot) {
|
||||
availableSlots.add(slot);
|
||||
}
|
||||
|
||||
public void addSlotBeingImported(int slot) {
|
||||
slotsBeingImported.add(slot);
|
||||
}
|
||||
|
||||
public void addSlotBeingMigrated(int slot) {
|
||||
slotsBeingMigrated.add(slot);
|
||||
}
|
||||
|
||||
public HostAndPort getNode() {
|
||||
return node;
|
||||
}
|
||||
|
||||
public List<Integer> getAvailableSlots() {
|
||||
return availableSlots;
|
||||
}
|
||||
|
||||
public List<Integer> getSlotsBeingImported() {
|
||||
return slotsBeingImported;
|
||||
}
|
||||
|
||||
public List<Integer> getSlotsBeingMigrated() {
|
||||
return slotsBeingMigrated;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
package redis.clients.util;
|
||||
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
|
||||
public class ClusterNodeInformationParser {
|
||||
private static final String HOST_MYSELF_IDENTIFIER = ":0";
|
||||
private static final String SLOT_IMPORT_IDENTIFIER = "-<-";
|
||||
private static final String SLOT_IN_TRANSITION_IDENTIFIER = "[";
|
||||
public static final int SLOT_INFORMATIONS_START_INDEX = 8;
|
||||
public static final int HOST_AND_PORT_INDEX = 1;
|
||||
|
||||
public ClusterNodeInformation parse(String nodeInfo, HostAndPort current) {
|
||||
String[] nodeInfoPartArray = nodeInfo.split(" ");
|
||||
|
||||
HostAndPort node = getHostAndPortFromNodeLine(nodeInfoPartArray,
|
||||
current);
|
||||
ClusterNodeInformation info = new ClusterNodeInformation(node);
|
||||
|
||||
if (nodeInfoPartArray.length >= SLOT_INFORMATIONS_START_INDEX) {
|
||||
String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray);
|
||||
fillSlotInformation(slotInfoPartArray, info);
|
||||
}
|
||||
|
||||
return info;
|
||||
}
|
||||
|
||||
private String[] extractSlotParts(String[] nodeInfoPartArray) {
|
||||
String[] slotInfoPartArray = new String[nodeInfoPartArray.length
|
||||
- SLOT_INFORMATIONS_START_INDEX];
|
||||
for (int i = SLOT_INFORMATIONS_START_INDEX; i < nodeInfoPartArray.length; i++) {
|
||||
slotInfoPartArray[i - SLOT_INFORMATIONS_START_INDEX] = nodeInfoPartArray[i];
|
||||
}
|
||||
return slotInfoPartArray;
|
||||
}
|
||||
|
||||
public HostAndPort getHostAndPortFromNodeLine(String[] nodeInfoPartArray,
|
||||
HostAndPort current) {
|
||||
String stringHostAndPort = nodeInfoPartArray[HOST_AND_PORT_INDEX];
|
||||
if (HOST_MYSELF_IDENTIFIER.equals(stringHostAndPort)) {
|
||||
return current;
|
||||
}
|
||||
|
||||
String[] arrayHostAndPort = stringHostAndPort.split(":");
|
||||
return new HostAndPort(arrayHostAndPort[0],
|
||||
Integer.valueOf(arrayHostAndPort[1]));
|
||||
}
|
||||
|
||||
private void fillSlotInformation(String[] slotInfoPartArray,
|
||||
ClusterNodeInformation info) {
|
||||
for (String slotRange : slotInfoPartArray) {
|
||||
fillSlotInformationFromSlotRange(slotRange, info);
|
||||
}
|
||||
}
|
||||
|
||||
private void fillSlotInformationFromSlotRange(String slotRange,
|
||||
ClusterNodeInformation info) {
|
||||
if (slotRange.startsWith(SLOT_IN_TRANSITION_IDENTIFIER)) {
|
||||
// slot is in transition
|
||||
int slot = Integer.parseInt(slotRange.substring(1).split("-")[0]);
|
||||
|
||||
if (slotRange.contains(SLOT_IMPORT_IDENTIFIER)) {
|
||||
// import
|
||||
info.addSlotBeingImported(slot);
|
||||
} else {
|
||||
// migrate (->-)
|
||||
info.addSlotBeingMigrated(slot);
|
||||
}
|
||||
} else if (slotRange.contains("-")) {
|
||||
// slot range
|
||||
String[] slotRangePart = slotRange.split("-");
|
||||
for (int slot = Integer.valueOf(slotRangePart[0]); slot <= Integer
|
||||
.valueOf(slotRangePart[1]); slot++) {
|
||||
info.addAvailableSlot(slot);
|
||||
}
|
||||
} else {
|
||||
// single slot
|
||||
info.addAvailableSlot(Integer.valueOf(slotRange));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -45,6 +45,9 @@ public abstract class Pool<T> {
|
||||
}
|
||||
|
||||
public void returnResourceObject(final T resource) {
|
||||
if (resource == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
internalPool.returnObject(resource);
|
||||
} catch (Exception e) {
|
||||
@@ -54,11 +57,15 @@ public abstract class Pool<T> {
|
||||
}
|
||||
|
||||
public void returnBrokenResource(final T resource) {
|
||||
returnBrokenResourceObject(resource);
|
||||
if (resource != null) {
|
||||
returnBrokenResourceObject(resource);
|
||||
}
|
||||
}
|
||||
|
||||
public void returnResource(final T resource) {
|
||||
returnResourceObject(resource);
|
||||
if (resource != null) {
|
||||
returnResourceObject(resource);
|
||||
}
|
||||
}
|
||||
|
||||
public void destroy() {
|
||||
@@ -81,4 +88,4 @@ public abstract class Pool<T> {
|
||||
throw new JedisException("Could not destroy the pool", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user