now it is possible to subscribe and unsubscribe on a JedisPubSub instance

This commit is contained in:
Jonathan Leibiusky
2010-11-02 23:18:24 -03:00
parent 7a377fd8e9
commit 08f8432215
2 changed files with 241 additions and 178 deletions

View File

@@ -9,7 +9,7 @@ public abstract class JedisPubSub {
public abstract void onMessage(String channel, String message); public abstract void onMessage(String channel, String message);
public abstract void onPMessage(String pattern, String channel, public abstract void onPMessage(String pattern, String channel,
String message); String message);
public abstract void onSubscribe(String channel, int subscribedChannels); public abstract void onSubscribe(String channel, int subscribedChannels);
@@ -19,66 +19,74 @@ public abstract class JedisPubSub {
public abstract void onPSubscribe(String pattern, int subscribedChannels); public abstract void onPSubscribe(String pattern, int subscribedChannels);
protected void unsubscribe() { public void unsubscribe() {
client.unsubscribe(); client.unsubscribe();
} }
protected void unsubscribe(String... channels) { public void unsubscribe(String... channels) {
client.unsubscribe(channels); client.unsubscribe(channels);
} }
protected void subscribe(String... channels) { public void subscribe(String... channels) {
client.subscribe(channels); client.subscribe(channels);
}
public void psubscribe(String... patterns) {
client.psubscribe(patterns);
}
public void punsubscribe() {
client.punsubscribe();
}
public void punsubscribe(String... patterns) {
client.punsubscribe(patterns);
} }
public boolean isSubscribed() { public boolean isSubscribed() {
return subscribedChannels > 0; return subscribedChannels > 0;
} }
public void proceedWithPatterns(Client client, String... patterns) { public void proceedWithPatterns(Client client, String... patterns) {
this.client = client; this.client = client;
client.psubscribe(patterns); client.psubscribe(patterns);
process(client); process(client);
} }
public void proceed(Client client, String... channels) { public void proceed(Client client, String... channels) {
this.client = client; this.client = client;
client.subscribe(channels); client.subscribe(channels);
process(client); process(client);
} }
private void process(Client client) { private void process(Client client) {
do { do {
List<Object> reply = client.getObjectMultiBulkReply(); List<Object> reply = client.getObjectMultiBulkReply();
if (reply.get(0).equals("subscribe")) { if (reply.get(0).equals("subscribe")) {
subscribedChannels = ((Integer) reply.get(2)).intValue(); subscribedChannels = ((Integer) reply.get(2)).intValue();
onSubscribe((String) reply.get(1), subscribedChannels); onSubscribe((String) reply.get(1), subscribedChannels);
} else if (reply.get(0).equals("unsubscribe")) { } else if (reply.get(0).equals("unsubscribe")) {
subscribedChannels = ((Integer) reply.get(2)).intValue(); subscribedChannels = ((Integer) reply.get(2)).intValue();
onUnsubscribe((String) reply.get(1), subscribedChannels); onUnsubscribe((String) reply.get(1), subscribedChannels);
} else if (reply.get(0).equals("message")) { } else if (reply.get(0).equals("message")) {
onMessage((String) reply.get(1), (String) reply.get(2)); onMessage((String) reply.get(1), (String) reply.get(2));
} else if (reply.get(0).equals("pmessage")) { } else if (reply.get(0).equals("pmessage")) {
onPMessage((String) reply.get(1), (String) reply.get(2), onPMessage((String) reply.get(1), (String) reply.get(2),
(String) reply.get(3)); (String) reply.get(3));
} else if (reply.get(0).equals("psubscribe")) { } else if (reply.get(0).equals("psubscribe")) {
subscribedChannels = ((Integer) reply.get(2)).intValue(); subscribedChannels = ((Integer) reply.get(2)).intValue();
onPSubscribe((String) reply.get(1), subscribedChannels); onPSubscribe((String) reply.get(1), subscribedChannels);
} else if (reply.get(0).equals("punsubscribe")) { } else if (reply.get(0).equals("punsubscribe")) {
subscribedChannels = ((Integer) reply.get(2)).intValue(); subscribedChannels = ((Integer) reply.get(2)).intValue();
onPUnsubscribe((String) reply.get(1), subscribedChannels); onPUnsubscribe((String) reply.get(1), subscribedChannels);
} else { } else {
throw new JedisException("Unknown message type: " throw new JedisException("Unknown message type: "
+ reply.get(0)); + reply.get(0));
} }
} while (isSubscribed()); } while (isSubscribed());
} }
protected void punsubscribe() { public int getSubscribedChannels() {
client.punsubscribe(); return subscribedChannels;
} }
}
protected void punsubscribe(String... patterns) {
client.punsubscribe(patterns);
}
}

