From 4d5e5a7c5d204fb092e0f847cbe181937d8ec915 Mon Sep 17 00:00:00 2001 From: Jonathan Leibiusky Date: Mon, 24 Jan 2011 13:24:08 -0300 Subject: [PATCH] add binary support for pubsub --- .../redis/clients/jedis/BinaryClient.java | 10 +- .../java/redis/clients/jedis/BinaryJedis.java | 17 ++ .../clients/jedis/BinaryJedisPubSub.java | 112 +++++++ src/main/java/redis/clients/jedis/Client.java | 37 +++ src/main/java/redis/clients/jedis/Jedis.java | 6 +- .../PublishSubscribeCommandsTest.java | 289 ++++++++++++++++-- 6 files changed, 440 insertions(+), 31 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/BinaryJedisPubSub.java diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index f39af94..0dab295 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -441,11 +441,11 @@ public class BinaryClient extends Connection { sendCommand(AUTH, password); } - public void subscribe(final String... channels) { + public void subscribe(final byte[]... channels) { sendCommand(SUBSCRIBE, channels); } - public void publish(final String channel, final String message) { + public void publish(final byte[] channel, final byte[] message) { sendCommand(PUBLISH, channel, message); } @@ -453,11 +453,11 @@ public class BinaryClient extends Connection { sendCommand(UNSUBSCRIBE); } - public void unsubscribe(final String... channels) { + public void unsubscribe(final byte[]... channels) { sendCommand(UNSUBSCRIBE, channels); } - public void psubscribe(final String[] patterns) { + public void psubscribe(final byte[]... patterns) { sendCommand(PSUBSCRIBE, patterns); } @@ -465,7 +465,7 @@ public class BinaryClient extends Connection { sendCommand(PUNSUBSCRIBE); } - public void punsubscribe(final String... patterns) { + public void punsubscribe(final byte[]... patterns) { sendCommand(PUNSUBSCRIBE, patterns); } diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index bff0f4f..e245537 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -2925,4 +2925,21 @@ public class BinaryJedis implements BinaryJedisCommands { client.getbit(key, offset); return client.getIntegerReply(); } + + public Long publish(byte[] channel, byte[] message) { + client.publish(channel, message); + return client.getIntegerReply(); + } + + public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) { + client.setTimeoutInfinite(); + jedisPubSub.proceed(client, channels); + client.rollbackTimeout(); + } + + public void psubscribe(BinaryJedisPubSub jedisPubSub, byte[]... patterns) { + client.setTimeoutInfinite(); + jedisPubSub.proceedWithPatterns(client, patterns); + client.rollbackTimeout(); + } } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/BinaryJedisPubSub.java b/src/main/java/redis/clients/jedis/BinaryJedisPubSub.java new file mode 100644 index 0000000..fb3fb8a --- /dev/null +++ b/src/main/java/redis/clients/jedis/BinaryJedisPubSub.java @@ -0,0 +1,112 @@ +package redis.clients.jedis; + +import static redis.clients.jedis.Protocol.Keyword.MESSAGE; +import static redis.clients.jedis.Protocol.Keyword.PMESSAGE; +import static redis.clients.jedis.Protocol.Keyword.PSUBSCRIBE; +import static redis.clients.jedis.Protocol.Keyword.PUNSUBSCRIBE; +import static redis.clients.jedis.Protocol.Keyword.SUBSCRIBE; +import static redis.clients.jedis.Protocol.Keyword.UNSUBSCRIBE; + +import java.util.Arrays; +import java.util.List; + +public abstract class BinaryJedisPubSub { + private int subscribedChannels = 0; + private Client client; + + public abstract void onMessage(byte[] channel, byte[] message); + + public abstract void onPMessage(byte[] pattern, byte[] channel, + byte[] message); + + public abstract void onSubscribe(byte[] channel, int subscribedChannels); + + public abstract void onUnsubscribe(byte[] channel, int subscribedChannels); + + public abstract void onPUnsubscribe(byte[] pattern, int subscribedChannels); + + public abstract void onPSubscribe(byte[] pattern, int subscribedChannels); + + public void unsubscribe() { + client.unsubscribe(); + } + + public void unsubscribe(byte[]... channels) { + client.unsubscribe(channels); + } + + public void subscribe(byte[]... channels) { + client.subscribe(channels); + } + + public void psubscribe(byte[]... patterns) { + client.psubscribe(patterns); + } + + public void punsubscribe() { + client.punsubscribe(); + } + + public void punsubscribe(byte[]... patterns) { + client.punsubscribe(patterns); + } + + public boolean isSubscribed() { + return subscribedChannels > 0; + } + + public void proceedWithPatterns(Client client, byte[]... patterns) { + this.client = client; + client.psubscribe(patterns); + process(client); + } + + public void proceed(Client client, byte[]... channels) { + this.client = client; + client.subscribe(channels); + process(client); + } + + private void process(Client client) { + do { + List reply = client.getObjectMultiBulkReply(); + final Object firstObj = reply.get(0); + if (!(firstObj instanceof byte[])) { + throw new JedisException("Unknown message type: " + firstObj); + } + final byte[] resp = (byte[]) firstObj; + if (Arrays.equals(SUBSCRIBE.raw, resp)) { + subscribedChannels = ((Long) reply.get(2)).intValue(); + final byte[] bchannel = (byte[]) reply.get(1); + onSubscribe(bchannel, subscribedChannels); + } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) { + subscribedChannels = ((Long) reply.get(2)).intValue(); + final byte[] bchannel = (byte[]) reply.get(1); + onUnsubscribe(bchannel, subscribedChannels); + } else if (Arrays.equals(MESSAGE.raw, resp)) { + final byte[] bchannel = (byte[]) reply.get(1); + final byte[] bmesg = (byte[]) reply.get(2); + onMessage(bchannel, bmesg); + } else if (Arrays.equals(PMESSAGE.raw, resp)) { + final byte[] bpattern = (byte[]) reply.get(1); + final byte[] bchannel = (byte[]) reply.get(2); + final byte[] bmesg = (byte[]) reply.get(3); + onPMessage(bpattern, bchannel, bmesg); + } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) { + subscribedChannels = ((Long) reply.get(2)).intValue(); + final byte[] bpattern = (byte[]) reply.get(1); + onPSubscribe(bpattern, subscribedChannels); + } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) { + subscribedChannels = ((Long) reply.get(2)).intValue(); + final byte[] bpattern = (byte[]) reply.get(1); + onPUnsubscribe(bpattern, subscribedChannels); + } else { + throw new JedisException("Unknown message type: " + firstObj); + } + } while (isSubscribed()); + } + + public int getSubscribedChannels() { + return subscribedChannels; + } +} \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index 314b093..f7fef38 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -514,4 +514,41 @@ public class Client extends BinaryClient implements Commands { public void getbit(String key, long offset) { getbit(SafeEncoder.encode(key), offset); } + + public void publish(final String channel, final String message) { + publish(SafeEncoder.encode(channel), SafeEncoder.encode(message)); + } + + public void unsubscribe(final String... channels) { + final byte[][] cs = new byte[channels.length][]; + for (int i = 0; i < cs.length; i++) { + cs[i] = SafeEncoder.encode(channels[i]); + } + unsubscribe(cs); + } + + public void psubscribe(final String... patterns) { + final byte[][] ps = new byte[patterns.length][]; + for (int i = 0; i < ps.length; i++) { + ps[i] = SafeEncoder.encode(patterns[i]); + } + psubscribe(ps); + } + + public void punsubscribe(final String... patterns) { + final byte[][] ps = new byte[patterns.length][]; + for (int i = 0; i < ps.length; i++) { + ps[i] = SafeEncoder.encode(patterns[i]); + } + punsubscribe(ps); + } + + public void subscribe(final String... channels) { + final byte[][] cs = new byte[channels.length][]; + for (int i = 0; i < cs.length; i++) { + cs[i] = SafeEncoder.encode(channels[i]); + } + subscribe(cs); + } + } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index d0b56e1..c2e4b42 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -2023,21 +2023,21 @@ public class Jedis extends BinaryJedis implements JedisCommands { } public void subscribe(JedisPubSub jedisPubSub, String... channels) { - runChecks(); + runChecks(); client.setTimeoutInfinite(); jedisPubSub.proceed(client, channels); client.rollbackTimeout(); } public Long publish(String channel, String message) { - runChecks(); + runChecks(); client.publish(channel, message); return client.getIntegerReply(); } public void psubscribe(JedisPubSub jedisPubSub, String... patterns) { runChecks(); - client.setTimeoutInfinite(); + client.setTimeoutInfinite(); jedisPubSub.proceedWithPatterns(client, patterns); client.rollbackTimeout(); } diff --git a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java index c84cf4e..f2e9ee0 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java @@ -2,12 +2,15 @@ package redis.clients.jedis.tests.commands; import java.io.IOException; import java.net.UnknownHostException; +import java.util.Arrays; import org.junit.Test; +import redis.clients.jedis.BinaryJedisPubSub; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisException; import redis.clients.jedis.JedisPubSub; +import redis.clients.util.SafeEncoder; public class PublishSubscribeCommandsTest extends JedisCommandTestBase { @Test @@ -233,30 +236,270 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { jedis.subscribe(pubsub, "foo"); t.join(); } - + + @Test + public void binarySubscribe() throws UnknownHostException, IOException, + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("foo"), SafeEncoder + .encode("exit")); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(new BinaryJedisPubSub() { + public void onMessage(byte[] channel, byte[] message) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); + assertTrue(Arrays.equals(SafeEncoder.encode("exit"), message)); + unsubscribe(); + } + + public void onSubscribe(byte[] channel, int subscribedChannels) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); + assertEquals(1, subscribedChannels); + } + + public void onUnsubscribe(byte[] channel, int subscribedChannels) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); + assertEquals(0, subscribedChannels); + } + + public void onPSubscribe(byte[] pattern, int subscribedChannels) { + } + + public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { + } + + public void onPMessage(byte[] pattern, byte[] channel, + byte[] message) { + } + }, SafeEncoder.encode("foo")); + t.join(); + } + + @Test + public void binarySubscribeMany() throws UnknownHostException, IOException, + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("foo"), SafeEncoder + .encode("exit")); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("bar"), SafeEncoder + .encode("exit")); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(new BinaryJedisPubSub() { + public void onMessage(byte[] channel, byte[] message) { + unsubscribe(channel); + } + + public void onSubscribe(byte[] channel, int subscribedChannels) { + } + + public void onUnsubscribe(byte[] channel, int subscribedChannels) { + } + + public void onPSubscribe(byte[] pattern, int subscribedChannels) { + } + + public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { + } + + public void onPMessage(byte[] pattern, byte[] channel, + byte[] message) { + } + }, SafeEncoder.encode("foo"), SafeEncoder.encode("bar")); + t.join(); + } + + @Test + public void binaryPsubscribe() throws UnknownHostException, IOException, + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("foo.bar"), SafeEncoder + .encode("exit")); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.psubscribe(new BinaryJedisPubSub() { + public void onMessage(byte[] channel, byte[] message) { + } + + public void onSubscribe(byte[] channel, int subscribedChannels) { + } + + public void onUnsubscribe(byte[] channel, int subscribedChannels) { + } + + public void onPSubscribe(byte[] pattern, int subscribedChannels) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); + assertEquals(1, subscribedChannels); + } + + public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); + assertEquals(0, subscribedChannels); + } + + public void onPMessage(byte[] pattern, byte[] channel, + byte[] message) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); + assertTrue(Arrays + .equals(SafeEncoder.encode("foo.bar"), channel)); + assertTrue(Arrays.equals(SafeEncoder.encode("exit"), message)); + punsubscribe(); + } + }, SafeEncoder.encode("foo.*")); + t.join(); + } + + @Test + public void binaryPsubscribeMany() throws UnknownHostException, + IOException, InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("foo.123"), SafeEncoder + .encode("exit")); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("bar.123"), SafeEncoder + .encode("exit")); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.psubscribe(new BinaryJedisPubSub() { + public void onMessage(byte[] channel, byte[] message) { + } + + public void onSubscribe(byte[] channel, int subscribedChannels) { + } + + public void onUnsubscribe(byte[] channel, int subscribedChannels) { + } + + public void onPSubscribe(byte[] pattern, int subscribedChannels) { + } + + public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { + } + + public void onPMessage(byte[] pattern, byte[] channel, + byte[] message) { + punsubscribe(pattern); + } + }, SafeEncoder.encode("foo.*"), SafeEncoder.encode("bar.*")); + t.join(); + } + + @Test + public void binarySubscribeLazily() throws UnknownHostException, + IOException, InterruptedException { + final BinaryJedisPubSub pubsub = new BinaryJedisPubSub() { + public void onMessage(byte[] channel, byte[] message) { + unsubscribe(channel); + } + + public void onSubscribe(byte[] channel, int subscribedChannels) { + } + + public void onUnsubscribe(byte[] channel, int subscribedChannels) { + } + + public void onPSubscribe(byte[] pattern, int subscribedChannels) { + } + + public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { + } + + public void onPMessage(byte[] pattern, byte[] channel, + byte[] message) { + punsubscribe(pattern); + } + }; + + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + pubsub.subscribe(SafeEncoder.encode("bar")); + pubsub.psubscribe(SafeEncoder.encode("bar.*")); + j.publish(SafeEncoder.encode("foo"), SafeEncoder + .encode("exit")); + j.publish(SafeEncoder.encode("bar"), SafeEncoder + .encode("exit")); + j.publish(SafeEncoder.encode("bar.123"), SafeEncoder + .encode("exit")); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(pubsub, SafeEncoder.encode("foo")); + t.join(); + } + @Test public void subscribeWithoutConnecting() { - try { - Jedis jedis = new Jedis(hnp.host, hnp.port); - jedis.subscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - } - public void onPMessage(String pattern, String channel, - String message) { - } - public void onSubscribe(String channel, int subscribedChannels) { - } - public void onUnsubscribe(String channel, int subscribedChannels) { - } - public void onPUnsubscribe(String pattern, int subscribedChannels) { - } - public void onPSubscribe(String pattern, int subscribedChannels) { - } - }, "foo"); - } catch(NullPointerException ex) { - fail(); - } catch(JedisException ex) { - // this is OK because we are not sending AUTH command - } + try { + Jedis jedis = new Jedis(hnp.host, hnp.port); + jedis.subscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + } + + public void onPMessage(String pattern, String channel, + String message) { + } + + public void onSubscribe(String channel, int subscribedChannels) { + } + + public void onUnsubscribe(String channel, int subscribedChannels) { + } + + public void onPUnsubscribe(String pattern, + int subscribedChannels) { + } + + public void onPSubscribe(String pattern, int subscribedChannels) { + } + }, "foo"); + } catch (NullPointerException ex) { + fail(); + } catch (JedisException ex) { + // this is OK because we are not sending AUTH command + } } } \ No newline at end of file