diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index e80754f..26cd8f2 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -560,7 +560,10 @@ public class BinaryClient extends Connection { public void punsubscribe(final byte[]... patterns) { sendCommand(PUNSUBSCRIBE, patterns); } - + + public void pubsub(final byte[]... args) { + sendCommand(PUBSUB, args); + } public void zcount(final byte[] key, final double min, final double max) { byte byteArrayMin[] = (min == Double.NEGATIVE_INFINITY) ? "-inf" diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index d8ae47b..76d013e 100755 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -103,6 +103,7 @@ public class BuilderFactory { } }; + public static final Builder> STRING_SET = new Builder>() { @SuppressWarnings("unchecked") public Set build(Object data) { diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index e793ce7..96d0560 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -672,6 +672,18 @@ public class Client extends BinaryClient implements Commands { } subscribe(cs); } + + public void pubsubChannels(String pattern) { + pubsub(Protocol.PUBSUB_CHANNELS, pattern); + } + + public void pubsubNumPat() { + pubsub(Protocol.PUBSUB_NUM_PAT); + } + + public void pubsubNumSub(String... channels) { + pubsub(Protocol.PUBSUB_NUMSUB, channels); + } public void configSet(String parameter, String value) { configSet(SafeEncoder.encode(parameter), SafeEncoder.encode(value)); @@ -841,6 +853,15 @@ public class Client extends BinaryClient implements Commands { arg[0] = SafeEncoder.encode(subcommand); cluster(arg); } + + public void pubsub(final String subcommand, final String... args) { + final byte[][] arg = new byte[args.length+1][]; + for (int i = 1; i < arg.length; i++) { + arg[i] = SafeEncoder.encode(args[i-1]); + } + arg[0] = SafeEncoder.encode(subcommand); + pubsub(arg); + } public void cluster(final String subcommand, final String... args) { final byte[][] arg = new byte[args.length + 1][]; diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index d465179..d5f7283 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -3232,4 +3232,23 @@ public class Jedis extends BinaryJedis implements JedisCommands, client.asking(); return client.getStatusCodeReply(); } + + public List pubsubChannels(String pattern) { + checkIsInMulti(); + client.pubsubChannels(pattern); + return client.getMultiBulkReply(); + } + + public Long pubsubNumPat() { + checkIsInMulti(); + client.pubsubNumPat(); + return client.getIntegerReply(); + } + + public Map pubsubNumSub(String... channels) { + checkIsInMulti(); + client.pubsubNumSub(channels); + return BuilderFactory.STRING_MAP + .build(client.getBinaryMultiBulkReply()); + } } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 2ba157c..793b58f 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -44,6 +44,9 @@ public final class Protocol { public static final String CLUSTER_SETSLOT_NODE = "node"; public static final String CLUSTER_SETSLOT_MIGRATING = "migrating"; public static final String CLUSTER_SETSLOT_IMPORTING = "importing"; + public static final String PUBSUB_CHANNELS= "channels"; + public static final String PUBSUB_NUMSUB = "numsub"; + public static final String PUBSUB_NUM_PAT = "numpat"; private Protocol() { // this prevent the class from instantiation @@ -181,7 +184,7 @@ public final class Protocol { } public static final byte[] toByteArray(final boolean value) { - return toByteArray(value ? 1 : 0); + return toByteArray(value ? 1 : 0); } public static final byte[] toByteArray(final int value) { @@ -197,7 +200,7 @@ public final class Protocol { } public static enum Command { - PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING; + PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING; public final byte[] raw; diff --git a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java index d0d8353..5149ff2 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java @@ -3,6 +3,9 @@ package redis.clients.jedis.tests.commands; import java.io.IOException; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; @@ -61,6 +64,124 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } }, "foo"); } + + + @Test + public void pubSubChannels(){ + final List expectedActiveChannels = Arrays.asList("testchan1", "testchan2", "testchan3"); + jedis.subscribe(new JedisPubSub() { + private int count = 0; + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + count++; + //All channels are subscribed + if (count == 3) { + Jedis otherJedis = createJedis(); + List activeChannels = otherJedis.pubsubChannels("test*"); + assertTrue(expectedActiveChannels.containsAll(activeChannels)); + unsubscribe(); + } + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onPMessage(String pattern, String channel, String message) { + } + + @Override + public void onMessage(String channel, String message) { + } + }, "testchan1", "testchan2", "testchan3"); + } + + @Test + public void pubSubNumPat(){ + jedis.psubscribe(new JedisPubSub() { + private int count=0; + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + count++; + if (count == 3) { + Jedis otherJedis = createJedis(); + Long numPatterns = otherJedis.pubsubNumPat(); + assertEquals(new Long(2l), numPatterns); + punsubscribe(); + } + } + + @Override + public void onPMessage(String pattern, String channel, String message) { + } + + @Override + public void onMessage(String channel, String message) { + } + }, "test*", "test*", "chan*"); + } + + @Test + public void pubSubNumSub(){ + final Map expectedNumSub = new HashMap(); + expectedNumSub.put("testchannel2", "1"); + expectedNumSub.put("testchannel1", "1"); + jedis.subscribe(new JedisPubSub() { + private int count=0; + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + count++; + if (count == 2) { + Jedis otherJedis = createJedis(); + Map numSub = otherJedis.pubsubNumSub("testchannel1", "testchannel2"); + assertEquals(expectedNumSub, numSub); + unsubscribe(); + } + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onPMessage(String pattern, String channel, String message) { + } + + @Override + public void onMessage(String channel, String message) { + } + }, "testchannel1", "testchannel2"); + } @Test public void subscribeMany() throws UnknownHostException, IOException,