diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 50d7b97..74bcc09 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -206,11 +206,19 @@ public class Connection { return (List) Protocol.read(inputStream); } + public void resetPipelinedCount() { + pipelinedCommands = 0; + } + @SuppressWarnings("unchecked") + public List getRawObjectMultiBulkReply() { + return (List) Protocol.read(inputStream); + } + public List getObjectMultiBulkReply() { - flush(); - pipelinedCommands--; - return (List) Protocol.read(inputStream); + flush(); + pipelinedCommands--; + return getRawObjectMultiBulkReply(); } @SuppressWarnings("unchecked") diff --git a/src/main/java/redis/clients/jedis/JedisPubSub.java b/src/main/java/redis/clients/jedis/JedisPubSub.java index 1f3ac5f..7dfcf07 100644 --- a/src/main/java/redis/clients/jedis/JedisPubSub.java +++ b/src/main/java/redis/clients/jedis/JedisPubSub.java @@ -16,7 +16,7 @@ import redis.clients.util.SafeEncoder; public abstract class JedisPubSub { private int subscribedChannels = 0; - private Client client; + private volatile Client client; public abstract void onMessage(String channel, String message); @@ -41,26 +41,46 @@ public abstract class JedisPubSub { } public void unsubscribe(String... channels) { + if (client == null) { + throw new JedisConnectionException( + "JedisPubSub is not subscribed to a Jedis instance."); + } client.unsubscribe(channels); client.flush(); } public void subscribe(String... channels) { + if (client == null) { + throw new JedisConnectionException( + "JedisPubSub is not subscribed to a Jedis instance."); + } client.subscribe(channels); client.flush(); } public void psubscribe(String... patterns) { + if (client == null) { + throw new JedisConnectionException( + "JedisPubSub is not subscribed to a Jedis instance."); + } client.psubscribe(patterns); client.flush(); } public void punsubscribe() { + if (client == null) { + throw new JedisConnectionException( + "JedisPubSub is not subscribed to a Jedis instance."); + } client.punsubscribe(); client.flush(); } public void punsubscribe(String... patterns) { + if (client == null) { + throw new JedisConnectionException( + "JedisPubSub is not subscribed to a Jedis instance."); + } client.punsubscribe(patterns); client.flush(); } @@ -84,8 +104,9 @@ public abstract class JedisPubSub { } private void process(Client client) { + do { - List reply = client.getObjectMultiBulkReply(); + List reply = client.getRawObjectMultiBulkReply(); final Object firstObj = reply.get(0); if (!(firstObj instanceof byte[])) { throw new JedisException("Unknown message type: " + firstObj); @@ -138,9 +159,18 @@ public abstract class JedisPubSub { throw new JedisException("Unknown message type: " + firstObj); } } while (isSubscribed()); + + /* Invalidate instance since this thread is no longer listening */ + this.client = null; + + /* + * Reset pipeline count because subscribe() calls would have increased + * it but nothing decremented it. + */ + client.resetPipelinedCount(); } public int getSubscribedChannels() { return subscribedChannels; } -} \ No newline at end of file +}