Merge branch 'pubsub-concurrency' of github.com:ibuildthecloud/jedis into ibuildthecloud-pubsub-concurrency

Conflicts:
	src/main/java/redis/clients/jedis/Connection.java
	src/main/java/redis/clients/jedis/JedisPubSub.java
This commit is contained in:
Jonathan Leibiusky
2014-01-31 13:28:29 -05:00
2 changed files with 44 additions and 6 deletions

View File

@@ -206,11 +206,19 @@ public class Connection {
return (List<byte[]>) Protocol.read(inputStream); return (List<byte[]>) Protocol.read(inputStream);
} }
public void resetPipelinedCount() {
pipelinedCommands = 0;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<Object> getRawObjectMultiBulkReply() {
return (List<Object>) Protocol.read(inputStream);
}
public List<Object> getObjectMultiBulkReply() { public List<Object> getObjectMultiBulkReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return (List<Object>) Protocol.read(inputStream); return getRawObjectMultiBulkReply();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@@ -16,7 +16,7 @@ import redis.clients.util.SafeEncoder;
public abstract class JedisPubSub { public abstract class JedisPubSub {
private int subscribedChannels = 0; private int subscribedChannels = 0;
private Client client; private volatile Client client;
public abstract void onMessage(String channel, String message); public abstract void onMessage(String channel, String message);
@@ -41,26 +41,46 @@ public abstract class JedisPubSub {
} }
public void unsubscribe(String... channels) { public void unsubscribe(String... channels) {
if (client == null) {
throw new JedisConnectionException(
"JedisPubSub is not subscribed to a Jedis instance.");
}
client.unsubscribe(channels); client.unsubscribe(channels);
client.flush(); client.flush();
} }
public void subscribe(String... channels) { public void subscribe(String... channels) {
if (client == null) {
throw new JedisConnectionException(
"JedisPubSub is not subscribed to a Jedis instance.");
}
client.subscribe(channels); client.subscribe(channels);
client.flush(); client.flush();
} }
public void psubscribe(String... patterns) { public void psubscribe(String... patterns) {
if (client == null) {
throw new JedisConnectionException(
"JedisPubSub is not subscribed to a Jedis instance.");
}
client.psubscribe(patterns); client.psubscribe(patterns);
client.flush(); client.flush();
} }
public void punsubscribe() { public void punsubscribe() {
if (client == null) {
throw new JedisConnectionException(
"JedisPubSub is not subscribed to a Jedis instance.");
}
client.punsubscribe(); client.punsubscribe();
client.flush(); client.flush();
} }
public void punsubscribe(String... patterns) { public void punsubscribe(String... patterns) {
if (client == null) {
throw new JedisConnectionException(
"JedisPubSub is not subscribed to a Jedis instance.");
}
client.punsubscribe(patterns); client.punsubscribe(patterns);
client.flush(); client.flush();
} }
@@ -84,8 +104,9 @@ public abstract class JedisPubSub {
} }
private void process(Client client) { private void process(Client client) {
do { do {
List<Object> reply = client.getObjectMultiBulkReply(); List<Object> reply = client.getRawObjectMultiBulkReply();
final Object firstObj = reply.get(0); final Object firstObj = reply.get(0);
if (!(firstObj instanceof byte[])) { if (!(firstObj instanceof byte[])) {
throw new JedisException("Unknown message type: " + firstObj); throw new JedisException("Unknown message type: " + firstObj);
@@ -138,6 +159,15 @@ public abstract class JedisPubSub {
throw new JedisException("Unknown message type: " + firstObj); throw new JedisException("Unknown message type: " + firstObj);
} }
} while (isSubscribed()); } while (isSubscribed());
/* Invalidate instance since this thread is no longer listening */
this.client = null;
/*
* Reset pipeline count because subscribe() calls would have increased
* it but nothing decremented it.
*/
client.resetPipelinedCount();
} }
public int getSubscribedChannels() { public int getSubscribedChannels() {