From f11c1622de1d529cad509b7793977714229f4883 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Thu, 16 Jan 2014 21:57:22 -0700 Subject: [PATCH] Allow safe multi-threaded access to JedisPubSub If Thread A calls a subscribe method on Jedis it will block on a socket read call waiting for messages or subscription notifications. Thread B is now free to call additional methods on JedisPubSub to change the current subscriptions that thread A is waiting for. Essentially Thread A will do reads on the socket and Thread B will do writes. An issue occurs in that while Thread A is doing reads, in the getObjectMultiBulkReply() method there is an implicit flush() call. This means both Thread A and Thread B may do a write to the socket. Under this situation if Thread A does a flush while Thread B is writing the internal buffer will be corrupted. The fix is to make thread A never call flush(). This allows Thread A to be solely reads and Thread B to be solely writes. Additionally since Thread B is sending commands, the internal pipeline count is incremented and never decremented. So when Thread A terminates it's read it resets the pipeline count. --- .../java/redis/clients/jedis/Connection.java | 10 +++++- .../java/redis/clients/jedis/JedisPubSub.java | 34 +++++++++++++++++-- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 1c42b94..9d4c762 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -202,11 +202,19 @@ public class Connection { return (List) Protocol.read(inputStream); } + public void resetPipelinedCount() { + pipelinedCommands = 0; + } + @SuppressWarnings("unchecked") + public List getRawObjectMultiBulkReply() { + return (List) Protocol.read(inputStream); + } + public List getObjectMultiBulkReply() { flush(); pipelinedCommands--; - return (List) Protocol.read(inputStream); + return getRawObjectMultiBulkReply(); } @SuppressWarnings("unchecked") diff --git a/src/main/java/redis/clients/jedis/JedisPubSub.java b/src/main/java/redis/clients/jedis/JedisPubSub.java index d07a409..41ae53b 100644 --- a/src/main/java/redis/clients/jedis/JedisPubSub.java +++ b/src/main/java/redis/clients/jedis/JedisPubSub.java @@ -16,7 +16,7 @@ import redis.clients.util.SafeEncoder; public abstract class JedisPubSub { private int subscribedChannels = 0; - private Client client; + private volatile Client client; public abstract void onMessage(String channel, String message); @@ -41,26 +41,46 @@ public abstract class JedisPubSub { } public void unsubscribe(String... channels) { + if (client == null) { + throw new JedisConnectionException( + "JedisPubSub is not subscribed to a Jedis instance."); + } client.unsubscribe(channels); client.flush(); } public void subscribe(String... channels) { + if (client == null) { + throw new JedisConnectionException( + "JedisPubSub is not subscribed to a Jedis instance."); + } client.subscribe(channels); client.flush(); } public void psubscribe(String... patterns) { + if (client == null) { + throw new JedisConnectionException( + "JedisPubSub is not subscribed to a Jedis instance."); + } client.psubscribe(patterns); client.flush(); } public void punsubscribe() { + if (client == null) { + throw new JedisConnectionException( + "JedisPubSub is not subscribed to a Jedis instance."); + } client.punsubscribe(); client.flush(); } public void punsubscribe(String... patterns) { + if (client == null) { + throw new JedisConnectionException( + "JedisPubSub is not subscribed to a Jedis instance."); + } client.punsubscribe(patterns); client.flush(); } @@ -85,7 +105,7 @@ public abstract class JedisPubSub { private void process(Client client) { do { - List reply = client.getObjectMultiBulkReply(); + List reply = client.getRawObjectMultiBulkReply(); final Object firstObj = reply.get(0); if (!(firstObj instanceof byte[])) { throw new JedisException("Unknown message type: " + firstObj); @@ -138,9 +158,17 @@ public abstract class JedisPubSub { throw new JedisException("Unknown message type: " + firstObj); } } while (isSubscribed()); + + /* Invalidate instance since this thread is no longer listening */ + this.client = null; + + /* Reset pipeline count because subscribe() calls would have + * increased it but nothing decremented it. + */ + client.resetPipelinedCount(); } public int getSubscribedChannels() { return subscribedChannels; } -} \ No newline at end of file +}