Normalized Pipeline[] interfaces and extracted all of the non-shardable commands into the MultiKey* interfaces

This commit is contained in:
samhendley
2012-12-24 10:45:19 -05:00
parent f9e818c92b
commit c0bda88e2c
7 changed files with 343 additions and 45 deletions

View File

@@ -0,0 +1,44 @@
package redis.clients.jedis;
import java.util.List;
/**
* Pipelined responses for all of the low level, non key related commands
*/
public interface BasicRedisPipeline {
Response<String> bgrewriteaof();
Response<String> bgsave();
Response<String> configGet(String pattern);
Response<String> configSet(String parameter, String value);
Response<String> configResetStat();
Response<String> save();
Response<Long> lastsave();
Response<String> discard();
Response<List<Object>> exec();
Response<String> multi();
Response<String> flushDB();
Response<String> flushAll();
Response<String> info();
Response<Long> dbSize();
Response<String> shutdown();
Response<String> ping();
Response<String> select(int index);
}

View File

@@ -10,10 +10,18 @@ import java.util.Set;
public interface BinaryRedisPipeline {
Response<Long> append(byte[] key, byte[] value);
Response<List<byte[]>> blpop(byte[] arg);
Response<List<byte[]>> brpop(byte[] arg);
Response<Long> decr(byte[] key);
Response<Long> decrBy(byte[] key, long integer);
Response<Long> del(byte[] keys);
Response<byte[]> echo(byte[] string);
Response<Boolean> exists(byte[] key);
Response<Long> expire(byte[] key, int seconds);
@@ -32,17 +40,17 @@ public interface BinaryRedisPipeline {
Response<Boolean> hexists(byte[] key, byte[] field);
Response<String> hget(byte[] key, byte[] field);
Response<byte[]> hget(byte[] key, byte[] field);
Response<Map<String, String>> hgetAll(byte[] key);
Response<Map<byte[], byte[]>> hgetAll(byte[] key);
Response<Long> hincrBy(byte[] key, byte[] field, long value);
Response<Set<String>> hkeys(byte[] key);
Response<Set<byte[]>> hkeys(byte[] key);
Response<Long> hlen(byte[] key);
Response<List<String>> hmget(byte[] key, byte[]... fields);
Response<List<byte[]>> hmget(byte[] key, byte[]... fields);
Response<String> hmset(byte[] key, Map<byte[], byte[]> hash);
@@ -50,26 +58,26 @@ public interface BinaryRedisPipeline {
Response<Long> hsetnx(byte[] key, byte[] field, byte[] value);
Response<List<String>> hvals(byte[] key);
Response<List<byte[]>> hvals(byte[] key);
Response<Long> incr(byte[] key);
Response<Long> incrBy(byte[] key, long integer);
Response<String> lindex(byte[] key, int index);
Response<byte[]> lindex(byte[] key, int index);
Response<Long> linsert(byte[] key, BinaryClient.LIST_POSITION where,
byte[] pivot, byte[] value);
Response<Long> llen(byte[] key);
Response<String> lpop(byte[] key);
Response<byte[]> lpop(byte[] key);
Response<Long> lpush(byte[] key, byte[] string);
Response<Long> lpushx(byte[] key, byte[] bytes);
Response<List<String>> lrange(byte[] key, long start, long end);
Response<List<byte[]>> lrange(byte[] key, long start, long end);
Response<Long> lrem(byte[] key, long count, byte[] value);
@@ -79,7 +87,7 @@ public interface BinaryRedisPipeline {
Response<Long> persist(byte[] key);
Response<String> rpop(byte[] key);
Response<byte[]> rpop(byte[] key);
Response<Long> rpush(byte[] key, byte[] string);

View File

@@ -0,0 +1,63 @@
package redis.clients.jedis;
import java.util.List;
import java.util.Set;
/**
* Multikey related commands (these are split out because they are non-shardable)
*/
public interface MultiKeyBinaryRedisPipeline {
Response<Long> del(byte[]... keys);
Response<List<byte[]>> blpop(byte[]... args);
Response<List<byte[]>> brpop(byte[]... args);
Response<Set<byte[]>> keys(byte[] pattern);
Response<List<byte[]>> mget(byte[]... keys);
Response<String> mset(byte[]... keysvalues);
Response<Long> msetnx(byte[]... keysvalues);
Response<byte[]> rename(byte[] oldkey, byte[] newkey);
Response<Long> renamenx(byte[] oldkey, byte[] newkey);
Response<byte[]> rpoplpush(byte[] srckey, byte[] dstkey);
Response<Set<byte[]>> sdiff(byte[]... keys);
Response<Long> sdiffstore(byte[] dstkey, byte[]... keys);
Response<Set<byte[]>> sinter(byte[]... keys);
Response<Long> sinterstore(byte[] dstkey, byte[]... keys);
Response<Long> smove(byte[] srckey, byte[] dstkey, byte[] member);
Response<List<byte[]>> sort(byte[] key, SortingParams sortingParameters, byte[] dstkey);
Response<List<byte[]>> sort(byte[] key, byte[] dstkey);
Response<Set<String>> sunion(byte[]... keys);
Response<Long> sunionstore(byte[] dstkey, byte[]... keys);
Response<String> watch(byte[]... keys);
Response<Long> zinterstore(byte[] dstkey, byte[]... sets);
Response<Long> zinterstore(byte[] dstkey, ZParams params, byte[]... sets);
Response<Long> zunionstore(byte[] dstkey, byte[]... sets);
Response<Long> zunionstore(byte[] dstkey, ZParams params, byte[]... sets);
Response<byte[]> brpoplpush(byte[] source, byte[] destination, int timeout);
Response<Long> publish(byte[] channel, byte[] message);
}

View File

@@ -0,0 +1,65 @@
package redis.clients.jedis;
import java.util.List;
import java.util.Set;
/**
* Multikey related commands (these are split out because they are non-shardable)
*/
public interface MultiKeyCommandsPipeline {
Response<Long> del(String... keys);
Response<List<String>> blpop(String... args);
Response<List<String>> brpop(String... args);
Response<Set<String>> keys(String pattern);
Response<List<String>> mget(String... keys);
Response<String> mset(String... keysvalues);
Response<Long> msetnx(String... keysvalues);
Response<String> rename(String oldkey, String newkey);
Response<Long> renamenx(String oldkey, String newkey);
Response<String> rpoplpush(String srckey, String dstkey);
Response<Set<String>> sdiff(String... keys);
Response<Long> sdiffstore(String dstkey, String... keys);
Response<Set<String>> sinter(String... keys);
Response<Long> sinterstore(String dstkey, String... keys);
Response<Long> smove(String srckey, String dstkey, String member);
Response<List<String>> sort(String key, SortingParams sortingParameters, String dstkey);
Response<List<String>> sort(String key, String dstkey);
Response<Set<String>> sunion(String... keys);
Response<Long> sunionstore(String dstkey, String... keys);
Response<String> watch(String... keys);
Response<Long> zinterstore(String dstkey, String... sets);
Response<Long> zinterstore(String dstkey, ZParams params, String... sets);
Response<Long> zunionstore(String dstkey, String... sets);
Response<Long> zunionstore(String dstkey, ZParams params, String... sets);
Response<String> brpoplpush(String source, String destination, int timeout);
Response<Long> publish(String channel, String message);
Response<String> randomKey();
}

View File

@@ -9,7 +9,12 @@ import java.util.Set;
import redis.clients.jedis.BinaryClient.LIST_POSITION;
import redis.clients.jedis.exceptions.JedisDataException;
public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipeline {
public class Pipeline extends Queable implements
BasicRedisPipeline,
BinaryRedisPipeline,
RedisPipeline,
MultiKeyBinaryRedisPipeline,
MultiKeyCommandsPipeline {
private MultiResponseBuilder currentMulti;
@@ -59,6 +64,8 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel
this.client = client;
}
/**
* Syncronize pipeline by reading all responses. This operation close the
* pipeline. In order to get return values from pipelined commands, capture
@@ -103,6 +110,34 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel
return getResponse(BuilderFactory.LONG);
}
public Response<List<String>> blpop(String arg) {
String[] temp = new String[1];
temp[0] = arg;
client.blpop(temp);
return getResponse(BuilderFactory.STRING_LIST);
}
public Response<List<String>> brpop(String arg) {
String[] temp = new String[1];
temp[0] = arg;
client.brpop(temp);
return getResponse(BuilderFactory.STRING_LIST);
}
public Response<List<byte[]>> blpop(byte[] arg) {
byte[][] temp = new byte[1][];
temp[0] = arg;
client.blpop(temp);
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}
public Response<List<byte[]>> brpop(byte[] arg) {
byte[][] temp = new byte[1][];
temp[0] = arg;
client.brpop(temp);
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}
public Response<List<String>> blpop(String... args) {
client.blpop(args);
return getResponse(BuilderFactory.STRING_LIST);
@@ -143,11 +178,21 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel
return getResponse(BuilderFactory.LONG);
}
public Response<Long> del(String key) {
client.del(key);
return getResponse(BuilderFactory.LONG);
}
public Response<Long> del(String... keys) {
client.del(keys);
return getResponse(BuilderFactory.LONG);
}
public Response<Long> del(byte[] key) {
client.del(key);
return getResponse(BuilderFactory.LONG);
}
public Response<Long> del(byte[]... keys) {
client.del(keys);
return getResponse(BuilderFactory.LONG);
@@ -541,9 +586,9 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel
return getResponse(BuilderFactory.STRING);
}
public Response<String> rename(byte[] oldkey, byte[] newkey) {
public Response<byte[]> rename(byte[] oldkey, byte[] newkey) {
client.rename(oldkey, newkey);
return getResponse(BuilderFactory.STRING);
return getResponse(BuilderFactory.BYTE_ARRAY);
}
public Response<Long> renamenx(String oldkey, String newkey) {
@@ -691,9 +736,9 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel
return getResponse(BuilderFactory.STRING_SET);
}
public Response<Set<String>> sinter(byte[]... keys) {
public Response<Set<byte[]>> sinter(byte[]... keys) {
client.sinter(keys);
return getResponse(BuilderFactory.STRING_SET);
return getResponse(BuilderFactory.BYTE_ARRAY_ZSET);
}
public Response<Long> sinterstore(String dstkey, String... keys) {
@@ -764,10 +809,10 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel
return getResponse(BuilderFactory.STRING_LIST);
}
public Response<List<String>> sort(byte[] key,
public Response<List<byte[]>> sort(byte[] key,
SortingParams sortingParameters, byte[] dstkey) {
client.sort(key, sortingParameters, dstkey);
return getResponse(BuilderFactory.STRING_LIST);
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}
public Response<List<String>> sort(String key, String dstkey) {
@@ -775,9 +820,9 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel
return getResponse(BuilderFactory.STRING_LIST);
}
public Response<List<String>> sort(byte[] key, byte[] dstkey) {
public Response<List<byte[]>> sort(byte[] key, byte[] dstkey) {
client.sort(key, dstkey);
return getResponse(BuilderFactory.STRING_LIST);
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}
public Response<String> spop(String key) {
@@ -1250,10 +1295,10 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel
return getResponse(BuilderFactory.STRING);
}
public Response<String> brpoplpush(byte[] source, byte[] destination,
public Response<byte[]> brpoplpush(byte[] source, byte[] destination,
int timeout) {
client.brpoplpush(source, destination, timeout);
return getResponse(BuilderFactory.STRING);
return getResponse(BuilderFactory.BYTE_ARRAY);
}
public Response<String> configResetStat() {
@@ -1283,10 +1328,11 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel
return response;
}
public void multi() {
public Response<String> multi() {
client.multi();
getResponse(BuilderFactory.STRING); //Expecting OK
Response<String> response = getResponse(BuilderFactory.STRING); //Expecting OK
currentMulti = new MultiResponseBuilder();
return response;
}
public Response<Long> publish(String channel, String message) {
@@ -1299,6 +1345,16 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel
return getResponse(BuilderFactory.LONG);
}
public Response<String> randomKey() {
client.randomKey();
return getResponse(BuilderFactory.STRING);
}
public Response<byte[]> randomKeyBinary() {
client.randomKey();
return getResponse(BuilderFactory.BYTE_ARRAY);
}
public Response<String> flushDB() {
client.flushDB();
return getResponse(BuilderFactory.STRING);
@@ -1329,11 +1385,6 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel
return getResponse(BuilderFactory.STRING);
}
public Response<String> randomKey() {
client.randomKey();
return getResponse(BuilderFactory.STRING);
}
public Response<String> select(int index){
client.select(index);
return getResponse(BuilderFactory.STRING);

View File

@@ -10,10 +10,18 @@ import java.util.Set;
public interface RedisPipeline {
Response<Long> append(String key, String value);
Response<List<String>> blpop(String arg);
Response<List<String>> brpop(String arg);
Response<Long> decr(String key);
Response<Long> decrBy(String key, long integer);
Response<Long> del(String key);
Response<String> echo(String string);
Response<Boolean> exists(String key);
Response<Long> expire(String key, int seconds);
@@ -102,6 +110,8 @@ public interface RedisPipeline {
Response<Long> setnx(String key, String value);
Response<Long> setrange(String key, long offset, String value);
Response<Set<String>> smembers(String key);
Response<Long> sort(String key);

View File

@@ -55,6 +55,63 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline
return getResponse(BuilderFactory.BYTE_ARRAY);
}
public Response<List<byte[]>> blpop(byte[] arg) {
byte[][] temp = new byte[1][];
temp[0] = arg;
Client c = getClient(arg);
c.blpop(temp);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}
public Response<List<byte[]>> brpop(byte[] arg) {
byte[][] temp = new byte[1][];
temp[0] = arg;
Client c = getClient(arg);
c.blpop(temp);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}
public Response<List<String>> blpop(String arg) {
String[] temp = new String[1];
temp[0] = arg;
Client c = getClient(arg);
c.blpop(temp);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING_LIST);
}
public Response<List<String>> brpop(String arg) {
String[] temp = new String[1];
temp[0] = arg;
Client c = getClient(arg);
c.brpop(temp);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING_LIST);
}
public Response<byte[]> echo(byte[] string) {
Client c = getClient(string);
c.echo(string);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.BYTE_ARRAY);
}
public Response<String> echo(String string) {
Client c = getClient(string);
c.echo(string);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING);
}
public Response<Long> del(byte[] key) {
Client c = getClient(key);
c.del(key);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.LONG);
}
public Response<Long> del(String key) {
Client c = getClient(key);
c.del(key);
@@ -279,11 +336,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline
return getResponse(BuilderFactory.STRING);
}
public Response<String> hget(byte[] key, byte[] field) {
public Response<byte[]> hget(byte[] key, byte[] field) {
Client c = getClient(key);
c.hget(key, field);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING);
return getResponse(BuilderFactory.BYTE_ARRAY);
}
public Response<Long> hsetnx(String key, String field, String value) {
@@ -321,11 +378,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline
return getResponse(BuilderFactory.STRING_LIST);
}
public Response<List<String>> hmget(byte[] key, byte[]... fields) {
public Response<List<byte[]>> hmget(byte[] key, byte[]... fields) {
Client c = getClient(key);
c.hmget(key, fields);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING_LIST);
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}
public Response<Long> hincrBy(String key, String field, long value) {
@@ -391,11 +448,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline
return getResponse(BuilderFactory.STRING_SET);
}
public Response<Set<String>> hkeys(byte[] key) {
public Response<Set<byte[]>> hkeys(byte[] key) {
Client c = getClient(key);
c.hkeys(key);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING_SET);
return getResponse(BuilderFactory.BYTE_ARRAY_ZSET);
}
public Response<List<String>> hvals(String key) {
@@ -405,11 +462,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline
return getResponse(BuilderFactory.STRING_LIST);
}
public Response<List<String>> hvals(byte[] key) {
public Response<List<byte[]>> hvals(byte[] key) {
Client c = getClient(key);
c.hvals(key);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING_LIST);
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}
public Response<Map<String, String>> hgetAll(String key) {
@@ -419,11 +476,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline
return getResponse(BuilderFactory.STRING_MAP);
}
public Response<Map<String, String>> hgetAll(byte[] key) {
public Response<Map<byte[], byte[]>> hgetAll(byte[] key) {
Client c = getClient(key);
c.hgetAll(key);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING_MAP);
return getResponse(BuilderFactory.BYTE_ARRAY_MAP);
}
public Response<Long> rpush(String key, String string) {
@@ -503,11 +560,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline
return getResponse(BuilderFactory.STRING_LIST);
}
public Response<List<String>> lrange(byte[] key, long start, long end) {
public Response<List<byte[]>> lrange(byte[] key, long start, long end) {
Client c = getClient(key);
c.lrange(key, start, end);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING_LIST);
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}
public Response<String> ltrim(String key, long start, long end) {
@@ -545,11 +602,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline
return getResponse(BuilderFactory.STRING);
}
public Response<String> lindex(byte[] key, int index) {
public Response<byte[]> lindex(byte[] key, int index) {
Client c = getClient(key);
c.lindex(key, index);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING);
return getResponse(BuilderFactory.BYTE_ARRAY);
}
public Response<String> lset(String key, long index, String value) {
@@ -587,11 +644,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline
return getResponse(BuilderFactory.STRING);
}
public Response<String> lpop(byte[] key) {
public Response<byte[]> lpop(byte[] key) {
Client c = getClient(key);
c.lpop(key);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING);
return getResponse(BuilderFactory.BYTE_ARRAY);
}
public Response<String> rpop(String key) {
@@ -601,11 +658,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline
return getResponse(BuilderFactory.STRING);
}
public Response<String> rpop(byte[] key) {
public Response<byte[]> rpop(byte[] key) {
Client c = getClient(key);
c.rpop(key);
results.add(new FutureResult(c));
return getResponse(BuilderFactory.STRING);
return getResponse(BuilderFactory.BYTE_ARRAY);
}
public Response<Long> sadd(String key, String member) {