From 08f843221528ab7fb92ef43469cfe2406c1eefb1 Mon Sep 17 00:00:00 2001 From: Jonathan Leibiusky Date: Tue, 2 Nov 2010 23:18:24 -0300 Subject: [PATCH] now it is possible to subscribe and unsubscribe on a JedisPubSub instance --- .../java/redis/clients/jedis/JedisPubSub.java | 98 +++--- .../PublishSubscribeCommandsTest.java | 321 ++++++++++-------- 2 files changed, 241 insertions(+), 178 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisPubSub.java b/src/main/java/redis/clients/jedis/JedisPubSub.java index 31d1f5e..ac6918d 100644 --- a/src/main/java/redis/clients/jedis/JedisPubSub.java +++ b/src/main/java/redis/clients/jedis/JedisPubSub.java @@ -9,7 +9,7 @@ public abstract class JedisPubSub { public abstract void onMessage(String channel, String message); public abstract void onPMessage(String pattern, String channel, - String message); + String message); public abstract void onSubscribe(String channel, int subscribedChannels); @@ -19,66 +19,74 @@ public abstract class JedisPubSub { public abstract void onPSubscribe(String pattern, int subscribedChannels); - protected void unsubscribe() { - client.unsubscribe(); + public void unsubscribe() { + client.unsubscribe(); } - protected void unsubscribe(String... channels) { - client.unsubscribe(channels); + public void unsubscribe(String... channels) { + client.unsubscribe(channels); } - protected void subscribe(String... channels) { - client.subscribe(channels); + public void subscribe(String... channels) { + client.subscribe(channels); + } + + public void psubscribe(String... patterns) { + client.psubscribe(patterns); + } + + public void punsubscribe() { + client.punsubscribe(); + } + + public void punsubscribe(String... patterns) { + client.punsubscribe(patterns); } public boolean isSubscribed() { - return subscribedChannels > 0; + return subscribedChannels > 0; } public void proceedWithPatterns(Client client, String... patterns) { - this.client = client; - client.psubscribe(patterns); - process(client); + this.client = client; + client.psubscribe(patterns); + process(client); } public void proceed(Client client, String... channels) { - this.client = client; - client.subscribe(channels); - process(client); + this.client = client; + client.subscribe(channels); + process(client); } private void process(Client client) { - do { - List reply = client.getObjectMultiBulkReply(); - if (reply.get(0).equals("subscribe")) { - subscribedChannels = ((Integer) reply.get(2)).intValue(); - onSubscribe((String) reply.get(1), subscribedChannels); - } else if (reply.get(0).equals("unsubscribe")) { - subscribedChannels = ((Integer) reply.get(2)).intValue(); - onUnsubscribe((String) reply.get(1), subscribedChannels); - } else if (reply.get(0).equals("message")) { - onMessage((String) reply.get(1), (String) reply.get(2)); - } else if (reply.get(0).equals("pmessage")) { - onPMessage((String) reply.get(1), (String) reply.get(2), - (String) reply.get(3)); - } else if (reply.get(0).equals("psubscribe")) { - subscribedChannels = ((Integer) reply.get(2)).intValue(); - onPSubscribe((String) reply.get(1), subscribedChannels); - } else if (reply.get(0).equals("punsubscribe")) { - subscribedChannels = ((Integer) reply.get(2)).intValue(); - onPUnsubscribe((String) reply.get(1), subscribedChannels); - } else { - throw new JedisException("Unknown message type: " - + reply.get(0)); - } - } while (isSubscribed()); + do { + List reply = client.getObjectMultiBulkReply(); + if (reply.get(0).equals("subscribe")) { + subscribedChannels = ((Integer) reply.get(2)).intValue(); + onSubscribe((String) reply.get(1), subscribedChannels); + } else if (reply.get(0).equals("unsubscribe")) { + subscribedChannels = ((Integer) reply.get(2)).intValue(); + onUnsubscribe((String) reply.get(1), subscribedChannels); + } else if (reply.get(0).equals("message")) { + onMessage((String) reply.get(1), (String) reply.get(2)); + } else if (reply.get(0).equals("pmessage")) { + onPMessage((String) reply.get(1), (String) reply.get(2), + (String) reply.get(3)); + } else if (reply.get(0).equals("psubscribe")) { + subscribedChannels = ((Integer) reply.get(2)).intValue(); + onPSubscribe((String) reply.get(1), subscribedChannels); + } else if (reply.get(0).equals("punsubscribe")) { + subscribedChannels = ((Integer) reply.get(2)).intValue(); + onPUnsubscribe((String) reply.get(1), subscribedChannels); + } else { + throw new JedisException("Unknown message type: " + + reply.get(0)); + } + } while (isSubscribed()); } - protected void punsubscribe() { - client.punsubscribe(); + public int getSubscribedChannels() { + return subscribedChannels; } - - protected void punsubscribe(String... patterns) { - client.punsubscribe(patterns); - } -} +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java index 38b3c8c..db30dfa 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java @@ -11,170 +11,225 @@ import redis.clients.jedis.JedisPubSub; public class PublishSubscribeCommandsTest extends JedisCommandTestBase { @Test public void subscribe() throws UnknownHostException, IOException, - InterruptedException { - new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }).start(); - jedis.subscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - assertEquals("foo", channel); - assertEquals("exit", message); - unsubscribe(); - } + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish("foo", "exit"); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + assertEquals("foo", channel); + assertEquals("exit", message); + unsubscribe(); + } - public void onSubscribe(String channel, int subscribedChannels) { - assertEquals("foo", channel); - assertEquals(1, subscribedChannels); - } + public void onSubscribe(String channel, int subscribedChannels) { + assertEquals("foo", channel); + assertEquals(1, subscribedChannels); + } - public void onUnsubscribe(String channel, int subscribedChannels) { - assertEquals("foo", channel); - assertEquals(0, subscribedChannels); - } + public void onUnsubscribe(String channel, int subscribedChannels) { + assertEquals("foo", channel); + assertEquals(0, subscribedChannels); + } - public void onPSubscribe(String pattern, int subscribedChannels) { - } + public void onPSubscribe(String pattern, int subscribedChannels) { + } - public void onPUnsubscribe(String pattern, int subscribedChannels) { - } + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } - public void onPMessage(String pattern, String channel, - String message) { - } - }, "foo"); + public void onPMessage(String pattern, String channel, + String message) { + } + }, "foo"); + t.join(); } @Test public void subscribeMany() throws UnknownHostException, IOException, - InterruptedException { - new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo", "exit"); - Thread.sleep(1000); - j.publish("bar", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }).start(); - jedis.subscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - unsubscribe(channel); - } + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish("foo", "exit"); + Thread.sleep(1000); + j.publish("bar", "exit"); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + unsubscribe(channel); + } - public void onSubscribe(String channel, int subscribedChannels) { - } + public void onSubscribe(String channel, int subscribedChannels) { + } - public void onUnsubscribe(String channel, int subscribedChannels) { - } + public void onUnsubscribe(String channel, int subscribedChannels) { + } - public void onPSubscribe(String pattern, int subscribedChannels) { - } + public void onPSubscribe(String pattern, int subscribedChannels) { + } - public void onPUnsubscribe(String pattern, int subscribedChannels) { - } + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } - public void onPMessage(String pattern, String channel, - String message) { - } - }, "foo", "bar"); + public void onPMessage(String pattern, String channel, + String message) { + } + }, "foo", "bar"); + t.join(); } @Test public void psubscribe() throws UnknownHostException, IOException, - InterruptedException { - new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo.bar", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }).start(); - jedis.psubscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - } + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish("foo.bar", "exit"); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.psubscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + } - public void onSubscribe(String channel, int subscribedChannels) { - } + public void onSubscribe(String channel, int subscribedChannels) { + } - public void onUnsubscribe(String channel, int subscribedChannels) { - } + public void onUnsubscribe(String channel, int subscribedChannels) { + } - public void onPSubscribe(String pattern, int subscribedChannels) { - assertEquals("foo.*", pattern); - assertEquals(1, subscribedChannels); - } + public void onPSubscribe(String pattern, int subscribedChannels) { + assertEquals("foo.*", pattern); + assertEquals(1, subscribedChannels); + } - public void onPUnsubscribe(String pattern, int subscribedChannels) { - assertEquals("foo.*", pattern); - assertEquals(0, subscribedChannels); - } + public void onPUnsubscribe(String pattern, int subscribedChannels) { + assertEquals("foo.*", pattern); + assertEquals(0, subscribedChannels); + } - public void onPMessage(String pattern, String channel, - String message) { - assertEquals("foo.*", pattern); - assertEquals("foo.bar", channel); - assertEquals("exit", message); - punsubscribe(); - } - }, "foo.*"); + public void onPMessage(String pattern, String channel, + String message) { + assertEquals("foo.*", pattern); + assertEquals("foo.bar", channel); + assertEquals("exit", message); + punsubscribe(); + } + }, "foo.*"); + t.join(); } @Test public void psubscribeMany() throws UnknownHostException, IOException, - InterruptedException { - new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo.123", "exit"); - Thread.sleep(1000); - j.publish("bar.123", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }).start(); - jedis.psubscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - } + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish("foo.123", "exit"); + Thread.sleep(1000); + j.publish("bar.123", "exit"); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.psubscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + } - public void onSubscribe(String channel, int subscribedChannels) { - } + public void onSubscribe(String channel, int subscribedChannels) { + } - public void onUnsubscribe(String channel, int subscribedChannels) { - } + public void onUnsubscribe(String channel, int subscribedChannels) { + } - public void onPSubscribe(String pattern, int subscribedChannels) { - } + public void onPSubscribe(String pattern, int subscribedChannels) { + } - public void onPUnsubscribe(String pattern, int subscribedChannels) { - } + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } - public void onPMessage(String pattern, String channel, - String message) { - punsubscribe(pattern); - } - }, "foo.*", "bar.*"); + public void onPMessage(String pattern, String channel, + String message) { + punsubscribe(pattern); + } + }, "foo.*", "bar.*"); + t.join(); + } + + @Test + public void subscribeLazily() throws UnknownHostException, IOException, + InterruptedException { + final JedisPubSub pubsub = new JedisPubSub() { + public void onMessage(String channel, String message) { + unsubscribe(channel); + } + + public void onSubscribe(String channel, int subscribedChannels) { + } + + public void onUnsubscribe(String channel, int subscribedChannels) { + } + + public void onPSubscribe(String pattern, int subscribedChannels) { + } + + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } + + public void onPMessage(String pattern, String channel, + String message) { + punsubscribe(pattern); + } + }; + + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + pubsub.subscribe("bar"); + pubsub.psubscribe("bar.*"); + j.publish("foo", "exit"); + j.publish("bar", "exit"); + j.publish("bar.123", "exit"); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(pubsub, "foo"); + t.join(); } } \ No newline at end of file