From 8bec9fd373e91297ef0f1f8fa7fd41ae707542a2 Mon Sep 17 00:00:00 2001 From: Marcos Nils Date: Sat, 1 Feb 2014 20:13:56 -0300 Subject: [PATCH] Implement missing pubsub commands and fix indentation --- .../redis/clients/jedis/BinaryClient.java | 5 +- .../redis/clients/jedis/BuilderFactory.java | 21 +++ src/main/java/redis/clients/jedis/Client.java | 26 +++- src/main/java/redis/clients/jedis/Jedis.java | 27 +++- .../java/redis/clients/jedis/Protocol.java | 7 +- .../PublishSubscribeCommandsTest.java | 123 +++++++++++++++++- 6 files changed, 197 insertions(+), 12 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index 56f97af..2a65d96 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -554,7 +554,10 @@ public class BinaryClient extends Connection { public void punsubscribe(final byte[]... patterns) { sendCommand(PUNSUBSCRIBE, patterns); } - + + public void pubSub(final byte[]... args) { + sendCommand(PUBSUB, args); + } public void zcount(final byte[] key, final double min, final double max) { byte byteArrayMin[] = (min == Double.NEGATIVE_INFINITY) ? "-inf".getBytes() : toByteArray(min); diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index bebd2d6..27c728b 100755 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -2,6 +2,7 @@ package redis.clients.jedis; import redis.clients.util.SafeEncoder; +import java.nio.ByteBuffer; import java.util.*; public class BuilderFactory { @@ -96,6 +97,26 @@ public class BuilderFactory { } }; + + public static final Builder> STRING_LONG_MAP = new Builder>() { + @SuppressWarnings("unchecked") + public Map build(Object data) { + final List flatHash = (List) data; + final Map hash = new HashMap(); + final Iterator iterator = flatHash.iterator(); + while (iterator.hasNext()) { + hash.put(SafeEncoder.encode(iterator.next()), Long.valueOf(SafeEncoder.encode(iterator.next()))); + } + + return hash; + } + + public String toString() { + return "Map"; + } + + }; + public static final Builder> STRING_SET = new Builder>() { @SuppressWarnings("unchecked") public Set build(Object data) { diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index a2ec464..85f6a2b 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -1,6 +1,6 @@ package redis.clients.jedis; -import redis.clients.util.SafeEncoder; +import static redis.clients.jedis.Protocol.toByteArray; import java.util.ArrayList; import java.util.HashMap; @@ -8,8 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import static redis.clients.jedis.Protocol.toByteArray; -import static redis.clients.jedis.Protocol.Command.HSCAN; +import redis.clients.util.SafeEncoder; public class Client extends BinaryClient implements Commands { public Client(final String host) { @@ -671,6 +670,18 @@ public class Client extends BinaryClient implements Commands { } subscribe(cs); } + + public void pubSubChannels(String pattern) { + pubSub(Protocol.PUBSUB_CHANNELS, pattern); + } + + public void pubSubNumPat() { + pubSub(Protocol.PUBSUB_NUM_PAT); + } + + public void pubSubNumSub(String... channels) { + pubSub(Protocol.PUBSUB_NUMSUB, channels); + } public void configSet(String parameter, String value) { configSet(SafeEncoder.encode(parameter), SafeEncoder.encode(value)); @@ -831,6 +842,15 @@ public class Client extends BinaryClient implements Commands { arg[0] = SafeEncoder.encode(subcommand); cluster(arg); } + + public void pubSub(final String subcommand, final String... args) { + final byte[][] arg = new byte[args.length+1][]; + for (int i = 1; i < arg.length; i++) { + arg[i] = SafeEncoder.encode(args[i-1]); + } + arg[0] = SafeEncoder.encode(subcommand); + pubSub(arg); + } public void cluster(final String subcommand, final String... args) { final byte[][] arg = new byte[args.length+1][]; diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index cdc3918..6191132 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -13,8 +13,6 @@ import java.util.Set; import redis.clients.jedis.BinaryClient.LIST_POSITION; import redis.clients.util.SafeEncoder; import redis.clients.util.Slowlog; -import java.net.URI; -import java.util.*; public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands { public Jedis(final String host) { @@ -3209,9 +3207,28 @@ public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommand return client.getStatusCodeReply(); } - public String asking() { - checkIsInMulti(); + public String asking() { + checkIsInMulti(); client.asking(); return client.getStatusCodeReply(); - } + } + + public List pubSubChannels(String pattern) { + checkIsInMulti(); + client.pubSubChannels(pattern); + return client.getMultiBulkReply(); + } + + public Long pubSubNumPat() { + checkIsInMulti(); + client.pubSubNumPat(); + return client.getIntegerReply(); + } + + public Map pubSubNumSub(String... channels) { + checkIsInMulti(); + client.pubSubNumSub(channels); + return BuilderFactory.STRING_LONG_MAP + .build(client.getBinaryMultiBulkReply()); + } } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index a5e08c6..6a99f01 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -45,6 +45,11 @@ public final class Protocol { public static final String CLUSTER_SETSLOT_MIGRATING = "migrating"; public static final String CLUSTER_SETSLOT_IMPORTING = "importing"; + public static final String PUBSUB_CHANNELS= "channels"; + public static final String PUBSUB_NUMSUB = "numsub"; + public static final String PUBSUB_NUM_PAT = "numpat"; + + private Protocol() { // this prevent the class from instantiation } @@ -191,7 +196,7 @@ public final class Protocol { } public static enum Command { - PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING; + PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING; public final byte[] raw; diff --git a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java index 3ea14d5..749eda5 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java @@ -3,16 +3,17 @@ package redis.clients.jedis.tests.commands; import java.io.IOException; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Ignore; import org.junit.Test; import redis.clients.jedis.BinaryJedisPubSub; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; import redis.clients.jedis.exceptions.JedisConnectionException; -import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.util.SafeEncoder; public class PublishSubscribeCommandsTest extends JedisCommandTestBase { @@ -63,6 +64,124 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } }, "foo"); } + + + @Test + public void pubSubChannels(){ + final List expectedActiveChannels = Arrays.asList("testchan1", "testchan2", "testchan3"); + jedis.subscribe(new JedisPubSub() { + private int count = 0; + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + count++; + //All channels are subscribed + if (count == 3) { + Jedis otherJedis = createJedis(); + List activeChannels = otherJedis.pubSubChannels("test*"); + assertTrue(expectedActiveChannels.containsAll(activeChannels)); + unsubscribe(); + } + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onPMessage(String pattern, String channel, String message) { + } + + @Override + public void onMessage(String channel, String message) { + } + }, "testchan1", "testchan2", "testchan3"); + } + + @Test + public void pubSubNumPat(){ + jedis.psubscribe(new JedisPubSub() { + private int count=0; + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + count++; + if (count == 3) { + Jedis otherJedis = createJedis(); + Long numPatterns = otherJedis.pubSubNumPat(); + assertEquals(new Long(2l), numPatterns); + punsubscribe(); + } + } + + @Override + public void onPMessage(String pattern, String channel, String message) { + } + + @Override + public void onMessage(String channel, String message) { + } + }, "test*", "test*", "chan*"); + } + + @Test + public void pubSubNumSub(){ + final Map expectedNumSub = new HashMap(); + expectedNumSub.put("testchannel2", 1l); + expectedNumSub.put("testchannel1", 1l); + jedis.subscribe(new JedisPubSub() { + private int count=0; + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + count++; + if (count == 2) { + Jedis otherJedis = createJedis(); + Map numSub = otherJedis.pubSubNumSub("testchannel1", "testchannel2"); + assertEquals(expectedNumSub, numSub); + unsubscribe(); + } + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onPMessage(String pattern, String channel, String message) { + } + + @Override + public void onMessage(String channel, String message) { + } + }, "testchannel1", "testchannel2"); + } @Test public void subscribeMany() throws UnknownHostException, IOException,