From c0bda88e2c1637cc9d0a5ccfef8dbe7effe7f503 Mon Sep 17 00:00:00 2001 From: samhendley Date: Mon, 24 Dec 2012 10:45:19 -0500 Subject: [PATCH] Normalized Pipeline[] interfaces and extracted all of the non-shardable commands into the MultiKey* interfaces --- .../clients/jedis/BasicRedisPipeline.java | 44 +++++++++ .../clients/jedis/BinaryRedisPipeline.java | 26 ++++-- .../jedis/MultiKeyBinaryRedisPipeline.java | 63 +++++++++++++ .../jedis/MultiKeyCommandsPipeline.java | 65 +++++++++++++ .../java/redis/clients/jedis/Pipeline.java | 87 +++++++++++++---- .../redis/clients/jedis/RedisPipeline.java | 10 ++ .../clients/jedis/ShardedJedisPipeline.java | 93 +++++++++++++++---- 7 files changed, 343 insertions(+), 45 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/BasicRedisPipeline.java create mode 100644 src/main/java/redis/clients/jedis/MultiKeyBinaryRedisPipeline.java create mode 100644 src/main/java/redis/clients/jedis/MultiKeyCommandsPipeline.java diff --git a/src/main/java/redis/clients/jedis/BasicRedisPipeline.java b/src/main/java/redis/clients/jedis/BasicRedisPipeline.java new file mode 100644 index 0000000..0b4ac36 --- /dev/null +++ b/src/main/java/redis/clients/jedis/BasicRedisPipeline.java @@ -0,0 +1,44 @@ +package redis.clients.jedis; + + +import java.util.List; + +/** + * Pipelined responses for all of the low level, non key related commands + */ +public interface BasicRedisPipeline { + + Response bgrewriteaof(); + + Response bgsave(); + + Response configGet(String pattern); + + Response configSet(String parameter, String value); + + Response configResetStat(); + + Response save(); + + Response lastsave(); + + Response discard(); + + Response> exec(); + + Response multi(); + + Response flushDB(); + + Response flushAll(); + + Response info(); + + Response dbSize(); + + Response shutdown(); + + Response ping(); + + Response select(int index); +} diff --git a/src/main/java/redis/clients/jedis/BinaryRedisPipeline.java b/src/main/java/redis/clients/jedis/BinaryRedisPipeline.java index 24b6089..959f915 100644 --- a/src/main/java/redis/clients/jedis/BinaryRedisPipeline.java +++ b/src/main/java/redis/clients/jedis/BinaryRedisPipeline.java @@ -10,10 +10,18 @@ import java.util.Set; public interface BinaryRedisPipeline { Response append(byte[] key, byte[] value); + Response> blpop(byte[] arg); + + Response> brpop(byte[] arg); + Response decr(byte[] key); Response decrBy(byte[] key, long integer); + Response del(byte[] keys); + + Response echo(byte[] string); + Response exists(byte[] key); Response expire(byte[] key, int seconds); @@ -32,17 +40,17 @@ public interface BinaryRedisPipeline { Response hexists(byte[] key, byte[] field); - Response hget(byte[] key, byte[] field); + Response hget(byte[] key, byte[] field); - Response> hgetAll(byte[] key); + Response> hgetAll(byte[] key); Response hincrBy(byte[] key, byte[] field, long value); - Response> hkeys(byte[] key); + Response> hkeys(byte[] key); Response hlen(byte[] key); - Response> hmget(byte[] key, byte[]... fields); + Response> hmget(byte[] key, byte[]... fields); Response hmset(byte[] key, Map hash); @@ -50,26 +58,26 @@ public interface BinaryRedisPipeline { Response hsetnx(byte[] key, byte[] field, byte[] value); - Response> hvals(byte[] key); + Response> hvals(byte[] key); Response incr(byte[] key); Response incrBy(byte[] key, long integer); - Response lindex(byte[] key, int index); + Response lindex(byte[] key, int index); Response linsert(byte[] key, BinaryClient.LIST_POSITION where, byte[] pivot, byte[] value); Response llen(byte[] key); - Response lpop(byte[] key); + Response lpop(byte[] key); Response lpush(byte[] key, byte[] string); Response lpushx(byte[] key, byte[] bytes); - Response> lrange(byte[] key, long start, long end); + Response> lrange(byte[] key, long start, long end); Response lrem(byte[] key, long count, byte[] value); @@ -79,7 +87,7 @@ public interface BinaryRedisPipeline { Response persist(byte[] key); - Response rpop(byte[] key); + Response rpop(byte[] key); Response rpush(byte[] key, byte[] string); diff --git a/src/main/java/redis/clients/jedis/MultiKeyBinaryRedisPipeline.java b/src/main/java/redis/clients/jedis/MultiKeyBinaryRedisPipeline.java new file mode 100644 index 0000000..12e2368 --- /dev/null +++ b/src/main/java/redis/clients/jedis/MultiKeyBinaryRedisPipeline.java @@ -0,0 +1,63 @@ +package redis.clients.jedis; + + +import java.util.List; +import java.util.Set; + +/** + * Multikey related commands (these are split out because they are non-shardable) + */ +public interface MultiKeyBinaryRedisPipeline { + + Response del(byte[]... keys); + + Response> blpop(byte[]... args); + + Response> brpop(byte[]... args); + + Response> keys(byte[] pattern); + + Response> mget(byte[]... keys); + + Response mset(byte[]... keysvalues); + + Response msetnx(byte[]... keysvalues); + + Response rename(byte[] oldkey, byte[] newkey); + + Response renamenx(byte[] oldkey, byte[] newkey); + + Response rpoplpush(byte[] srckey, byte[] dstkey); + + Response> sdiff(byte[]... keys); + + Response sdiffstore(byte[] dstkey, byte[]... keys); + + Response> sinter(byte[]... keys); + + Response sinterstore(byte[] dstkey, byte[]... keys); + + Response smove(byte[] srckey, byte[] dstkey, byte[] member); + + Response> sort(byte[] key, SortingParams sortingParameters, byte[] dstkey); + + Response> sort(byte[] key, byte[] dstkey); + + Response> sunion(byte[]... keys); + + Response sunionstore(byte[] dstkey, byte[]... keys); + + Response watch(byte[]... keys); + + Response zinterstore(byte[] dstkey, byte[]... sets); + + Response zinterstore(byte[] dstkey, ZParams params, byte[]... sets); + + Response zunionstore(byte[] dstkey, byte[]... sets); + + Response zunionstore(byte[] dstkey, ZParams params, byte[]... sets); + + Response brpoplpush(byte[] source, byte[] destination, int timeout); + + Response publish(byte[] channel, byte[] message); +} diff --git a/src/main/java/redis/clients/jedis/MultiKeyCommandsPipeline.java b/src/main/java/redis/clients/jedis/MultiKeyCommandsPipeline.java new file mode 100644 index 0000000..9960537 --- /dev/null +++ b/src/main/java/redis/clients/jedis/MultiKeyCommandsPipeline.java @@ -0,0 +1,65 @@ +package redis.clients.jedis; + + +import java.util.List; +import java.util.Set; + + +/** + * Multikey related commands (these are split out because they are non-shardable) + */ +public interface MultiKeyCommandsPipeline { + Response del(String... keys); + + Response> blpop(String... args); + + Response> brpop(String... args); + + Response> keys(String pattern); + + Response> mget(String... keys); + + Response mset(String... keysvalues); + + Response msetnx(String... keysvalues); + + Response rename(String oldkey, String newkey); + + Response renamenx(String oldkey, String newkey); + + Response rpoplpush(String srckey, String dstkey); + + Response> sdiff(String... keys); + + Response sdiffstore(String dstkey, String... keys); + + Response> sinter(String... keys); + + Response sinterstore(String dstkey, String... keys); + + Response smove(String srckey, String dstkey, String member); + + Response> sort(String key, SortingParams sortingParameters, String dstkey); + + Response> sort(String key, String dstkey); + + Response> sunion(String... keys); + + Response sunionstore(String dstkey, String... keys); + + Response watch(String... keys); + + Response zinterstore(String dstkey, String... sets); + + Response zinterstore(String dstkey, ZParams params, String... sets); + + Response zunionstore(String dstkey, String... sets); + + Response zunionstore(String dstkey, ZParams params, String... sets); + + Response brpoplpush(String source, String destination, int timeout); + + Response publish(String channel, String message); + + Response randomKey(); +} diff --git a/src/main/java/redis/clients/jedis/Pipeline.java b/src/main/java/redis/clients/jedis/Pipeline.java index f93b95a..ae8518c 100755 --- a/src/main/java/redis/clients/jedis/Pipeline.java +++ b/src/main/java/redis/clients/jedis/Pipeline.java @@ -9,7 +9,12 @@ import java.util.Set; import redis.clients.jedis.BinaryClient.LIST_POSITION; import redis.clients.jedis.exceptions.JedisDataException; -public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipeline { +public class Pipeline extends Queable implements + BasicRedisPipeline, + BinaryRedisPipeline, + RedisPipeline, + MultiKeyBinaryRedisPipeline, + MultiKeyCommandsPipeline { private MultiResponseBuilder currentMulti; @@ -59,6 +64,8 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel this.client = client; } + + /** * Syncronize pipeline by reading all responses. This operation close the * pipeline. In order to get return values from pipelined commands, capture @@ -103,6 +110,34 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel return getResponse(BuilderFactory.LONG); } + public Response> blpop(String arg) { + String[] temp = new String[1]; + temp[0] = arg; + client.blpop(temp); + return getResponse(BuilderFactory.STRING_LIST); + } + + public Response> brpop(String arg) { + String[] temp = new String[1]; + temp[0] = arg; + client.brpop(temp); + return getResponse(BuilderFactory.STRING_LIST); + } + + public Response> blpop(byte[] arg) { + byte[][] temp = new byte[1][]; + temp[0] = arg; + client.blpop(temp); + return getResponse(BuilderFactory.BYTE_ARRAY_LIST); + } + + public Response> brpop(byte[] arg) { + byte[][] temp = new byte[1][]; + temp[0] = arg; + client.brpop(temp); + return getResponse(BuilderFactory.BYTE_ARRAY_LIST); + } + public Response> blpop(String... args) { client.blpop(args); return getResponse(BuilderFactory.STRING_LIST); @@ -143,11 +178,21 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel return getResponse(BuilderFactory.LONG); } + public Response del(String key) { + client.del(key); + return getResponse(BuilderFactory.LONG); + } + public Response del(String... keys) { client.del(keys); return getResponse(BuilderFactory.LONG); } + public Response del(byte[] key) { + client.del(key); + return getResponse(BuilderFactory.LONG); + } + public Response del(byte[]... keys) { client.del(keys); return getResponse(BuilderFactory.LONG); @@ -541,9 +586,9 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel return getResponse(BuilderFactory.STRING); } - public Response rename(byte[] oldkey, byte[] newkey) { + public Response rename(byte[] oldkey, byte[] newkey) { client.rename(oldkey, newkey); - return getResponse(BuilderFactory.STRING); + return getResponse(BuilderFactory.BYTE_ARRAY); } public Response renamenx(String oldkey, String newkey) { @@ -691,9 +736,9 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel return getResponse(BuilderFactory.STRING_SET); } - public Response> sinter(byte[]... keys) { + public Response> sinter(byte[]... keys) { client.sinter(keys); - return getResponse(BuilderFactory.STRING_SET); + return getResponse(BuilderFactory.BYTE_ARRAY_ZSET); } public Response sinterstore(String dstkey, String... keys) { @@ -764,10 +809,10 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel return getResponse(BuilderFactory.STRING_LIST); } - public Response> sort(byte[] key, + public Response> sort(byte[] key, SortingParams sortingParameters, byte[] dstkey) { client.sort(key, sortingParameters, dstkey); - return getResponse(BuilderFactory.STRING_LIST); + return getResponse(BuilderFactory.BYTE_ARRAY_LIST); } public Response> sort(String key, String dstkey) { @@ -775,9 +820,9 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel return getResponse(BuilderFactory.STRING_LIST); } - public Response> sort(byte[] key, byte[] dstkey) { + public Response> sort(byte[] key, byte[] dstkey) { client.sort(key, dstkey); - return getResponse(BuilderFactory.STRING_LIST); + return getResponse(BuilderFactory.BYTE_ARRAY_LIST); } public Response spop(String key) { @@ -1250,10 +1295,10 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel return getResponse(BuilderFactory.STRING); } - public Response brpoplpush(byte[] source, byte[] destination, + public Response brpoplpush(byte[] source, byte[] destination, int timeout) { client.brpoplpush(source, destination, timeout); - return getResponse(BuilderFactory.STRING); + return getResponse(BuilderFactory.BYTE_ARRAY); } public Response configResetStat() { @@ -1283,10 +1328,11 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel return response; } - public void multi() { + public Response multi() { client.multi(); - getResponse(BuilderFactory.STRING); //Expecting OK + Response response = getResponse(BuilderFactory.STRING); //Expecting OK currentMulti = new MultiResponseBuilder(); + return response; } public Response publish(String channel, String message) { @@ -1299,6 +1345,16 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel return getResponse(BuilderFactory.LONG); } + public Response randomKey() { + client.randomKey(); + return getResponse(BuilderFactory.STRING); + } + + public Response randomKeyBinary() { + client.randomKey(); + return getResponse(BuilderFactory.BYTE_ARRAY); + } + public Response flushDB() { client.flushDB(); return getResponse(BuilderFactory.STRING); @@ -1329,11 +1385,6 @@ public class Pipeline extends Queable implements BinaryRedisPipeline, RedisPipel return getResponse(BuilderFactory.STRING); } - public Response randomKey() { - client.randomKey(); - return getResponse(BuilderFactory.STRING); - } - public Response select(int index){ client.select(index); return getResponse(BuilderFactory.STRING); diff --git a/src/main/java/redis/clients/jedis/RedisPipeline.java b/src/main/java/redis/clients/jedis/RedisPipeline.java index ca3370e..570e620 100644 --- a/src/main/java/redis/clients/jedis/RedisPipeline.java +++ b/src/main/java/redis/clients/jedis/RedisPipeline.java @@ -10,10 +10,18 @@ import java.util.Set; public interface RedisPipeline { Response append(String key, String value); + Response> blpop(String arg); + + Response> brpop(String arg); + Response decr(String key); Response decrBy(String key, long integer); + Response del(String key); + + Response echo(String string); + Response exists(String key); Response expire(String key, int seconds); @@ -102,6 +110,8 @@ public interface RedisPipeline { Response setnx(String key, String value); + Response setrange(String key, long offset, String value); + Response> smembers(String key); Response sort(String key); diff --git a/src/main/java/redis/clients/jedis/ShardedJedisPipeline.java b/src/main/java/redis/clients/jedis/ShardedJedisPipeline.java index 48371ac..771cdc9 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedisPipeline.java +++ b/src/main/java/redis/clients/jedis/ShardedJedisPipeline.java @@ -55,6 +55,63 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline return getResponse(BuilderFactory.BYTE_ARRAY); } + public Response> blpop(byte[] arg) { + byte[][] temp = new byte[1][]; + temp[0] = arg; + Client c = getClient(arg); + c.blpop(temp); + results.add(new FutureResult(c)); + return getResponse(BuilderFactory.BYTE_ARRAY_LIST); + } + + public Response> brpop(byte[] arg) { + byte[][] temp = new byte[1][]; + temp[0] = arg; + Client c = getClient(arg); + c.blpop(temp); + results.add(new FutureResult(c)); + return getResponse(BuilderFactory.BYTE_ARRAY_LIST); + } + + public Response> blpop(String arg) { + String[] temp = new String[1]; + temp[0] = arg; + Client c = getClient(arg); + c.blpop(temp); + results.add(new FutureResult(c)); + return getResponse(BuilderFactory.STRING_LIST); + } + + public Response> brpop(String arg) { + String[] temp = new String[1]; + temp[0] = arg; + Client c = getClient(arg); + c.brpop(temp); + results.add(new FutureResult(c)); + return getResponse(BuilderFactory.STRING_LIST); + } + + public Response echo(byte[] string) { + Client c = getClient(string); + c.echo(string); + results.add(new FutureResult(c)); + return getResponse(BuilderFactory.BYTE_ARRAY); + } + + public Response echo(String string) { + Client c = getClient(string); + c.echo(string); + results.add(new FutureResult(c)); + return getResponse(BuilderFactory.STRING); + } + + public Response del(byte[] key) { + Client c = getClient(key); + c.del(key); + results.add(new FutureResult(c)); + return getResponse(BuilderFactory.LONG); + } + public Response del(String key) { Client c = getClient(key); c.del(key); @@ -279,11 +336,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline return getResponse(BuilderFactory.STRING); } - public Response hget(byte[] key, byte[] field) { + public Response hget(byte[] key, byte[] field) { Client c = getClient(key); c.hget(key, field); results.add(new FutureResult(c)); - return getResponse(BuilderFactory.STRING); + return getResponse(BuilderFactory.BYTE_ARRAY); } public Response hsetnx(String key, String field, String value) { @@ -321,11 +378,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline return getResponse(BuilderFactory.STRING_LIST); } - public Response> hmget(byte[] key, byte[]... fields) { + public Response> hmget(byte[] key, byte[]... fields) { Client c = getClient(key); c.hmget(key, fields); results.add(new FutureResult(c)); - return getResponse(BuilderFactory.STRING_LIST); + return getResponse(BuilderFactory.BYTE_ARRAY_LIST); } public Response hincrBy(String key, String field, long value) { @@ -391,11 +448,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline return getResponse(BuilderFactory.STRING_SET); } - public Response> hkeys(byte[] key) { + public Response> hkeys(byte[] key) { Client c = getClient(key); c.hkeys(key); results.add(new FutureResult(c)); - return getResponse(BuilderFactory.STRING_SET); + return getResponse(BuilderFactory.BYTE_ARRAY_ZSET); } public Response> hvals(String key) { @@ -405,11 +462,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline return getResponse(BuilderFactory.STRING_LIST); } - public Response> hvals(byte[] key) { + public Response> hvals(byte[] key) { Client c = getClient(key); c.hvals(key); results.add(new FutureResult(c)); - return getResponse(BuilderFactory.STRING_LIST); + return getResponse(BuilderFactory.BYTE_ARRAY_LIST); } public Response> hgetAll(String key) { @@ -419,11 +476,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline return getResponse(BuilderFactory.STRING_MAP); } - public Response> hgetAll(byte[] key) { + public Response> hgetAll(byte[] key) { Client c = getClient(key); c.hgetAll(key); results.add(new FutureResult(c)); - return getResponse(BuilderFactory.STRING_MAP); + return getResponse(BuilderFactory.BYTE_ARRAY_MAP); } public Response rpush(String key, String string) { @@ -503,11 +560,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline return getResponse(BuilderFactory.STRING_LIST); } - public Response> lrange(byte[] key, long start, long end) { + public Response> lrange(byte[] key, long start, long end) { Client c = getClient(key); c.lrange(key, start, end); results.add(new FutureResult(c)); - return getResponse(BuilderFactory.STRING_LIST); + return getResponse(BuilderFactory.BYTE_ARRAY_LIST); } public Response ltrim(String key, long start, long end) { @@ -545,11 +602,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline return getResponse(BuilderFactory.STRING); } - public Response lindex(byte[] key, int index) { + public Response lindex(byte[] key, int index) { Client c = getClient(key); c.lindex(key, index); results.add(new FutureResult(c)); - return getResponse(BuilderFactory.STRING); + return getResponse(BuilderFactory.BYTE_ARRAY); } public Response lset(String key, long index, String value) { @@ -587,11 +644,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline return getResponse(BuilderFactory.STRING); } - public Response lpop(byte[] key) { + public Response lpop(byte[] key) { Client c = getClient(key); c.lpop(key); results.add(new FutureResult(c)); - return getResponse(BuilderFactory.STRING); + return getResponse(BuilderFactory.BYTE_ARRAY); } public Response rpop(String key) { @@ -601,11 +658,11 @@ public class ShardedJedisPipeline extends Queable implements BinaryRedisPipeline return getResponse(BuilderFactory.STRING); } - public Response rpop(byte[] key) { + public Response rpop(byte[] key) { Client c = getClient(key); c.rpop(key); results.add(new FutureResult(c)); - return getResponse(BuilderFactory.STRING); + return getResponse(BuilderFactory.BYTE_ARRAY); } public Response sadd(String key, String member) {