Merge branch 'master' into support-sorted-set-with-lex

Conflicts:
	src/main/java/redis/clients/jedis/BinaryShardedJedis.java
	src/main/java/redis/clients/jedis/Jedis.java
	src/main/java/redis/clients/jedis/Protocol.java
	src/test/java/redis/clients/jedis/tests/commands/SortedSetCommandsTest.java
This commit is contained in:
Jungtaek Lim
2014-09-10 21:43:32 +09:00
64 changed files with 2572 additions and 703 deletions

View File

@@ -1,5 +1,7 @@
package redis.clients.jedis;
import java.util.List;
/**
* Pipelined responses for all of the low level, non key related commands
*/
@@ -24,6 +26,8 @@ public interface BasicRedisPipeline {
Response<String> flushAll();
Response<String> info();
Response<List<String>> time();
Response<Long> dbSize();

View File

@@ -1,17 +1,8 @@
package redis.clients.jedis;
import static redis.clients.jedis.Protocol.toByteArray;
import static redis.clients.jedis.Protocol.Command.*;
import static redis.clients.jedis.Protocol.Keyword.ENCODING;
import static redis.clients.jedis.Protocol.Keyword.IDLETIME;
import static redis.clients.jedis.Protocol.Keyword.LEN;
import static redis.clients.jedis.Protocol.Keyword.LIMIT;
import static redis.clients.jedis.Protocol.Keyword.NO;
import static redis.clients.jedis.Protocol.Keyword.ONE;
import static redis.clients.jedis.Protocol.Keyword.REFCOUNT;
import static redis.clients.jedis.Protocol.Keyword.RESET;
import static redis.clients.jedis.Protocol.Keyword.STORE;
import static redis.clients.jedis.Protocol.Keyword.WITHSCORES;
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.util.SafeEncoder;
import java.util.ArrayList;
import java.util.List;
@@ -19,9 +10,14 @@ import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.util.SafeEncoder;
import static redis.clients.jedis.Protocol.Command.*;
import static redis.clients.jedis.Protocol.Command.EXISTS;
import static redis.clients.jedis.Protocol.Command.PSUBSCRIBE;
import static redis.clients.jedis.Protocol.Command.PUNSUBSCRIBE;
import static redis.clients.jedis.Protocol.Command.SUBSCRIBE;
import static redis.clients.jedis.Protocol.Command.UNSUBSCRIBE;
import static redis.clients.jedis.Protocol.Keyword.*;
import static redis.clients.jedis.Protocol.toByteArray;
public class BinaryClient extends Connection {
public enum LIST_POSITION {
@@ -204,6 +200,10 @@ public class BinaryClient extends Connection {
sendCommand(INCRBY, key, toByteArray(integer));
}
public void incrByFloat(final byte[] key, final double value) {
sendCommand(INCRBYFLOAT, key, toByteArray(value));
}
public void incr(final byte[] key) {
sendCommand(INCR, key);
}
@@ -321,7 +321,7 @@ public class BinaryClient extends Connection {
public void sadd(final byte[] key, final byte[]... members) {
sendCommand(SADD, joinParameters(key, members));
}
public void smembers(final byte[] key) {
sendCommand(SMEMBERS, key);
}
@@ -949,7 +949,15 @@ public class BinaryClient extends Connection {
public void getbit(byte[] key, long offset) {
sendCommand(GETBIT, key, toByteArray(offset));
}
public void bitpos(final byte[] key, final boolean value, final BitPosParams params) {
final List<byte[]> args = new ArrayList<byte[]>();
args.add(key);
args.add(toByteArray(value));
args.addAll(params.getParams());
sendCommand(BITPOS, args.toArray(new byte[args.size()][]));
}
public void setrange(byte[] key, long offset, byte[] value) {
sendCommand(SETRANGE, key, toByteArray(offset), value);
}
@@ -975,9 +983,6 @@ public class BinaryClient extends Connection {
}
public void resetState() {
if (isInMulti())
discard();
if (isInWatch())
unwatch();
}
@@ -1111,7 +1116,7 @@ public class BinaryClient extends Connection {
sendCommand(RESTORE, key, toByteArray(ttl), serializedValue);
}
public void pexpire(final byte[] key, final int milliseconds) {
public void pexpire(final byte[] key, final long milliseconds) {
sendCommand(PEXPIRE, key, toByteArray(milliseconds));
}
@@ -1123,10 +1128,6 @@ public class BinaryClient extends Connection {
sendCommand(PTTL, key);
}
public void incrByFloat(final byte[] key, final double increment) {
sendCommand(INCRBYFLOAT, key, toByteArray(increment));
}
public void psetex(final byte[] key, final int milliseconds,
final byte[] value) {
sendCommand(PSETEX, key, toByteArray(milliseconds), value);
@@ -1176,61 +1177,6 @@ public class BinaryClient extends Connection {
sendCommand(HINCRBYFLOAT, key, field, toByteArray(increment));
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public void scan(int cursor, final ScanParams params) {
final List<byte[]> args = new ArrayList<byte[]>();
args.add(toByteArray(cursor));
args.addAll(params.getParams());
sendCommand(SCAN, args.toArray(new byte[args.size()][]));
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public void hscan(final byte[] key, int cursor, final ScanParams params) {
final List<byte[]> args = new ArrayList<byte[]>();
args.add(key);
args.add(toByteArray(cursor));
args.addAll(params.getParams());
sendCommand(HSCAN, args.toArray(new byte[args.size()][]));
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public void sscan(final byte[] key, int cursor, final ScanParams params) {
final List<byte[]> args = new ArrayList<byte[]>();
args.add(key);
args.add(toByteArray(cursor));
args.addAll(params.getParams());
sendCommand(SSCAN, args.toArray(new byte[args.size()][]));
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public void zscan(final byte[] key, int cursor, final ScanParams params) {
final List<byte[]> args = new ArrayList<byte[]>();
args.add(key);
args.add(toByteArray(cursor));
args.addAll(params.getParams());
sendCommand(ZSCAN, args.toArray(new byte[args.size()][]));
}
public void scan(final byte[] cursor, final ScanParams params) {
final List<byte[]> args = new ArrayList<byte[]>();
args.add(cursor);
@@ -1273,4 +1219,20 @@ public class BinaryClient extends Connection {
public void asking() {
sendCommand(Command.ASKING);
}
public void pfadd(final byte[] key, final byte[]... elements) {
sendCommand(PFADD, joinParameters(key, elements));
}
public void pfcount(final byte[] key) {
sendCommand(PFCOUNT, key);
}
public void pfcount(final byte[]...keys) {
sendCommand(PFCOUNT, keys);
}
public void pfmerge(final byte[] destkey, final byte[]... sourcekeys) {
sendCommand(PFMERGE, joinParameters(destkey, sourcekeys));
}
}

View File

@@ -4,6 +4,7 @@ import static redis.clients.jedis.Protocol.toByteArray;
import java.io.Closeable;
import java.net.URI;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -22,7 +23,9 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
MultiKeyBinaryCommands, AdvancedBinaryJedisCommands,
BinaryScriptingCommands, Closeable {
protected Client client = null;
protected Transaction transaction = null;
protected Pipeline pipeline = null;
public BinaryJedis(final String host) {
URI uri = URI.create(host);
if (uri.getScheme() != null && uri.getScheme().equals("redis")) {
@@ -633,6 +636,37 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
return client.getIntegerReply();
}
/**
* INCRBYFLOAT work just like {@link #incrBy(byte[]) INCRBY} but increments
* by floats instead of integers.
* <p>
* INCRBYFLOAT commands are limited to double precision floating point
* values.
* <p>
* Note: this is actually a string operation, that is, in Redis there are
* not "double" types. Simply the string stored at the key is parsed as a
* base double precision floating point value, incremented, and then
* converted back as a string. There is no DECRYBYFLOAT but providing a
* negative value will work as expected.
* <p>
* Time complexity: O(1)
*
* @see #incr(byte[])
* @see #decr(byte[])
* @see #decrBy(byte[], long)
*
* @param key
* @param integer
* @return Integer reply, this commands will reply with the new value of key
* after the increment.
*/
public Double incrByFloat(final byte[] key, final double integer) {
checkIsInMulti();
client.incrByFloat(key, integer);
String dval = client.getBulkReply();
return (dval != null ? new Double(dval) : null);
}
/**
* Increment the number stored at key by one. If the key does not exist or
* contains a value of a wrong type, set the key to the value of "0" before
@@ -825,6 +859,33 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
return client.getIntegerReply();
}
/**
* Increment the number stored at field in the hash at key by a double
* precision floating point value. If key does not exist, a new key holding
* a hash is created. If field does not exist or holds a string, the value
* is set to 0 before applying the operation. Since the value argument is
* signed you can use this command to perform both increments and
* decrements.
* <p>
* The range of values supported by HINCRBYFLOAT is limited to double
* precision floating point values.
* <p>
* <b>Time complexity:</b> O(1)
*
* @param key
* @param field
* @param value
* @return Double precision floating point reply The new value at field
* after the increment operation.
*/
public Double hincrByFloat(final byte[] key, final byte[] field,
final double value) {
checkIsInMulti();
client.hincrByFloat(key, field, value);
final String dval = client.getBulkReply();
return (dval != null ? new Double(dval) : null);
}
/**
* Test for existence of a specified field in a hash.
*
@@ -1694,7 +1755,9 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
public Transaction multi() {
client.multi();
return new Transaction(client);
client.getOne(); // expected OK
transaction = new Transaction(client);
return transaction;
}
@Deprecated
@@ -1707,9 +1770,10 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
public List<Object> multi(final TransactionBlock jedisTransaction) {
List<Object> results = null;
jedisTransaction.setClient(client);
client.multi();
jedisTransaction.execute();
results = jedisTransaction.exec();
client.multi();
client.getOne(); // expected OK
jedisTransaction.execute();
results = jedisTransaction.exec();
return results;
}
@@ -1729,8 +1793,24 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
}
public void resetState() {
client.resetState();
client.getAll();
if (client.isConnected()) {
if (transaction != null) {
transaction.clear();
}
if (pipeline != null) {
pipeline.clear();
}
if (client.isInWatch()) {
unwatch();
}
client.resetState();
}
transaction = null;
pipeline = null;
}
public String watch(final byte[]... keys) {
@@ -1744,7 +1824,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
}
@Override
public void close() {
public void close() {
client.close();
}
@@ -2146,7 +2226,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
}
public Pipeline pipelined() {
Pipeline pipeline = new Pipeline();
pipeline = new Pipeline();
pipeline.setClient(client);
return pipeline;
}
@@ -3155,6 +3235,16 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
return client.getIntegerReply() == 1;
}
public Long bitpos(final byte[] key, final boolean value) {
return bitpos(key, value, new BitPosParams());
}
public Long bitpos(final byte[] key, final boolean value,
final BitPosParams params) {
client.bitpos(key, value, params);
return client.getIntegerReply();
}
public Long setrange(byte[] key, long offset, byte[] value) {
client.setrange(key, offset, value);
return client.getIntegerReply();
@@ -3343,7 +3433,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
return client.getStatusCodeReply();
}
public Long pexpire(final byte[] key, final int milliseconds) {
public Long pexpire(final byte[] key, final long milliseconds) {
checkIsInMulti();
client.pexpire(key, milliseconds);
return client.getIntegerReply();
@@ -3361,13 +3451,6 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
return client.getIntegerReply();
}
public Double incrByFloat(final byte[] key, final double increment) {
checkIsInMulti();
client.incrByFloat(key, increment);
String relpy = client.getBulkReply();
return (relpy != null ? new Double(relpy) : null);
}
public String psetex(final byte[] key, final int milliseconds,
final byte[] value) {
checkIsInMulti();
@@ -3425,14 +3508,6 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
return client.getStatusCodeReply();
}
public Double hincrByFloat(final byte[] key, final byte[] field,
double increment) {
checkIsInMulti();
client.hincrByFloat(key, field, increment);
String relpy = client.getBulkReply();
return (relpy != null ? new Double(relpy) : null);
}
/**
* Syncrhonous replication of Redis as described here:
* http://antirez.com/news/66
@@ -3446,4 +3521,99 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
return client.getIntegerReply();
}
@Override
public Long pfadd(final byte[] key, final byte[]... elements) {
checkIsInMulti();
client.pfadd(key, elements);
return client.getIntegerReply();
}
@Override
public long pfcount(final byte[] key) {
checkIsInMulti();
client.pfcount(key);
return client.getIntegerReply();
}
@Override
public String pfmerge(final byte[] destkey, final byte[]... sourcekeys) {
checkIsInMulti();
client.pfmerge(destkey, sourcekeys);
return client.getStatusCodeReply();
}
@Override
public Long pfcount(byte[]... keys) {
checkIsInMulti();
client.pfcount(keys);
return client.getIntegerReply();
}
public ScanResult<byte[]> scan(final byte[] cursor) {
return scan(cursor, new ScanParams());
}
public ScanResult<byte[]> scan(final byte[] cursor, final ScanParams params) {
checkIsInMulti();
client.scan(cursor, params);
List<Object> result = client.getObjectMultiBulkReply();
byte[] newcursor = (byte[]) result.get(0);
List<byte[]> rawResults = (List<byte[]>) result.get(1);
return new ScanResult<byte[]>(newcursor, rawResults);
}
public ScanResult<Map.Entry<byte[], byte[]>> hscan(final byte[] key,
final byte[] cursor) {
return hscan(key, cursor, new ScanParams());
}
public ScanResult<Map.Entry<byte[], byte[]>> hscan(final byte[] key,
final byte[] cursor, final ScanParams params) {
checkIsInMulti();
client.hscan(key, cursor, params);
List<Object> result = client.getObjectMultiBulkReply();
byte[] newcursor = (byte[]) result.get(0);
List<Map.Entry<byte[], byte[]>> results = new ArrayList<Map.Entry<byte[], byte[]>>();
List<byte[]> rawResults = (List<byte[]>) result.get(1);
Iterator<byte[]> iterator = rawResults.iterator();
while (iterator.hasNext()) {
results.add(new AbstractMap.SimpleEntry<byte[], byte[]>(iterator
.next(), iterator.next()));
}
return new ScanResult<Map.Entry<byte[], byte[]>>(newcursor, results);
}
public ScanResult<byte[]> sscan(final byte[] key, final byte[] cursor) {
return sscan(key, cursor, new ScanParams());
}
public ScanResult<byte[]> sscan(final byte[] key, final byte[] cursor,
final ScanParams params) {
checkIsInMulti();
client.sscan(key, cursor, params);
List<Object> result = client.getObjectMultiBulkReply();
byte[] newcursor = (byte[]) result.get(0);
List<byte[]> rawResults = (List<byte[]>) result.get(1);
return new ScanResult<byte[]>(newcursor, rawResults);
}
public ScanResult<Tuple> zscan(final byte[] key, final byte[] cursor) {
return zscan(key, cursor, new ScanParams());
}
public ScanResult<Tuple> zscan(final byte[] key, final byte[] cursor,
final ScanParams params) {
checkIsInMulti();
client.zscan(key, cursor, params);
List<Object> result = client.getObjectMultiBulkReply();
byte[] newcursor = (byte[]) result.get(0);
List<Tuple> results = new ArrayList<Tuple>();
List<byte[]> rawResults = (List<byte[]>) result.get(1);
Iterator<byte[]> iterator = rawResults.iterator();
while (iterator.hasNext()) {
results.add(new Tuple(iterator.next(), Double.valueOf(SafeEncoder
.encode(iterator.next()))));
}
return new ScanResult<Tuple>(newcursor, results);
}
}

View File

@@ -47,6 +47,8 @@ public interface BinaryJedisCommands {
Long incrBy(byte[] key, long integer);
Double incrByFloat(byte[] key, double value);
Long incr(byte[] key);
Long append(byte[] key, byte[] value);
@@ -65,6 +67,8 @@ public interface BinaryJedisCommands {
Long hincrBy(byte[] key, byte[] field, long value);
Double hincrByFloat(byte[] key, byte[] field, double value);
Boolean hexists(byte[] key, byte[] field);
Long hdel(byte[] key, byte[]... field);
@@ -111,6 +115,8 @@ public interface BinaryJedisCommands {
byte[] srandmember(byte[] key);
List<byte[]> srandmember(final byte[] key, final int count);
Long strlen(byte[] key);
Long zadd(byte[] key, double score, byte[] member);
@@ -220,4 +226,8 @@ public interface BinaryJedisCommands {
Long bitcount(final byte[] key);
Long bitcount(final byte[] key, long start, long end);
Long pfadd(final byte[] key, final byte[]... elements);
long pfcount(final byte[] key);
}

View File

@@ -34,7 +34,7 @@ public interface BinaryRedisPipeline {
Response<byte[]> getSet(byte[] key, byte[] value);
Response<Long> getrange(byte[] key, long startOffset, long endOffset);
Response<byte[]> getrange(byte[] key, long startOffset, long endOffset);
Response<Long> hdel(byte[] key, byte[]... field);
@@ -219,4 +219,8 @@ public interface BinaryRedisPipeline {
Response<Long> bitcount(byte[] key);
Response<Long> bitcount(byte[] key, long start, long end);
Response<Long> pfadd(final byte[] key, final byte[]... elements);
Response<Long> pfcount(final byte[] key);
}

View File

@@ -110,6 +110,11 @@ public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo>
return j.incrBy(key, integer);
}
public Double incrByFloat(byte[] key, double integer) {
Jedis j = getShard(key);
return j.incrByFloat(key, integer);
}
public Long incr(byte[] key) {
Jedis j = getShard(key);
return j.incr(key);
@@ -155,6 +160,11 @@ public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo>
return j.hincrBy(key, field, value);
}
public Double hincrByFloat(byte[] key, byte[] field, double value) {
Jedis j = getShard(key);
return j.hincrByFloat(key, field, value);
}
public Boolean hexists(byte[] key, byte[] field) {
Jedis j = getShard(key);
return j.hexists(key, field);
@@ -290,6 +300,12 @@ public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo>
return j.srandmember(key);
}
@Override
public List srandmember(byte[] key, int count) {
Jedis j = getShard(key);
return j.srandmember(key, count);
}
public Long zadd(byte[] key, double score, byte[] member) {
Jedis j = getShard(key);
return j.zadd(key, score, member);
@@ -594,5 +610,17 @@ public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo>
Jedis j = getShard(key);
return j.bitcount(key, start, end);
}
@Override
public Long pfadd(final byte[] key, final byte[]... elements) {
Jedis j = getShard(key);
return j.pfadd(key, elements);
}
@Override
public long pfcount(final byte[] key) {
Jedis j = getShard(key);
return j.pfcount(key);
}
}

View File

@@ -0,0 +1,27 @@
package redis.clients.jedis;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
public class BitPosParams {
private List<byte[]> params = new ArrayList<byte[]>();
protected BitPosParams() {
}
public BitPosParams(long start) {
params.add(Protocol.toByteArray(start));
}
public BitPosParams(long start, long end) {
this(start);
params.add(Protocol.toByteArray(end));
}
public Collection<byte[]> getParams() {
return Collections.unmodifiableCollection(params);
}
}

View File

@@ -9,6 +9,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import redis.clients.util.JedisByteHashMap;
import redis.clients.util.SafeEncoder;
public class BuilderFactory {
@@ -104,6 +105,26 @@ public class BuilderFactory {
};
public static final Builder<Map<String, String>> PUBSUB_NUMSUB_MAP = new Builder<Map<String, String>>() {
@SuppressWarnings("unchecked")
public Map<String, String> build(Object data) {
final List<Object> flatHash = (List<Object>) data;
final Map<String, String> hash = new HashMap<String, String>();
final Iterator<Object> iterator = flatHash.iterator();
while (iterator.hasNext()) {
hash.put(SafeEncoder.encode((byte[]) iterator.next()),
String.valueOf((Long) iterator.next()));
}
return hash;
}
public String toString() {
return "PUBSUB_NUMSUB_MAP<String, String>";
}
};
public static final Builder<Set<String>> STRING_SET = new Builder<Set<String>>() {
@SuppressWarnings("unchecked")
public Set<String> build(Object data) {
@@ -170,7 +191,7 @@ public class BuilderFactory {
@SuppressWarnings("unchecked")
public Map<byte[], byte[]> build(Object data) {
final List<byte[]> flatHash = (List<byte[]>) data;
final Map<byte[], byte[]> hash = new HashMap<byte[], byte[]>();
final Map<byte[], byte[]> hash = new JedisByteHashMap();
final Iterator<byte[]> iterator = flatHash.iterator();
while (iterator.hasNext()) {
hash.put(iterator.next(), iterator.next());

View File

@@ -647,6 +647,9 @@ public class Client extends BinaryClient implements Commands {
getbit(SafeEncoder.encode(key), offset);
}
public void bitpos(final String key, final boolean value, final BitPosParams params) {
bitpos(SafeEncoder.encode(key), value, params);
}
public void setrange(String key, long offset, String value) {
setrange(SafeEncoder.encode(key), offset, SafeEncoder.encode(value));
}
@@ -795,7 +798,7 @@ public class Client extends BinaryClient implements Commands {
restore(SafeEncoder.encode(key), ttl, serializedValue);
}
public void pexpire(final String key, final int milliseconds) {
public void pexpire(final String key, final long milliseconds) {
pexpire(SafeEncoder.encode(key), milliseconds);
}
@@ -851,36 +854,6 @@ public class Client extends BinaryClient implements Commands {
increment);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public void hscan(final String key, int cursor, final ScanParams params) {
hscan(SafeEncoder.encode(key), cursor, params);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public void sscan(final String key, int cursor, final ScanParams params) {
sscan(SafeEncoder.encode(key), cursor, params);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public void zscan(final String key, int cursor, final ScanParams params) {
zscan(SafeEncoder.encode(key), cursor, params);
}
public void scan(final String cursor, final ScanParams params) {
scan(SafeEncoder.encode(cursor), params);
}
@@ -969,4 +942,60 @@ public class Client extends BinaryClient implements Commands {
cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot),
Protocol.CLUSTER_SETSLOT_IMPORTING, nodeId);
}
public void pfadd(String key, final String... elements) {
pfadd(SafeEncoder.encode(key), SafeEncoder.encodeMany(elements));
}
public void pfcount(final String key) {
pfcount(SafeEncoder.encode(key));
}
public void pfcount(final String...keys) {
pfcount(SafeEncoder.encodeMany(keys));
}
public void pfmerge(final String destkey, final String... sourcekeys) {
pfmerge(SafeEncoder.encode(destkey), SafeEncoder.encodeMany(sourcekeys));
}
public void clusterSetSlotStable(final int slot) {
cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot),
Protocol.CLUSTER_SETSLOT_STABLE);
}
public void clusterForget(final String nodeId) {
cluster(Protocol.CLUSTER_FORGET, nodeId);
}
public void clusterFlushSlots() {
cluster(Protocol.CLUSTER_FLUSHSLOT);
}
public void clusterKeySlot(final String key) {
cluster(Protocol.CLUSTER_KEYSLOT, key);
}
public void clusterCountKeysInSlot(final int slot) {
cluster(Protocol.CLUSTER_COUNTKEYINSLOT, String.valueOf(slot));
}
public void clusterSaveConfig() {
cluster(Protocol.CLUSTER_SAVECONFIG);
}
public void clusterReplicate(final String nodeId) {
cluster(Protocol.CLUSTER_REPLICATE, nodeId);
}
public void clusterSlaves(final String nodeId) {
cluster(Protocol.CLUSTER_SLAVES, nodeId);
}
public void clusterFailover() {
cluster(Protocol.CLUSTER_FAILOVER);
}
public void clusterSlots() {
cluster(Protocol.CLUSTER_SLOTS);
}
}

View File

@@ -20,4 +20,24 @@ public interface ClusterCommands {
String clusterSetSlotMigrating(final int slot, final String nodeId);
String clusterSetSlotImporting(final int slot, final String nodeId);
String clusterSetSlotStable(final int slot);
String clusterForget(final String nodeId);
String clusterFlushSlots();
Long clusterKeySlot(final String key);
Long clusterCountKeysInSlot(final int slot);
String clusterSaveConfig();
String clusterReplicate(final String nodeId);
List<String> clusterSlaves(final String nodeId);
String clusterFailover();
List<Object> clusterSlots();
}

View File

@@ -61,6 +61,8 @@ public interface Commands {
public void incrBy(final String key, final long integer);
public void incrByFloat(final String key, final double value);
public void incr(final String key);
public void append(final String key, final String value);
@@ -79,6 +81,8 @@ public interface Commands {
public void hincrBy(final String key, final String field, final long value);
public void hincrByFloat(final String key, final String field, final double value);
public void hexists(final String key, final String field);
public void hdel(final String key, final String... fields);
@@ -297,38 +301,6 @@ public interface Commands {
public void bitop(BitOP op, final String destKey, String... srcKeys);
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public void scan(int cursor, final ScanParams params);
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public void hscan(final String key, int cursor, final ScanParams params);
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public void sscan(final String key, int cursor, final ScanParams params);
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public void zscan(final String key, int cursor, final ScanParams params);
public void scan(final String cursor, final ScanParams params);
public void hscan(final String key, final String cursor, final ScanParams params);

View File

@@ -11,7 +11,6 @@ import java.util.List;
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.RedisInputStream;
import redis.clients.util.RedisOutputStream;
import redis.clients.util.SafeEncoder;
@@ -22,9 +21,10 @@ public class Connection implements Closeable {
private Socket socket;
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private int pipelinedCommands = 0;
private int timeout = Protocol.DEFAULT_TIMEOUT;
private boolean broken = false;
public Socket getSocket() {
return socket;
}
@@ -45,7 +45,8 @@ public class Connection implements Closeable {
socket.setKeepAlive(true);
socket.setSoTimeout(0);
} catch (SocketException ex) {
throw new JedisException(ex);
broken = true;
throw new JedisConnectionException(ex);
}
}
@@ -54,7 +55,8 @@ public class Connection implements Closeable {
socket.setSoTimeout(timeout);
socket.setKeepAlive(false);
} catch (SocketException ex) {
throw new JedisException(ex);
broken = true;
throw new JedisConnectionException(ex);
}
}
@@ -63,14 +65,6 @@ public class Connection implements Closeable {
this.host = host;
}
protected void flush() {
try {
outputStream.flush();
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}
protected Connection sendCommand(final Command cmd, final String... args) {
final byte[][] bargs = new byte[args.length][];
for (int i = 0; i < args.length; i++) {
@@ -80,17 +74,27 @@ public class Connection implements Closeable {
}
protected Connection sendCommand(final Command cmd, final byte[]... args) {
connect();
Protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this;
try {
connect();
Protocol.sendCommand(outputStream, cmd, args);
return this;
} catch (JedisConnectionException ex) {
// Any other exceptions related to connection?
broken = true;
throw ex;
}
}
protected Connection sendCommand(final Command cmd) {
connect();
Protocol.sendCommand(outputStream, cmd, new byte[0][]);
pipelinedCommands++;
return this;
try {
connect();
Protocol.sendCommand(outputStream, cmd, new byte[0][]);
return this;
} catch (JedisConnectionException ex) {
// Any other exceptions related to connection?
broken = true;
throw ex;
}
}
public Connection(final String host, final int port) {
@@ -139,6 +143,7 @@ public class Connection implements Closeable {
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex);
}
}
@@ -147,7 +152,7 @@ public class Connection implements Closeable {
@Override
public void close() {
disconnect();
}
}
public void disconnect() {
if (isConnected()) {
@@ -158,6 +163,7 @@ public class Connection implements Closeable {
socket.close();
}
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex);
}
}
@@ -171,8 +177,7 @@ public class Connection implements Closeable {
protected String getStatusCodeReply() {
flush();
pipelinedCommands--;
final byte[] resp = (byte[]) Protocol.read(inputStream);
final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
if (null == resp) {
return null;
} else {
@@ -191,14 +196,12 @@ public class Connection implements Closeable {
public byte[] getBinaryBulkReply() {
flush();
pipelinedCommands--;
return (byte[]) Protocol.read(inputStream);
return (byte[]) readProtocolWithCheckingBroken();
}
public Long getIntegerReply() {
flush();
pipelinedCommands--;
return (Long) Protocol.read(inputStream);
return (Long) readProtocolWithCheckingBroken();
}
public List<String> getMultiBulkReply() {
@@ -208,53 +211,62 @@ public class Connection implements Closeable {
@SuppressWarnings("unchecked")
public List<byte[]> getBinaryMultiBulkReply() {
flush();
pipelinedCommands--;
return (List<byte[]>) Protocol.read(inputStream);
}
public void resetPipelinedCount() {
pipelinedCommands = 0;
return (List<byte[]>) readProtocolWithCheckingBroken();
}
@SuppressWarnings("unchecked")
public List<Object> getRawObjectMultiBulkReply() {
return (List<Object>) Protocol.read(inputStream);
return (List<Object>) readProtocolWithCheckingBroken();
}
public List<Object> getObjectMultiBulkReply() {
flush();
pipelinedCommands--;
return getRawObjectMultiBulkReply();
flush();
return getRawObjectMultiBulkReply();
}
@SuppressWarnings("unchecked")
public List<Long> getIntegerMultiBulkReply() {
flush();
pipelinedCommands--;
return (List<Long>) Protocol.read(inputStream);
}
public List<Object> getAll() {
return getAll(0);
}
public List<Object> getAll(int except) {
List<Object> all = new ArrayList<Object>();
flush();
while (pipelinedCommands > except) {
try {
all.add(Protocol.read(inputStream));
} catch (JedisDataException e) {
all.add(e);
}
pipelinedCommands--;
}
return all;
flush();
return (List<Long>) Protocol.read(inputStream);
}
public Object getOne() {
flush();
pipelinedCommands--;
return Protocol.read(inputStream);
return readProtocolWithCheckingBroken();
}
public boolean isBroken() {
return broken;
}
protected void flush() {
try {
outputStream.flush();
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex);
}
}
protected Object readProtocolWithCheckingBroken() {
try {
return Protocol.read(inputStream);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}
public List<Object> getMany(int count) {
flush();
List<Object> responses = new ArrayList<Object>();
for (int i = 0; i < count; i++) {
try {
responses.add(readProtocolWithCheckingBroken());
} catch (JedisDataException e) {
responses.add(e);
}
}
return responses;
}
}

View File

@@ -1,23 +1,20 @@
package redis.clients.jedis;
import java.net.URI;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import redis.clients.jedis.BinaryClient.LIST_POSITION;
import redis.clients.util.Pool;
import redis.clients.util.SafeEncoder;
import redis.clients.util.Slowlog;
import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
public class Jedis extends BinaryJedis implements JedisCommands,
MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands,
BasicCommands, ClusterCommands {
protected Pool<Jedis> dataSource = null;
public Jedis(final String host) {
super(host);
}
@@ -547,6 +544,31 @@ public class Jedis extends BinaryJedis implements JedisCommands,
return client.getIntegerReply();
}
/**
* INCRBYFLOAT
* <p>
* INCRBYFLOAT commands are limited to double precision floating point values.
* <p>
* Note: this is actually a string operation, that is, in Redis there are
* not "double" types. Simply the string stored at the key is parsed as a
* base double precision floating point value, incremented, and then
* converted back as a string. There is no DECRYBYFLOAT but providing a
* negative value will work as expected.
* <p>
* Time complexity: O(1)
*
* @param key
* @param value
* @return Double reply, this commands will reply with the new value of key
* after the increment.
*/
public Double incrByFloat(final String key, final double value) {
checkIsInMulti();
client.incrByFloat(key, value);
String dval = client.getBulkReply();
return (dval != null ? new Double(dval) : null);
}
/**
* Increment the number stored at key by one. If the key does not exist or
* contains a value of a wrong type, set the key to the value of "0" before
@@ -739,6 +761,32 @@ public class Jedis extends BinaryJedis implements JedisCommands,
return client.getIntegerReply();
}
/**
* Increment the number stored at field in the hash at key by a double
* precision floating point value. If key does not exist,
* a new key holding a hash is created. If field does not
* exist or holds a string, the value is set to 0 before applying the
* operation. Since the value argument is signed you can use this command to
* perform both increments and decrements.
* <p>
* The range of values supported by HINCRBYFLOAT is limited to
* double precision floating point values.
* <p>
* <b>Time complexity:</b> O(1)
*
* @param key
* @param field
* @param value
* @return Double precision floating point reply The new value at field after the increment
* operation.
*/
public Double hincrByFloat(final String key, final String field, final double value) {
checkIsInMulti();
client.hincrByFloat(key, field, value);
final String dval = client.getBulkReply();
return (dval != null ? new Double(dval) : null);
}
/**
* Test for existence of a specified field in a hash.
*
@@ -1068,8 +1116,8 @@ public class Jedis extends BinaryJedis implements JedisCommands,
/**
* Atomically return and remove the first (LPOP) or last (RPOP) element of
* the list. For example if the list contains the elements "a","b","c" LPOP
* will return "a" and the list will become "b","c".
* the list. For example if the list contains the elements "a","b","c" RPOP
* will return "c" and the list will become "a","b".
* <p>
* If the key does not exist or the list is already empty the special value
* 'nil' is returned.
@@ -2688,6 +2736,15 @@ public class Jedis extends BinaryJedis implements JedisCommands,
client.getrange(key, startOffset, endOffset);
return client.getBulkReply();
}
public Long bitpos(final String key, final boolean value) {
return bitpos(key, value, new BitPosParams());
}
public Long bitpos(final String key, final boolean value, final BitPosParams params) {
client.bitpos(key, value, params);
return client.getIntegerReply();
}
/**
* Retrieve the configuration of a running Redis server. Not all the
@@ -3089,7 +3146,7 @@ public class Jedis extends BinaryJedis implements JedisCommands,
return client.getStatusCodeReply();
}
public Long pexpire(final String key, final int milliseconds) {
public Long pexpire(final String key, final long milliseconds) {
checkIsInMulti();
client.pexpire(key, milliseconds);
return client.getIntegerReply();
@@ -3107,12 +3164,6 @@ public class Jedis extends BinaryJedis implements JedisCommands,
return client.getIntegerReply();
}
public Double incrByFloat(final String key, final double increment) {
checkIsInMulti();
client.incrByFloat(key, increment);
String relpy = client.getBulkReply();
return (relpy != null ? new Double(relpy) : null);
}
public String psetex(final String key, final int milliseconds,
final String value) {
@@ -3153,143 +3204,10 @@ public class Jedis extends BinaryJedis implements JedisCommands,
return client.getStatusCodeReply();
}
public Double hincrByFloat(final String key, final String field,
double increment) {
checkIsInMulti();
client.hincrByFloat(key, field, increment);
String relpy = client.getBulkReply();
return (relpy != null ? new Double(relpy) : null);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult<String> scan(int cursor) {
return scan(cursor, new ScanParams());
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult<String> scan(int cursor, final ScanParams params) {
checkIsInMulti();
client.scan(cursor, params);
List<Object> result = client.getObjectMultiBulkReply();
int newcursor = Integer.parseInt(new String((byte[]) result.get(0)));
List<String> results = new ArrayList<String>();
List<byte[]> rawResults = (List<byte[]>) result.get(1);
for (byte[] bs : rawResults) {
results.add(SafeEncoder.encode(bs));
}
return new ScanResult<String>(newcursor, results);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult<Map.Entry<String, String>> hscan(final String key,
int cursor) {
return hscan(key, cursor, new ScanParams());
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult<Map.Entry<String, String>> hscan(final String key,
int cursor, final ScanParams params) {
checkIsInMulti();
client.hscan(key, cursor, params);
List<Object> result = client.getObjectMultiBulkReply();
int newcursor = Integer.parseInt(new String((byte[]) result.get(0)));
List<Map.Entry<String, String>> results = new ArrayList<Map.Entry<String, String>>();
List<byte[]> rawResults = (List<byte[]>) result.get(1);
Iterator<byte[]> iterator = rawResults.iterator();
while (iterator.hasNext()) {
results.add(new AbstractMap.SimpleEntry<String, String>(SafeEncoder
.encode(iterator.next()), SafeEncoder.encode(iterator
.next())));
}
return new ScanResult<Map.Entry<String, String>>(newcursor, results);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult<String> sscan(final String key, int cursor) {
return sscan(key, cursor, new ScanParams());
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult<String> sscan(final String key, int cursor,
final ScanParams params) {
checkIsInMulti();
client.sscan(key, cursor, params);
List<Object> result = client.getObjectMultiBulkReply();
int newcursor = Integer.parseInt(new String((byte[]) result.get(0)));
List<String> results = new ArrayList<String>();
List<byte[]> rawResults = (List<byte[]>) result.get(1);
for (byte[] bs : rawResults) {
results.add(SafeEncoder.encode(bs));
}
return new ScanResult<String>(newcursor, results);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult<Tuple> zscan(final String key, int cursor) {
return zscan(key, cursor, new ScanParams());
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult<Tuple> zscan(final String key, int cursor,
final ScanParams params) {
checkIsInMulti();
client.zscan(key, cursor, params);
List<Object> result = client.getObjectMultiBulkReply();
int newcursor = Integer.parseInt(new String((byte[]) result.get(0)));
List<Tuple> results = new ArrayList<Tuple>();
List<byte[]> rawResults = (List<byte[]>) result.get(1);
Iterator<byte[]> iterator = rawResults.iterator();
while (iterator.hasNext()) {
results.add(new Tuple(SafeEncoder.encode(iterator.next()), Double
.valueOf(SafeEncoder.encode(iterator.next()))));
}
return new ScanResult<Tuple>(newcursor, results);
}
public ScanResult<String> scan(final String cursor) {
return scan(cursor, new ScanParams());
}
public ScanResult<String> scan(final String cursor, final ScanParams params) {
checkIsInMulti();
client.scan(cursor, params);
@@ -3302,12 +3220,12 @@ public class Jedis extends BinaryJedis implements JedisCommands,
}
return new ScanResult<String>(newcursor, results);
}
public ScanResult<Map.Entry<String, String>> hscan(final String key,
final String cursor) {
return hscan(key, cursor, new ScanParams());
}
public ScanResult<Map.Entry<String, String>> hscan(final String key,
final String cursor, final ScanParams params) {
checkIsInMulti();
@@ -3320,15 +3238,15 @@ public class Jedis extends BinaryJedis implements JedisCommands,
while (iterator.hasNext()) {
results.add(new AbstractMap.SimpleEntry<String, String>(SafeEncoder
.encode(iterator.next()), SafeEncoder.encode(iterator
.next())));
.next())));
}
return new ScanResult<Map.Entry<String, String>>(newcursor, results);
}
public ScanResult<String> sscan(final String key, final String cursor) {
return sscan(key, cursor, new ScanParams());
}
public ScanResult<String> sscan(final String key, final String cursor,
final ScanParams params) {
checkIsInMulti();
@@ -3342,11 +3260,11 @@ public class Jedis extends BinaryJedis implements JedisCommands,
}
return new ScanResult<String>(newcursor, results);
}
public ScanResult<Tuple> zscan(final String key, final String cursor) {
return zscan(key, cursor, new ScanParams());
}
public ScanResult<Tuple> zscan(final String key, final String cursor,
final ScanParams params) {
checkIsInMulti();
@@ -3416,6 +3334,67 @@ public class Jedis extends BinaryJedis implements JedisCommands,
client.clusterSetSlotImporting(slot, nodeId);
return client.getStatusCodeReply();
}
public String clusterSetSlotStable(final int slot) {
checkIsInMulti();
client.clusterSetSlotStable(slot);
return client.getStatusCodeReply();
}
public String clusterForget(final String nodeId) {
checkIsInMulti();
client.clusterForget(nodeId);
return client.getStatusCodeReply();
}
public String clusterFlushSlots() {
checkIsInMulti();
client.clusterFlushSlots();
return client.getStatusCodeReply();
}
public Long clusterKeySlot(final String key) {
checkIsInMulti();
client.clusterKeySlot(key);
return client.getIntegerReply();
}
public Long clusterCountKeysInSlot(final int slot) {
checkIsInMulti();
client.clusterCountKeysInSlot(slot);
return client.getIntegerReply();
}
public String clusterSaveConfig() {
checkIsInMulti();
client.clusterSaveConfig();
return client.getStatusCodeReply();
}
public String clusterReplicate(final String nodeId) {
checkIsInMulti();
client.clusterReplicate(nodeId);
return client.getStatusCodeReply();
}
public List<String> clusterSlaves(final String nodeId) {
checkIsInMulti();
client.clusterSlaves(nodeId);
return client.getMultiBulkReply();
}
public String clusterFailover() {
checkIsInMulti();
client.clusterFailover();
return client.getStatusCodeReply();
}
@Override
public List<Object> clusterSlots() {
checkIsInMulti();
client.clusterSlots();
return client.getObjectMultiBulkReply();
}
public String asking() {
checkIsInMulti();
@@ -3438,8 +3417,50 @@ public class Jedis extends BinaryJedis implements JedisCommands,
public Map<String, String> pubsubNumSub(String... channels) {
checkIsInMulti();
client.pubsubNumSub(channels);
return BuilderFactory.STRING_MAP
return BuilderFactory.PUBSUB_NUMSUB_MAP
.build(client.getBinaryMultiBulkReply());
}
@Override
public void close() {
if (dataSource != null) {
if (client.isBroken()) {
this.dataSource.returnBrokenResource(this);
} else {
this.dataSource.returnResource(this);
}
} else {
client.close();
}
}
public void setDataSource(Pool<Jedis> jedisPool) {
this.dataSource = jedisPool;
}
public Long pfadd(final String key, final String... elements) {
checkIsInMulti();
client.pfadd(key, elements);
return client.getIntegerReply();
}
public long pfcount(final String key) {
checkIsInMulti();
client.pfcount(key);
return client.getIntegerReply();
}
@Override
public long pfcount(String... keys) {
checkIsInMulti();
client.pfcount(keys);
return client.getIntegerReply();
}
public String pfmerge(final String destkey, final String... sourcekeys) {
checkIsInMulti();
client.pfmerge(destkey, sourcekeys);
return client.getStatusCodeReply();
}
}

View File

@@ -1,13 +1,14 @@
package redis.clients.jedis;
import redis.clients.jedis.BinaryClient.LIST_POSITION;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import redis.clients.jedis.BinaryClient.LIST_POSITION;
public class JedisCluster implements JedisCommands, BasicCommands {
public class JedisCluster implements JedisCommands, BasicCommands, Closeable {
public static final short HASHSLOTS = 16384;
private static final int DEFAULT_TIMEOUT = 1;
private static final int DEFAULT_MAX_REDIRECTIONS = 5;
@@ -32,6 +33,21 @@ public class JedisCluster implements JedisCommands, BasicCommands {
this.timeout = timeout;
this.maxRedirections = maxRedirections;
}
@Override
public void close() {
if (connectionHandler != null) {
for (JedisPool pool : connectionHandler.getNodes().values()) {
try {
if (pool != null) {
pool.destroy();
}
} catch (Exception e) {
// pass
}
}
}
}
@Override
public String set(final String key, final String value) {
@@ -44,6 +60,18 @@ public class JedisCluster implements JedisCommands, BasicCommands {
}.run(key);
}
@Override
public String set(final String key, final String value, final String nxxx,
final String expx, final long time) {
return new JedisClusterCommand<String>(connectionHandler, timeout,
maxRedirections) {
@Override
public String execute(Jedis connection) {
return connection.set(key, value, nxxx, expx, time);
}
}.run(key);
}
@Override
public String get(final String key) {
return new JedisClusterCommand<String>(connectionHandler, timeout,
@@ -382,7 +410,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.hdel(key);
return connection.hlen(key);
}
}.run(key);
}
@@ -611,6 +639,17 @@ public class JedisCluster implements JedisCommands, BasicCommands {
}.run(key);
}
@Override
public List<String> srandmember(final String key, final int count) {
return new JedisClusterCommand<List<String>>(connectionHandler, timeout,
maxRedirections) {
@Override
public List<String> execute(Jedis connection) {
return connection.srandmember(key, count);
}
}.run(key);
}
@Override
public Long strlen(final String key) {
return new JedisClusterCommand<Long>(connectionHandler, timeout,
@@ -1441,58 +1480,6 @@ public class JedisCluster implements JedisCommands, BasicCommands {
return null;
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
@Override
public ScanResult<Entry<String, String>> hscan(final String key,
final int cursor) {
return new JedisClusterCommand<ScanResult<Entry<String, String>>>(
connectionHandler, timeout, maxRedirections) {
@Override
public ScanResult<Entry<String, String>> execute(Jedis connection) {
return connection.hscan(key, cursor);
}
}.run(null);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
@Override
public ScanResult<String> sscan(final String key, final int cursor) {
return new JedisClusterCommand<ScanResult<String>>(connectionHandler,
timeout, maxRedirections) {
@Override
public ScanResult<String> execute(Jedis connection) {
return connection.sscan(key, cursor);
}
}.run(null);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
@Override
public ScanResult<Tuple> zscan(final String key, final int cursor) {
return new JedisClusterCommand<ScanResult<Tuple>>(connectionHandler,
timeout, maxRedirections) {
@Override
public ScanResult<Tuple> execute(Jedis connection) {
return connection.zscan(key, cursor);
}
}.run(null);
}
@Override
public ScanResult<Entry<String, String>> hscan(final String key,
final String cursor) {
@@ -1527,4 +1514,26 @@ public class JedisCluster implements JedisCommands, BasicCommands {
}.run(null);
}
@Override
public Long pfadd(final String key, final String... elements) {
return new JedisClusterCommand<Long>(connectionHandler,
timeout, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.pfadd(key, elements);
}
}.run(key);
}
@Override
public long pfcount(final String key) {
return new JedisClusterCommand<Long>(connectionHandler,
timeout, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.pfcount(key);
}
}.run(key);
}
}

View File

@@ -4,7 +4,6 @@ import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;

View File

@@ -1,13 +1,13 @@
package redis.clients.jedis;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.*;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.ClusterNodeInformation;
import redis.clients.util.ClusterNodeInformationParser;
public abstract class JedisClusterConnectionHandler {
public static ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser();
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
@@ -65,69 +65,40 @@ public abstract class JedisClusterConnectionHandler {
setNodeIfNotExist(node);
}
}
private void discoverClusterNodesAndSlots(Jedis jedis) {
String localNodes = jedis.clusterNodes();
for (String nodeInfo : localNodes.split("\n")) {
HostAndPort node = getHostAndPortFromNodeLine(nodeInfo, jedis);
setNodeIfNotExist(node);
JedisPool nodePool = nodes.get(getNodeKey(node));
populateNodeSlots(nodeInfo, nodePool);
}
}
private void setNodeIfNotExist(HostAndPort node) {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey))
return;
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
nodes.put(nodeKey, nodePool);
}
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(
nodeInfo, new HostAndPort(jedis.getClient().getHost(),
jedis.getClient().getPort()));
private void populateNodeSlots(String nodeInfo, JedisPool nodePool) {
String[] nodeInfoArray = nodeInfo.split(" ");
if (nodeInfoArray.length > 7) {
for (int i = 8; i < nodeInfoArray.length; i++) {
processSlot(nodeInfoArray[i], nodePool);
}
HostAndPort targetNode = clusterNodeInfo.getNode();
setNodeIfNotExist(targetNode);
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
}
}
private void processSlot(String slot, JedisPool nodePool) {
if (slot.contains("-")) {
String[] slotRange = slot.split("-");
for (int i = Integer.valueOf(slotRange[0]); i <= Integer
.valueOf(slotRange[1]); i++) {
slots.put(i, nodePool);
}
} else {
slots.put(Integer.valueOf(slot), nodePool);
}
}
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo,
Jedis currentConnection) {
String stringHostAndPort = nodeInfo.split(" ", 3)[1];
if (":0".equals(stringHostAndPort)) {
return new HostAndPort(currentConnection.getClient().getHost(),
currentConnection.getClient().getPort());
}
String[] arrayHostAndPort = stringHostAndPort.split(":");
return new HostAndPort(arrayHostAndPort[0],
Integer.valueOf(arrayHostAndPort[1]));
}
public void assignSlotToNode(int slot, HostAndPort targetNode) {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
if (targetPool != null) {
slots.put(slot, targetPool);
} else {
if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}
slots.put(slot, targetPool);
}
public void assignSlotsToNode(List<Integer> targetSlots,
HostAndPort targetNode) {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}
for (Integer slot : targetSlots) {
slots.put(slot, targetPool);
}
}
@@ -144,4 +115,13 @@ public abstract class JedisClusterConnectionHandler {
protected String getNodeKey(Client client) {
return client.getHost() + ":" + client.getPort();
}
private void setNodeIfNotExist(HostAndPort node) {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey))
return;
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
nodes.put(nodeKey, nodePool);
}
}

View File

@@ -10,6 +10,9 @@ import java.util.Set;
public interface JedisCommands {
String set(String key, String value);
String set(String key, String value, String nxxx,
String expx, long time);
String get(String key);
Boolean exists(String key);
@@ -110,6 +113,8 @@ public interface JedisCommands {
String srandmember(String key);
List<String> srandmember(String key, int count);
Long strlen(String key);
Long zadd(String key, double score, String member);
@@ -220,33 +225,14 @@ public interface JedisCommands {
Long bitcount(final String key, long start, long end);
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
ScanResult<Map.Entry<String, String>> hscan(final String key, int cursor);
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
ScanResult<String> sscan(final String key, int cursor);
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
ScanResult<Tuple> zscan(final String key, int cursor);
ScanResult<Map.Entry<String, String>> hscan(final String key, final String cursor);
ScanResult<String> sscan(final String key, final String cursor);
ScanResult<Tuple> zscan(final String key, final String cursor);
Long pfadd(final String key, final String... elements);
long pfcount(final String key);
}

View File

@@ -79,6 +79,13 @@ public class JedisPool extends Pool<Jedis> {
database, clientName));
}
@Override
public Jedis getResource() {
Jedis jedis = super.getResource();
jedis.setDataSource(this);
return jedis;
}
public void returnBrokenResource(final Jedis resource) {
if (resource != null) {
returnBrokenResourceObject(resource);
@@ -91,4 +98,12 @@ public class JedisPool extends Pool<Jedis> {
returnResourceObject(resource);
}
}
public int getNumActive() {
if (this.internalPool == null || this.internalPool.isClosed()) {
return -1;
}
return this.internalPool.getNumActive();
}
}

View File

@@ -162,12 +162,6 @@ public abstract class JedisPubSub {
/* Invalidate instance since this thread is no longer listening */
this.client = null;
/*
* Reset pipeline count because subscribe() calls would have increased
* it but nothing decremented it.
*/
client.resetPipelinedCount();
}
public int getSubscribedChannels() {

View File

@@ -74,19 +74,6 @@ public class JedisSentinelPool extends Pool<Jedis> {
initPool(master);
}
public void returnBrokenResource(final Jedis resource) {
if (resource != null) {
returnBrokenResourceObject(resource);
}
}
public void returnResource(final Jedis resource) {
if (resource != null) {
resource.resetState();
returnResourceObject(resource);
}
}
private volatile HostAndPort currentHostMaster;
public void destroy() {
@@ -175,6 +162,26 @@ public class JedisSentinelPool extends Pool<Jedis> {
return new HostAndPort(host, port);
}
@Override
public Jedis getResource() {
Jedis jedis = super.getResource();
jedis.setDataSource(this);
return jedis;
}
public void returnBrokenResource(final Jedis resource) {
if (resource != null) {
returnBrokenResourceObject(resource);
}
}
public void returnResource(final Jedis resource) {
if (resource != null) {
resource.resetState();
returnResourceObject(resource);
}
}
protected class JedisPubSubAdapter extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {

View File

@@ -69,4 +69,8 @@ public interface MultiKeyBinaryCommands {
byte[] randomBinaryKey();
Long bitop(BitOP op, final byte[] destKey, byte[]... srcKeys);
String pfmerge(final byte[] destkey, final byte[]... sourcekeys);
Long pfcount(byte[]... keys);
}

View File

@@ -65,4 +65,8 @@ public interface MultiKeyBinaryRedisPipeline {
Response<byte[]> randomKeyBinary();
Response<Long> bitop(BitOP op, final byte[] destKey, byte[]... srcKeys);
Response<String> pfmerge(final byte[] destkey, final byte[]... sourcekeys);
Response<Long> pfcount(final byte[] ... keys);
}

View File

@@ -70,13 +70,9 @@ public interface MultiKeyCommands {
Long bitop(BitOP op, final String destKey, String... srcKeys);
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
ScanResult<String> scan(int cursor);
ScanResult<String> scan(final String cursor);
String pfmerge(final String destkey, final String... sourcekeys);
long pfcount(final String...keys);
}

View File

@@ -64,4 +64,8 @@ public interface MultiKeyCommandsPipeline {
Response<String> randomKey();
Response<Long> bitop(BitOP op, final String destKey, String... srcKeys);
Response<String> pfmerge(final String destkey, final String... sourcekeys);
Response<Long> pfcount(final String...keys);
}

View File

@@ -367,6 +367,11 @@ abstract class MultiKeyPipelineBase extends PipelineBase implements
client.info();
return getResponse(BuilderFactory.STRING);
}
public Response<List<String>> time() {
client.time();
return getResponse(BuilderFactory.STRING_LIST);
}
public Response<Long> dbSize() {
client.dbSize();
@@ -446,4 +451,28 @@ abstract class MultiKeyPipelineBase extends PipelineBase implements
client.clusterSetSlotImporting(slot, nodeId);
return getResponse(BuilderFactory.STRING);
}
@Override
public Response<String> pfmerge(byte[] destkey, byte[]... sourcekeys) {
client.pfmerge(destkey, sourcekeys);
return getResponse(BuilderFactory.STRING);
}
@Override
public Response<String> pfmerge(String destkey, String... sourcekeys) {
client.pfmerge(destkey, sourcekeys);
return getResponse(BuilderFactory.STRING);
}
@Override
public Response<Long> pfcount(String...keys) {
client.pfcount(keys);
return getResponse(BuilderFactory.LONG);
}
@Override
public Response<Long> pfcount(final byte[] ... keys) {
client.pfcount(keys);
return getResponse(BuilderFactory.LONG);
}
}

View File

@@ -69,13 +69,25 @@ public class Pipeline extends MultiKeyPipelineBase {
return client;
}
public void clear() {
if (isInMulti()) {
discard();
}
sync();
}
public boolean isInMulti() {
return currentMulti != null;
}
/**
* Syncronize pipeline by reading all responses. This operation close the
* pipeline. In order to get return values from pipelined commands, capture
* the different Response<?> of the commands you execute.
*/
public void sync() {
List<Object> unformatted = client.getAll();
List<Object> unformatted = client.getMany(getPipelinedResponseLength());
for (Object o : unformatted) {
generateResponse(o);
}
@@ -90,7 +102,7 @@ public class Pipeline extends MultiKeyPipelineBase {
* @return A list of all the responses in the order you executed them.
*/
public List<Object> syncAndReturnAll() {
List<Object> unformatted = client.getAll();
List<Object> unformatted = client.getMany(getPipelinedResponseLength());
List<Object> formatted = new ArrayList<Object>();
for (Object o : unformatted) {
@@ -104,12 +116,17 @@ public class Pipeline extends MultiKeyPipelineBase {
}
public Response<String> discard() {
if (currentMulti == null)
throw new JedisDataException("DISCARD without MULTI");
client.discard();
currentMulti = null;
return getResponse(BuilderFactory.STRING);
}
public Response<List<Object>> exec() {
if (currentMulti == null)
throw new JedisDataException("EXEC without MULTI");
client.exec();
Response<List<Object>> response = super.getResponse(currentMulti);
currentMulti.setResponseDependency(response);
@@ -118,11 +135,14 @@ public class Pipeline extends MultiKeyPipelineBase {
}
public Response<String> multi() {
if (currentMulti != null)
throw new JedisDataException("MULTI calls can not be nested");
client.multi();
Response<String> response = getResponse(BuilderFactory.STRING); // Expecting
// OK
currentMulti = new MultiResponseBuilder();
return response;
}
}

View File

@@ -142,7 +142,25 @@ abstract class PipelineBase extends Queable implements BinaryRedisPipeline,
getClient(key).getbit(key, offset);
return getResponse(BuilderFactory.BOOLEAN);
}
public Response<Long> bitpos(final String key, final boolean value) {
return bitpos(key, value, new BitPosParams());
}
public Response<Long> bitpos(final String key, final boolean value, final BitPosParams params) {
getClient(key).bitpos(key, value, params);
return getResponse(BuilderFactory.LONG);
}
public Response<Long> bitpos(final byte[] key, final boolean value) {
return bitpos(key, value, new BitPosParams());
}
public Response<Long> bitpos(final byte[] key, final boolean value, final BitPosParams params) {
getClient(key).bitpos(key, value, params);
return getResponse(BuilderFactory.LONG);
}
public Response<String> getrange(String key, long startOffset,
long endOffset) {
getClient(key).getrange(key, startOffset, endOffset);
@@ -159,9 +177,9 @@ abstract class PipelineBase extends Queable implements BinaryRedisPipeline,
return getResponse(BuilderFactory.BYTE_ARRAY);
}
public Response<Long> getrange(byte[] key, long startOffset, long endOffset) {
public Response<byte[]> getrange(byte[] key, long startOffset, long endOffset) {
getClient(key).getrange(key, startOffset, endOffset);
return getResponse(BuilderFactory.LONG);
return getResponse(BuilderFactory.BYTE_ARRAY);
}
public Response<Long> hdel(String key, String... field) {
@@ -1130,12 +1148,12 @@ abstract class PipelineBase extends Queable implements BinaryRedisPipeline,
return getResponse(BuilderFactory.LONG);
}
public Response<Long> pexpire(String key, int milliseconds) {
public Response<Long> pexpire(String key, long milliseconds) {
getClient(key).pexpire(key, milliseconds);
return getResponse(BuilderFactory.LONG);
}
public Response<Long> pexpire(byte[] key, int milliseconds) {
public Response<Long> pexpire(byte[] key, long milliseconds) {
getClient(key).pexpire(key, milliseconds);
return getResponse(BuilderFactory.LONG);
}
@@ -1254,4 +1272,28 @@ abstract class PipelineBase extends Queable implements BinaryRedisPipeline,
return getResponse(BuilderFactory.STRING);
}
@Override
public Response<Long> pfadd(byte[] key, byte[]... elements) {
getClient(key).pfadd(key, elements);
return getResponse(BuilderFactory.LONG);
}
@Override
public Response<Long> pfcount(byte[] key) {
getClient(key).pfcount(key);
return getResponse(BuilderFactory.LONG);
}
@Override
public Response<Long> pfadd(String key, String... elements) {
getClient(key).pfadd(key, elements);
return getResponse(BuilderFactory.LONG);
}
@Override
public Response<Long> pfcount(String key) {
getClient(key).pfcount(key);
return getResponse(BuilderFactory.LONG);
}
}

View File

@@ -7,5 +7,9 @@ package redis.clients.jedis;
* @see https://github.com/xetorthio/jedis/pull/498
*/
public abstract class PipelineBlock extends Pipeline {
// For shadowing
@SuppressWarnings("unused")
private Client client;
public abstract void execute();
}

View File

@@ -50,6 +50,16 @@ public final class Protocol {
public static final String CLUSTER_SETSLOT_NODE = "node";
public static final String CLUSTER_SETSLOT_MIGRATING = "migrating";
public static final String CLUSTER_SETSLOT_IMPORTING = "importing";
public static final String CLUSTER_SETSLOT_STABLE = "stable";
public static final String CLUSTER_FORGET = "forget";
public static final String CLUSTER_FLUSHSLOT = "flushslots";
public static final String CLUSTER_KEYSLOT = "keyslot";
public static final String CLUSTER_COUNTKEYINSLOT = "countkeysinslot";
public static final String CLUSTER_SAVECONFIG = "saveconfig";
public static final String CLUSTER_REPLICATE = "replicate";
public static final String CLUSTER_SLAVES = "slaves";
public static final String CLUSTER_FAILOVER = "failover";
public static final String CLUSTER_SLOTS = "slots";
public static final String PUBSUB_CHANNELS= "channels";
public static final String PUBSUB_NUMSUB = "numsub";
public static final String PUBSUB_NUM_PAT = "numpat";
@@ -208,7 +218,7 @@ public final class Protocol {
}
public static enum Command {
PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, ZLEXCOUNT, ZRANGEBYLEX, ZREMRANGEBYLEX, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING;
PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, ZLEXCOUNT, ZRANGEBYLEX, ZREMRANGEBYLEX, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, BITPOS, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING, PFADD, PFCOUNT, PFMERGE;
public final byte[] raw;

View File

@@ -24,4 +24,11 @@ public class Queable {
return lr;
}
protected boolean hasPipelinedResponse() {
return pipelinedResponses.size() > 0;
}
protected int getPipelinedResponseLength() {
return pipelinedResponses.size();
}
}

View File

@@ -197,4 +197,8 @@ public interface RedisPipeline {
Response<Long> bitcount(String key);
Response<Long> bitcount(String key, long start, long end);
Response<Long> pfadd(final String key, final String... elements);
Response<Long> pfcount(final String key);
}

View File

@@ -13,15 +13,24 @@ import redis.clients.util.SafeEncoder;
public class ScanParams {
private List<byte[]> params = new ArrayList<byte[]>();
public final static String SCAN_POINTER_START = String.valueOf(0);
public final static byte[] SCAN_POINTER_START_BINARY = SafeEncoder.encode(SCAN_POINTER_START);
public void match(final String pattern) {
public ScanParams match(final byte[] pattern) {
params.add(MATCH.raw);
params.add(pattern);
return this;
}
public ScanParams match(final String pattern) {
params.add(MATCH.raw);
params.add(SafeEncoder.encode(pattern));
return this;
}
public void count(final int count) {
public ScanParams count(final int count) {
params.add(COUNT.raw);
params.add(Protocol.toByteArray(count));
return this;
}
public Collection<byte[]> getParams() {

View File

@@ -2,41 +2,26 @@ package redis.clients.jedis;
import java.util.List;
import redis.clients.util.SafeEncoder;
public class ScanResult<T> {
private String cursor;
private byte[] cursor;
private List<T> results;
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult(int cursor, List<T> results) {
this.cursor = String.valueOf(cursor);
this.results = results;
public ScanResult(String cursor, List<T> results) {
this(SafeEncoder.encode(cursor), results);
}
public ScanResult(String cursor, List<T> results) {
public ScanResult(byte[] cursor, List<T> results) {
this.cursor = cursor;
this.results = results;
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
* @return int(currently), but will be changed to String, so be careful to prepare!
*/
public int getCursor() {
return Integer.parseInt(cursor);
public String getCursor() {
return SafeEncoder.encode(cursor);
}
/**
* FIXME: This method should be changed to getCursor() on next major release
*/
public String getStringCursor() {
public byte[] getCursorAsBytes() {
return cursor;
}

View File

@@ -1,15 +1,22 @@
package redis.clients.jedis;
import java.io.Closeable;
import redis.clients.jedis.BinaryClient.LIST_POSITION;
import redis.clients.util.Hashing;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;
import redis.clients.jedis.BinaryClient.LIST_POSITION;
import redis.clients.util.Hashing;
import redis.clients.util.Pool;
public class ShardedJedis extends BinaryShardedJedis implements JedisCommands,
Closeable {
protected Pool<ShardedJedis> dataSource = null;
public class ShardedJedis extends BinaryShardedJedis implements JedisCommands {
public ShardedJedis(List<JedisShardInfo> shards) {
super(shards);
}
@@ -32,6 +39,13 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands {
return j.set(key, value);
}
@Override
public String set(String key, String value, String nxxx, String expx,
long time) {
Jedis j = getShard(key);
return j.set(key, value, nxxx, expx, time);
}
public String get(String key) {
Jedis j = getShard(key);
return j.get(key);
@@ -132,6 +146,11 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands {
return j.incrBy(key, integer);
}
public Double incrByFloat(String key, double integer) {
Jedis j = getShard(key);
return j.incrByFloat(key, integer);
}
public Long incr(String key) {
Jedis j = getShard(key);
return j.incr(key);
@@ -177,6 +196,11 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands {
return j.hincrBy(key, field, value);
}
public Double hincrByFloat(String key, String field, double value) {
Jedis j = getShard(key);
return j.hincrByFloat(key, field, value);
}
public Boolean hexists(String key, String field) {
Jedis j = getShard(key);
return j.hexists(key, field);
@@ -322,6 +346,12 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands {
return j.srandmember(key);
}
@Override
public List<String> srandmember(String key, int count) {
Jedis j = getShard(key);
return j.srandmember(key, count);
}
public Long zadd(String key, double score, String member) {
Jedis j = getShard(key);
return j.zadd(key, score, member);
@@ -544,52 +574,63 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands {
return j.bitcount(key, start, end);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult<Entry<String, String>> hscan(String key, int cursor) {
Jedis j = getShard(key);
return j.hscan(key, cursor);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult<String> sscan(String key, int cursor) {
Jedis j = getShard(key);
return j.sscan(key, cursor);
}
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
* And will be removed on next major release
* @see https://github.com/xetorthio/jedis/issues/531
*/
public ScanResult<Tuple> zscan(String key, int cursor) {
Jedis j = getShard(key);
return j.zscan(key, cursor);
}
public ScanResult<Entry<String, String>> hscan(String key, final String cursor) {
Jedis j = getShard(key);
return j.hscan(key, cursor);
}
public ScanResult<String> sscan(String key, final String cursor) {
Jedis j = getShard(key);
return j.sscan(key, cursor);
}
public ScanResult<Tuple> zscan(String key, final String cursor) {
Jedis j = getShard(key);
return j.zscan(key, cursor);
}
@Override
public void close() {
if (dataSource != null) {
boolean broken = false;
for (Jedis jedis : getAllShards()) {
if (jedis.getClient().isBroken()) {
broken = true;
}
}
if (broken) {
dataSource.returnBrokenResource(this);
} else {
this.resetState();
dataSource.returnResource(this);
}
} else {
disconnect();
}
}
public void setDataSource(Pool<ShardedJedis> shardedJedisPool) {
this.dataSource = shardedJedisPool;
}
public void resetState() {
for (Jedis jedis : getAllShards()) {
jedis.resetState();
}
}
public Long pfadd(String key, String... elements) {
Jedis j = getShard(key);
return j.pfadd(key, elements);
}
@Override
public long pfcount(String key) {
Jedis j = getShard(key);
return j.pfcount(key);
}
}

View File

@@ -32,6 +32,28 @@ public class ShardedJedisPool extends Pool<ShardedJedis> {
super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));
}
@Override
public ShardedJedis getResource() {
ShardedJedis jedis = super.getResource();
jedis.setDataSource(this);
return jedis;
}
@Override
public void returnBrokenResource(final ShardedJedis resource) {
if (resource != null) {
returnBrokenResourceObject(resource);
}
}
@Override
public void returnResource(final ShardedJedis resource) {
if (resource != null) {
resource.resetState();
returnResourceObject(resource);
}
}
/**
* PoolableObjectFactory custom impl.
*/

View File

@@ -31,9 +31,16 @@ public class Transaction extends MultiKeyPipelineBase {
return client;
}
public void clear() {
if (inTransaction) {
discard();
}
}
public List<Object> exec() {
// Discard QUEUED or ERROR
client.getMany(getPipelinedResponseLength());
client.exec();
client.getAll(1); // Discard all but the last reply
List<Object> unformatted = client.getObjectMultiBulkReply();
if (unformatted == null) {
@@ -51,8 +58,9 @@ public class Transaction extends MultiKeyPipelineBase {
}
public List<Response<?>> execGetResponse() {
// Discard QUEUED or ERROR
client.getMany(getPipelinedResponseLength());
client.exec();
client.getAll(1); // Discard all but the last reply
List<Object> unformatted = client.getObjectMultiBulkReply();
if (unformatted == null) {
@@ -66,11 +74,15 @@ public class Transaction extends MultiKeyPipelineBase {
}
public String discard() {
client.getMany(getPipelinedResponseLength());
client.discard();
client.getAll(1); // Discard all but the last reply
inTransaction = false;
clean();
return client.getStatusCodeReply();
}
public void setClient(Client client) {
this.client = client;
}
}

View File

@@ -9,6 +9,10 @@ import redis.clients.jedis.exceptions.JedisException;
* @see https://github.com/xetorthio/jedis/pull/498
*/
public abstract class TransactionBlock extends Transaction {
// For shadowing
@SuppressWarnings("unused")
private Client client;
public TransactionBlock(Client client) {
super(client);
}
@@ -19,6 +23,6 @@ public abstract class TransactionBlock extends Transaction {
public abstract void execute() throws JedisException;
public void setClient(Client client) {
this.client = client;
super.setClient(client);
}
}

View File

@@ -23,9 +23,15 @@ public class ZParams {
private List<byte[]> params = new ArrayList<byte[]>();
public ZParams weights(final int... weights) {
/**
* Set weights.
*
* @param weights
* weights.
*/
public ZParams weights(final double... weights) {
params.add(WEIGHTS.raw);
for (final int weight : weights) {
for (final double weight : weights) {
params.add(Protocol.toByteArray(weight));
}

View File

@@ -0,0 +1,48 @@
package redis.clients.util;
import redis.clients.jedis.HostAndPort;
import java.util.ArrayList;
import java.util.List;
public class ClusterNodeInformation {
private HostAndPort node;
private List<Integer> availableSlots;
private List<Integer> slotsBeingImported;
private List<Integer> slotsBeingMigrated;
public ClusterNodeInformation(HostAndPort node) {
this.node = node;
this.availableSlots = new ArrayList<Integer>();
this.slotsBeingImported = new ArrayList<Integer>();
this.slotsBeingMigrated = new ArrayList<Integer>();
}
public void addAvailableSlot(int slot) {
availableSlots.add(slot);
}
public void addSlotBeingImported(int slot) {
slotsBeingImported.add(slot);
}
public void addSlotBeingMigrated(int slot) {
slotsBeingMigrated.add(slot);
}
public HostAndPort getNode() {
return node;
}
public List<Integer> getAvailableSlots() {
return availableSlots;
}
public List<Integer> getSlotsBeingImported() {
return slotsBeingImported;
}
public List<Integer> getSlotsBeingMigrated() {
return slotsBeingMigrated;
}
}

View File

@@ -0,0 +1,80 @@
package redis.clients.util;
import redis.clients.jedis.HostAndPort;
public class ClusterNodeInformationParser {
private static final String SLOT_IMPORT_IDENTIFIER = "-<-";
private static final String SLOT_IN_TRANSITION_IDENTIFIER = "[";
public static final int SLOT_INFORMATIONS_START_INDEX = 8;
public static final int HOST_AND_PORT_INDEX = 1;
public ClusterNodeInformation parse(String nodeInfo, HostAndPort current) {
String[] nodeInfoPartArray = nodeInfo.split(" ");
HostAndPort node = getHostAndPortFromNodeLine(nodeInfoPartArray,
current);
ClusterNodeInformation info = new ClusterNodeInformation(node);
if (nodeInfoPartArray.length >= SLOT_INFORMATIONS_START_INDEX) {
String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray);
fillSlotInformation(slotInfoPartArray, info);
}
return info;
}
private String[] extractSlotParts(String[] nodeInfoPartArray) {
String[] slotInfoPartArray = new String[nodeInfoPartArray.length
- SLOT_INFORMATIONS_START_INDEX];
for (int i = SLOT_INFORMATIONS_START_INDEX; i < nodeInfoPartArray.length; i++) {
slotInfoPartArray[i - SLOT_INFORMATIONS_START_INDEX] = nodeInfoPartArray[i];
}
return slotInfoPartArray;
}
public HostAndPort getHostAndPortFromNodeLine(String[] nodeInfoPartArray,
HostAndPort current) {
String stringHostAndPort = nodeInfoPartArray[HOST_AND_PORT_INDEX];
String[] arrayHostAndPort = stringHostAndPort.split(":");
return new HostAndPort(
arrayHostAndPort[0].isEmpty() ? current.getHost()
: arrayHostAndPort[0],
arrayHostAndPort[1].isEmpty() ? current.getPort() : Integer
.valueOf(arrayHostAndPort[1]));
}
private void fillSlotInformation(String[] slotInfoPartArray,
ClusterNodeInformation info) {
for (String slotRange : slotInfoPartArray) {
fillSlotInformationFromSlotRange(slotRange, info);
}
}
private void fillSlotInformationFromSlotRange(String slotRange,
ClusterNodeInformation info) {
if (slotRange.startsWith(SLOT_IN_TRANSITION_IDENTIFIER)) {
// slot is in transition
int slot = Integer.parseInt(slotRange.substring(1).split("-")[0]);
if (slotRange.contains(SLOT_IMPORT_IDENTIFIER)) {
// import
info.addSlotBeingImported(slot);
} else {
// migrate (->-)
info.addSlotBeingMigrated(slot);
}
} else if (slotRange.contains("-")) {
// slot range
String[] slotRangePart = slotRange.split("-");
for (int slot = Integer.valueOf(slotRangePart[0]); slot <= Integer
.valueOf(slotRangePart[1]); slot++) {
info.addAvailableSlot(slot);
}
} else {
// single slot
info.addAvailableSlot(Integer.valueOf(slotRange));
}
}
}

View File

@@ -34,10 +34,10 @@ public final class RedisOutputStream extends FilterOutputStream {
}
public void write(final byte b) throws IOException {
buf[count++] = b;
if (count == buf.length) {
flushBuffer();
}
buf[count++] = b;
}
public void write(final byte[] b) throws IOException {
@@ -63,10 +63,10 @@ public final class RedisOutputStream extends FilterOutputStream {
final int size = in.length();
for (int i = 0; i != size; ++i) {
buf[count++] = (byte) in.charAt(i);
if (count == buf.length) {
flushBuffer();
}
buf[count++] = (byte) in.charAt(i);
}
writeCrLf();
@@ -111,19 +111,19 @@ public final class RedisOutputStream extends FilterOutputStream {
char c = str.charAt(i);
if (!(c < 0x80))
break;
buf[count++] = (byte) c;
if (count == buf.length) {
flushBuffer();
}
buf[count++] = (byte) c;
}
for (; i < strLen; i++) {
char c = str.charAt(i);
if (c < 0x80) {
buf[count++] = (byte) c;
if (count == buf.length) {
flushBuffer();
}
buf[count++] = (byte) c;
} else if (c < 0x800) {
if (2 >= buf.length - count) {
flushBuffer();