Allow safe multi-threaded access to JedisPubSub
If Thread A calls a subscribe method on Jedis it will block on a socket read call waiting for messages or subscription notifications. Thread B is now free to call additional methods on JedisPubSub to change the current subscriptions that thread A is waiting for. Essentially Thread A will do reads on the socket and Thread B will do writes. An issue occurs in that while Thread A is doing reads, in the getObjectMultiBulkReply() method there is an implicit flush() call. This means both Thread A and Thread B may do a write to the socket. Under this situation if Thread A does a flush while Thread B is writing the internal buffer will be corrupted. The fix is to make thread A never call flush(). This allows Thread A to be solely reads and Thread B to be solely writes. Additionally since Thread B is sending commands, the internal pipeline count is incremented and never decremented. So when Thread A terminates it's read it resets the pipeline count.
This commit is contained in:
@@ -202,11 +202,19 @@ public class Connection {
|
||||
return (List<byte[]>) Protocol.read(inputStream);
|
||||
}
|
||||
|
||||
public void resetPipelinedCount() {
|
||||
pipelinedCommands = 0;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<Object> getRawObjectMultiBulkReply() {
|
||||
return (List<Object>) Protocol.read(inputStream);
|
||||
}
|
||||
|
||||
public List<Object> getObjectMultiBulkReply() {
|
||||
flush();
|
||||
pipelinedCommands--;
|
||||
return (List<Object>) Protocol.read(inputStream);
|
||||
return getRawObjectMultiBulkReply();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
@@ -16,7 +16,7 @@ import redis.clients.util.SafeEncoder;
|
||||
|
||||
public abstract class JedisPubSub {
|
||||
private int subscribedChannels = 0;
|
||||
private Client client;
|
||||
private volatile Client client;
|
||||
|
||||
public abstract void onMessage(String channel, String message);
|
||||
|
||||
@@ -41,26 +41,46 @@ public abstract class JedisPubSub {
|
||||
}
|
||||
|
||||
public void unsubscribe(String... channels) {
|
||||
if (client == null) {
|
||||
throw new JedisConnectionException(
|
||||
"JedisPubSub is not subscribed to a Jedis instance.");
|
||||
}
|
||||
client.unsubscribe(channels);
|
||||
client.flush();
|
||||
}
|
||||
|
||||
public void subscribe(String... channels) {
|
||||
if (client == null) {
|
||||
throw new JedisConnectionException(
|
||||
"JedisPubSub is not subscribed to a Jedis instance.");
|
||||
}
|
||||
client.subscribe(channels);
|
||||
client.flush();
|
||||
}
|
||||
|
||||
public void psubscribe(String... patterns) {
|
||||
if (client == null) {
|
||||
throw new JedisConnectionException(
|
||||
"JedisPubSub is not subscribed to a Jedis instance.");
|
||||
}
|
||||
client.psubscribe(patterns);
|
||||
client.flush();
|
||||
}
|
||||
|
||||
public void punsubscribe() {
|
||||
if (client == null) {
|
||||
throw new JedisConnectionException(
|
||||
"JedisPubSub is not subscribed to a Jedis instance.");
|
||||
}
|
||||
client.punsubscribe();
|
||||
client.flush();
|
||||
}
|
||||
|
||||
public void punsubscribe(String... patterns) {
|
||||
if (client == null) {
|
||||
throw new JedisConnectionException(
|
||||
"JedisPubSub is not subscribed to a Jedis instance.");
|
||||
}
|
||||
client.punsubscribe(patterns);
|
||||
client.flush();
|
||||
}
|
||||
@@ -85,7 +105,7 @@ public abstract class JedisPubSub {
|
||||
|
||||
private void process(Client client) {
|
||||
do {
|
||||
List<Object> reply = client.getObjectMultiBulkReply();
|
||||
List<Object> reply = client.getRawObjectMultiBulkReply();
|
||||
final Object firstObj = reply.get(0);
|
||||
if (!(firstObj instanceof byte[])) {
|
||||
throw new JedisException("Unknown message type: " + firstObj);
|
||||
@@ -138,9 +158,17 @@ public abstract class JedisPubSub {
|
||||
throw new JedisException("Unknown message type: " + firstObj);
|
||||
}
|
||||
} while (isSubscribed());
|
||||
|
||||
/* Invalidate instance since this thread is no longer listening */
|
||||
this.client = null;
|
||||
|
||||
/* Reset pipeline count because subscribe() calls would have
|
||||
* increased it but nothing decremented it.
|
||||
*/
|
||||
client.resetPipelinedCount();
|
||||
}
|
||||
|
||||
public int getSubscribedChannels() {
|
||||
return subscribedChannels;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user