View File

@@ -11,170 +11,225 @@ import redis.clients.jedis.JedisPubSub;
public class PublishSubscribeCommandsTest extends JedisCommandTestBase { public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void subscribe() throws UnknownHostException, IOException, public void subscribe() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
public void run() { public void run() {
try { try {
Jedis j = createJedis(); Jedis j = createJedis();
Thread.sleep(1000); Thread.sleep(1000);
j.publish("foo", "exit"); j.publish("foo", "exit");
j.disconnect(); j.disconnect();
} catch (Exception ex) { } catch (Exception ex) {
fail(ex.getMessage()); fail(ex.getMessage());
} }
} }
}).start(); });
jedis.subscribe(new JedisPubSub() { t.start();
public void onMessage(String channel, String message) { jedis.subscribe(new JedisPubSub() {
assertEquals("foo", channel); public void onMessage(String channel, String message) {
assertEquals("exit", message); assertEquals("foo", channel);
unsubscribe(); assertEquals("exit", message);
} unsubscribe();
}
public void onSubscribe(String channel, int subscribedChannels) { public void onSubscribe(String channel, int subscribedChannels) {
assertEquals("foo", channel); assertEquals("foo", channel);
assertEquals(1, subscribedChannels); assertEquals(1, subscribedChannels);
} }
public void onUnsubscribe(String channel, int subscribedChannels) { public void onUnsubscribe(String channel, int subscribedChannels) {
assertEquals("foo", channel); assertEquals("foo", channel);
assertEquals(0, subscribedChannels); assertEquals(0, subscribedChannels);
} }
public void onPSubscribe(String pattern, int subscribedChannels) { public void onPSubscribe(String pattern, int subscribedChannels) {
} }
public void onPUnsubscribe(String pattern, int subscribedChannels) { public void onPUnsubscribe(String pattern, int subscribedChannels) {
} }
public void onPMessage(String pattern, String channel, public void onPMessage(String pattern, String channel,
String message) { String message) {
} }
}, "foo"); }, "foo");
t.join();
} }
@Test @Test
public void subscribeMany() throws UnknownHostException, IOException, public void subscribeMany() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
public void run() { public void run() {
try { try {
Jedis j = createJedis(); Jedis j = createJedis();
Thread.sleep(1000); Thread.sleep(1000);
j.publish("foo", "exit"); j.publish("foo", "exit");
Thread.sleep(1000); Thread.sleep(1000);
j.publish("bar", "exit"); j.publish("bar", "exit");
j.disconnect(); j.disconnect();
} catch (Exception ex) { } catch (Exception ex) {
fail(ex.getMessage()); fail(ex.getMessage());
} }
} }
}).start(); });
jedis.subscribe(new JedisPubSub() { t.start();
public void onMessage(String channel, String message) { jedis.subscribe(new JedisPubSub() {
unsubscribe(channel); public void onMessage(String channel, String message) {
} unsubscribe(channel);
}
public void onSubscribe(String channel, int subscribedChannels) { public void onSubscribe(String channel, int subscribedChannels) {
} }
public void onUnsubscribe(String channel, int subscribedChannels) { public void onUnsubscribe(String channel, int subscribedChannels) {
} }
public void onPSubscribe(String pattern, int subscribedChannels) { public void onPSubscribe(String pattern, int subscribedChannels) {
} }
public void onPUnsubscribe(String pattern, int subscribedChannels) { public void onPUnsubscribe(String pattern, int subscribedChannels) {
} }
public void onPMessage(String pattern, String channel, public void onPMessage(String pattern, String channel,
String message) { String message) {
} }
}, "foo", "bar"); }, "foo", "bar");
t.join();
} }
@Test @Test
public void psubscribe() throws UnknownHostException, IOException, public void psubscribe() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
public void run() { public void run() {
try { try {
Jedis j = createJedis(); Jedis j = createJedis();
Thread.sleep(1000); Thread.sleep(1000);
j.publish("foo.bar", "exit"); j.publish("foo.bar", "exit");
j.disconnect(); j.disconnect();
} catch (Exception ex) { } catch (Exception ex) {
fail(ex.getMessage()); fail(ex.getMessage());
} }
} }
}).start(); });
jedis.psubscribe(new JedisPubSub() { t.start();
public void onMessage(String channel, String message) { jedis.psubscribe(new JedisPubSub() {
} public void onMessage(String channel, String message) {
}
public void onSubscribe(String channel, int subscribedChannels) { public void onSubscribe(String channel, int subscribedChannels) {
} }
public void onUnsubscribe(String channel, int subscribedChannels) { public void onUnsubscribe(String channel, int subscribedChannels) {
} }
public void onPSubscribe(String pattern, int subscribedChannels) { public void onPSubscribe(String pattern, int subscribedChannels) {
assertEquals("foo.*", pattern); assertEquals("foo.*", pattern);
assertEquals(1, subscribedChannels); assertEquals(1, subscribedChannels);
} }
public void onPUnsubscribe(String pattern, int subscribedChannels) { public void onPUnsubscribe(String pattern, int subscribedChannels) {
assertEquals("foo.*", pattern); assertEquals("foo.*", pattern);
assertEquals(0, subscribedChannels); assertEquals(0, subscribedChannels);
} }
public void onPMessage(String pattern, String channel, public void onPMessage(String pattern, String channel,
String message) { String message) {
assertEquals("foo.*", pattern); assertEquals("foo.*", pattern);
assertEquals("foo.bar", channel); assertEquals("foo.bar", channel);
assertEquals("exit", message); assertEquals("exit", message);
punsubscribe(); punsubscribe();
} }
}, "foo.*"); }, "foo.*");
t.join();
} }
@Test @Test
public void psubscribeMany() throws UnknownHostException, IOException, public void psubscribeMany() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
public void run() { public void run() {
try { try {
Jedis j = createJedis(); Jedis j = createJedis();
Thread.sleep(1000); Thread.sleep(1000);
j.publish("foo.123", "exit"); j.publish("foo.123", "exit");
Thread.sleep(1000); Thread.sleep(1000);
j.publish("bar.123", "exit"); j.publish("bar.123", "exit");
j.disconnect(); j.disconnect();
} catch (Exception ex) { } catch (Exception ex) {
fail(ex.getMessage()); fail(ex.getMessage());
} }
} }
}).start(); });
jedis.psubscribe(new JedisPubSub() { t.start();
public void onMessage(String channel, String message) { jedis.psubscribe(new JedisPubSub() {
} public void onMessage(String channel, String message) {
}
public void onSubscribe(String channel, int subscribedChannels) { public void onSubscribe(String channel, int subscribedChannels) {
} }
public void onUnsubscribe(String channel, int subscribedChannels) { public void onUnsubscribe(String channel, int subscribedChannels) {
} }
public void onPSubscribe(String pattern, int subscribedChannels) { public void onPSubscribe(String pattern, int subscribedChannels) {
} }
public void onPUnsubscribe(String pattern, int subscribedChannels) { public void onPUnsubscribe(String pattern, int subscribedChannels) {
} }
public void onPMessage(String pattern, String channel, public void onPMessage(String pattern, String channel,
String message) { String message) {
punsubscribe(pattern); punsubscribe(pattern);
} }
}, "foo.*", "bar.*"); }, "foo.*", "bar.*");
t.join();
}
@Test
public void subscribeLazily() throws UnknownHostException, IOException,
InterruptedException {
final JedisPubSub pubsub = new JedisPubSub() {
public void onMessage(String channel, String message) {
unsubscribe(channel);
}
public void onSubscribe(String channel, int subscribedChannels) {
}
public void onUnsubscribe(String channel, int subscribedChannels) {
}
public void onPSubscribe(String pattern, int subscribedChannels) {
}
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
public void onPMessage(String pattern, String channel,
String message) {
punsubscribe(pattern);
}
};
Thread t = new Thread(new Runnable() {
public void run() {
try {
Jedis j = createJedis();
Thread.sleep(1000);
pubsub.subscribe("bar");
pubsub.psubscribe("bar.*");
j.publish("foo", "exit");
j.publish("bar", "exit");
j.publish("bar.123", "exit");
j.disconnect();
} catch (Exception ex) {
fail(ex.getMessage());
}
}
});
t.start();
jedis.subscribe(pubsub, "foo");
t.join();
} }
} }