Fixed binary safe pipeline commands and added tests for pipelined hash functions
This commit is contained in:
25
src/main/java/redis/clients/jedis/BuilderFactory.java
Normal file → Executable file
25
src/main/java/redis/clients/jedis/BuilderFactory.java
Normal file → Executable file
@@ -144,6 +144,13 @@ public class BuilderFactory {
|
||||
}
|
||||
List<byte[]> l = (List<byte[]>) data;
|
||||
final Set<byte[]> result = new LinkedHashSet<byte[]>(l);
|
||||
for (final byte[] barray : l) {
|
||||
if (barray == null) {
|
||||
result.add(null);
|
||||
} else {
|
||||
result.add(barray);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -151,6 +158,24 @@ public class BuilderFactory {
|
||||
return "ZSet<byte[]>";
|
||||
}
|
||||
};
|
||||
public static final Builder<Map<byte[], byte[]>> BYTE_ARRAY_MAP = new Builder<Map<byte[], byte[]>>() {
|
||||
@SuppressWarnings("unchecked")
|
||||
public Map<byte[], byte[]> build(Object data) {
|
||||
final List<byte[]> flatHash = (List<byte[]>) data;
|
||||
final Map<byte[], byte[]> hash = new HashMap<byte[], byte[]>();
|
||||
final Iterator<byte[]> iterator = flatHash.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
hash.put(iterator.next(), iterator.next());
|
||||
}
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "Map<byte[], byte[]>";
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
public static final Builder<Set<String>> STRING_ZSET = new Builder<Set<String>>() {
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
67
src/main/java/redis/clients/jedis/Pipeline.java
Normal file → Executable file
67
src/main/java/redis/clients/jedis/Pipeline.java
Normal file → Executable file
@@ -78,7 +78,6 @@ public class Pipeline extends Queable {
|
||||
* right response type (usually it is a waste of time).
|
||||
*
|
||||
* @return A list of all the responses in the order you executed them.
|
||||
* @see sync
|
||||
*/
|
||||
public List<Object> syncAndReturnAll() {
|
||||
List<Object> unformatted = client.getAll();
|
||||
@@ -109,9 +108,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
}
|
||||
|
||||
public Response<List<String>> blpop(byte[]... args) {
|
||||
public Response<List<byte[]>> blpop(byte[]... args) {
|
||||
client.blpop(args);
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
|
||||
}
|
||||
|
||||
public Response<List<String>> brpop(String... args) {
|
||||
@@ -119,9 +118,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
}
|
||||
|
||||
public Response<List<String>> brpop(byte[]... args) {
|
||||
public Response<List<byte[]>> brpop(byte[]... args) {
|
||||
client.brpop(args);
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
|
||||
}
|
||||
|
||||
public Response<Long> decr(String key) {
|
||||
@@ -159,9 +158,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
public Response<String> echo(byte[] string) {
|
||||
public Response<byte[]> echo(byte[] string) {
|
||||
client.echo(string);
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY);
|
||||
}
|
||||
|
||||
public Response<Boolean> exists(String key) {
|
||||
@@ -250,9 +249,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
public Response<String> hget(byte[] key, byte[] field) {
|
||||
public Response<byte[]> hget(byte[] key, byte[] field) {
|
||||
client.hget(key, field);
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY);
|
||||
}
|
||||
|
||||
public Response<Map<String, String>> hgetAll(String key) {
|
||||
@@ -260,9 +259,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING_MAP);
|
||||
}
|
||||
|
||||
public Response<Map<String, String>> hgetAll(byte[] key) {
|
||||
public Response<Map<byte[], byte[]>> hgetAll(byte[] key) {
|
||||
client.hgetAll(key);
|
||||
return getResponse(BuilderFactory.STRING_MAP);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY_MAP);
|
||||
}
|
||||
|
||||
public Response<Long> hincrBy(String key, String field, long value) {
|
||||
@@ -280,9 +279,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING_SET);
|
||||
}
|
||||
|
||||
public Response<Set<String>> hkeys(byte[] key) {
|
||||
public Response<Set<byte[]>> hkeys(byte[] key) {
|
||||
client.hkeys(key);
|
||||
return getResponse(BuilderFactory.STRING_SET);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY_ZSET);
|
||||
}
|
||||
|
||||
public Response<Long> hlen(String key) {
|
||||
@@ -300,9 +299,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
}
|
||||
|
||||
public Response<List<String>> hmget(byte[] key, byte[]... fields) {
|
||||
public Response<List<byte[]>> hmget(byte[] key, byte[]... fields) {
|
||||
client.hmget(key, fields);
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
|
||||
}
|
||||
|
||||
public Response<String> hmset(String key, Map<String, String> hash) {
|
||||
@@ -340,9 +339,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
}
|
||||
|
||||
public Response<List<String>> hvals(byte[] key) {
|
||||
public Response<List<byte[]>> hvals(byte[] key) {
|
||||
client.hvals(key);
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
|
||||
}
|
||||
|
||||
public Response<Long> incr(String key) {
|
||||
@@ -370,9 +369,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING_SET);
|
||||
}
|
||||
|
||||
public Response<Set<String>> keys(byte[] pattern) {
|
||||
public Response<Set<byte[]>> keys(byte[] pattern) {
|
||||
client.keys(pattern);
|
||||
return getResponse(BuilderFactory.STRING_SET);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY_ZSET);
|
||||
}
|
||||
|
||||
public Response<String> lindex(String key, int index) {
|
||||
@@ -380,9 +379,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
public Response<String> lindex(byte[] key, int index) {
|
||||
public Response<byte[]> lindex(byte[] key, int index) {
|
||||
client.lindex(key, index);
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY);
|
||||
}
|
||||
|
||||
public Response<Long> linsert(String key, LIST_POSITION where,
|
||||
@@ -412,9 +411,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
public Response<String> lpop(byte[] key) {
|
||||
public Response<byte[]> lpop(byte[] key) {
|
||||
client.lpop(key);
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY);
|
||||
}
|
||||
|
||||
public Response<Long> lpush(String key, String string) {
|
||||
@@ -442,9 +441,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
}
|
||||
|
||||
public Response<List<String>> lrange(byte[] key, long start, long end) {
|
||||
public Response<List<byte[]>> lrange(byte[] key, long start, long end) {
|
||||
client.lrange(key, start, end);
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
|
||||
}
|
||||
|
||||
public Response<Long> lrem(String key, long count, String value) {
|
||||
@@ -482,9 +481,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
}
|
||||
|
||||
public Response<List<String>> mget(byte[]... keys) {
|
||||
public Response<List<byte[]>> mget(byte[]... keys) {
|
||||
client.mget(keys);
|
||||
return getResponse(BuilderFactory.STRING_LIST);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
|
||||
}
|
||||
|
||||
public Response<Long> move(String key, int dbIndex) {
|
||||
@@ -552,9 +551,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
public Response<String> rpop(byte[] key) {
|
||||
public Response<byte[]> rpop(byte[] key) {
|
||||
client.rpop(key);
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY);
|
||||
}
|
||||
|
||||
public Response<String> rpoplpush(String srckey, String dstkey) {
|
||||
@@ -562,9 +561,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
|
||||
public Response<String> rpoplpush(byte[] srckey, byte[] dstkey) {
|
||||
public Response<byte[]> rpoplpush(byte[] srckey, byte[] dstkey) {
|
||||
client.rpoplpush(srckey, dstkey);
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY);
|
||||
}
|
||||
|
||||
public Response<Long> rpush(String key, String string) {
|
||||
@@ -612,9 +611,9 @@ public class Pipeline extends Queable {
|
||||
return getResponse(BuilderFactory.STRING_SET);
|
||||
}
|
||||
|
||||
public Response<Set<String>> sdiff(byte[]... keys) {
|
||||
public Response<Set<byte[]>> sdiff(byte[]... keys) {
|
||||
client.sdiff(keys);
|
||||
return getResponse(BuilderFactory.STRING_SET);
|
||||
return getResponse(BuilderFactory.BYTE_ARRAY_ZSET);
|
||||
}
|
||||
|
||||
public Response<Long> sdiffstore(String dstkey, String... keys) {
|
||||
@@ -1319,4 +1318,4 @@ public class Pipeline extends Queable {
|
||||
client.select(index);
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
59
src/test/java/redis/clients/jedis/tests/PipeliningTest.java
Normal file → Executable file
59
src/test/java/redis/clients/jedis/tests/PipeliningTest.java
Normal file → Executable file
@@ -8,10 +8,7 @@ import redis.clients.jedis.exceptions.JedisDataException;
|
||||
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.*;
|
||||
|
||||
public class PipeliningTest extends Assert {
|
||||
private static HostAndPort hnp = HostAndPortUtil.getRedisServers().get(0);
|
||||
@@ -100,7 +97,59 @@ public class PipeliningTest extends Assert {
|
||||
|
||||
assertNotNull(score.get());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void pipelineBinarySafeHashCommands() {
|
||||
jedis.hset("key".getBytes(), "f1".getBytes(), "v111".getBytes());
|
||||
jedis.hset("key".getBytes(), "f22".getBytes(), "v2222".getBytes());
|
||||
|
||||
Pipeline p = jedis.pipelined();
|
||||
Response<Map<byte[],byte[]>> fmap = p.hgetAll("key".getBytes());
|
||||
Response<Set<byte[]>> fkeys = p.hkeys("key".getBytes());
|
||||
Response<List<byte[]>> fordered = p.hmget("key".getBytes(), "f22".getBytes(), "f1".getBytes());
|
||||
Response<List<byte[]>> fvals = p.hvals("key".getBytes());
|
||||
p.sync();
|
||||
|
||||
assertNotNull(fmap.get());
|
||||
// we have to do these strange contortions because byte[] is not a very good key
|
||||
// for a java Map. It only works with equality (you need the exact key object to retrieve
|
||||
// the value) I recommend we switch to using ByteBuffer or something similar:
|
||||
// http://stackoverflow.com/questions/1058149/using-a-byte-array-as-hashmap-key-java
|
||||
Map<byte[],byte[]> map = fmap.get();
|
||||
Set<byte[]> mapKeys = map.keySet();
|
||||
Iterator<byte[]> iterMap = mapKeys.iterator();
|
||||
byte[] firstMapKey = iterMap.next();
|
||||
byte[] secondMapKey = iterMap.next();
|
||||
assertFalse(iterMap.hasNext());
|
||||
verifyHasBothValues(firstMapKey, secondMapKey, "f1".getBytes(), "f22".getBytes());
|
||||
byte[] firstMapValue = map.get(firstMapKey);
|
||||
byte[] secondMapValue = map.get(secondMapKey);
|
||||
verifyHasBothValues(firstMapValue, secondMapValue, "v111".getBytes(), "v2222".getBytes());
|
||||
|
||||
assertNotNull(fkeys.get());
|
||||
Iterator<byte[]> iter = fkeys.get().iterator();
|
||||
byte[] firstKey = iter.next();
|
||||
byte[] secondKey = iter.next();
|
||||
assertFalse(iter.hasNext());
|
||||
verifyHasBothValues(firstKey, secondKey, "f1".getBytes(), "f22".getBytes());
|
||||
|
||||
assertNotNull(fordered.get());
|
||||
assertArrayEquals("v2222".getBytes(), fordered.get().get(0));
|
||||
assertArrayEquals("v111".getBytes(), fordered.get().get(1));
|
||||
|
||||
assertNotNull(fvals.get());
|
||||
assertEquals(2, fvals.get().size());
|
||||
byte[] firstValue = fvals.get().get(0);
|
||||
byte[] secondValue = fvals.get().get(1);
|
||||
verifyHasBothValues(firstValue, secondValue, "v111".getBytes(), "v2222".getBytes());
|
||||
}
|
||||
|
||||
private void verifyHasBothValues(byte[] firstKey, byte[] secondKey, byte[] value1, byte[] value2) {
|
||||
assertFalse(Arrays.equals(firstKey, secondKey));
|
||||
assertTrue(Arrays.equals(firstKey, value1) || Arrays.equals(firstKey, value2));
|
||||
assertTrue(Arrays.equals(secondKey, value1) || Arrays.equals(secondKey, value2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pipelineSelect() {
|
||||
Pipeline p = jedis.pipelined();
|
||||
|
||||
Reference in New Issue
Block a user