From 58123034f5bf5ab51fce77980ab447f13bdac02e Mon Sep 17 00:00:00 2001 From: Jonathan Leibiusky Date: Tue, 3 Dec 2013 17:43:15 -0500 Subject: [PATCH] make pubsub test fast --- .../PublishSubscribeCommandsTest.java | 225 +++--------------- 1 file changed, 32 insertions(+), 193 deletions(-) diff --git a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java index de60084..3ea14d5 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java @@ -16,21 +16,22 @@ import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.util.SafeEncoder; public class PublishSubscribeCommandsTest extends JedisCommandTestBase { - @Test - public void subscribe() throws InterruptedException { + private void publishOne(final String channel, final String message) { Thread t = new Thread(new Runnable() { public void run() { try { Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo", "exit"); + j.publish(channel, message); j.disconnect(); } catch (Exception ex) { - fail(ex.getMessage()); } } }); t.start(); + } + + @Test + public void subscribe() throws InterruptedException { jedis.subscribe(new JedisPubSub() { public void onMessage(String channel, String message) { assertEquals("foo", channel); @@ -41,6 +42,9 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { public void onSubscribe(String channel, int subscribedChannels) { assertEquals("foo", channel); assertEquals(1, subscribedChannels); + + //now that I'm subscribed... publish + publishOne("foo", "exit"); } public void onUnsubscribe(String channel, int subscribedChannels) { @@ -58,33 +62,18 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { String message) { } }, "foo"); - t.join(); } @Test public void subscribeMany() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo", "exit"); - Thread.sleep(1000); - j.publish("bar", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.subscribe(new JedisPubSub() { public void onMessage(String channel, String message) { unsubscribe(channel); } public void onSubscribe(String channel, int subscribedChannels) { + publishOne(channel, "exit"); } public void onUnsubscribe(String channel, int subscribedChannels) { @@ -100,25 +89,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { String message) { } }, "foo", "bar"); - t.join(); } @Test public void psubscribe() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo.bar", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.psubscribe(new JedisPubSub() { public void onMessage(String channel, String message) { } @@ -132,6 +107,8 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { public void onPSubscribe(String pattern, int subscribedChannels) { assertEquals("foo.*", pattern); assertEquals(1, subscribedChannels); + publishOne("foo.bar", "exit"); + } public void onPUnsubscribe(String pattern, int subscribedChannels) { @@ -147,27 +124,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { punsubscribe(); } }, "foo.*"); - t.join(); } @Test public void psubscribeMany() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo.123", "exit"); - Thread.sleep(1000); - j.publish("bar.123", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.psubscribe(new JedisPubSub() { public void onMessage(String channel, String message) { } @@ -179,6 +140,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } public void onPSubscribe(String pattern, int subscribedChannels) { + publishOne(pattern.replace("*", "123"), "exit"); } public void onPUnsubscribe(String pattern, int subscribedChannels) { @@ -189,7 +151,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { punsubscribe(pattern); } }, "foo.*", "bar.*"); - t.join(); } @Test @@ -201,12 +162,18 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } public void onSubscribe(String channel, int subscribedChannels) { + publishOne(channel, "exit"); + if (!channel.equals("bar")) { + this.subscribe("bar"); + this.psubscribe("bar.*"); + } } public void onUnsubscribe(String channel, int subscribedChannels) { } public void onPSubscribe(String pattern, int subscribedChannels) { + publishOne(pattern.replace("*", "123"), "exit"); } public void onPUnsubscribe(String pattern, int subscribedChannels) { @@ -218,44 +185,12 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } }; - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - pubsub.subscribe("bar"); - pubsub.psubscribe("bar.*"); - j.publish("foo", "exit"); - j.publish("bar", "exit"); - j.publish("bar.123", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.subscribe(pubsub, "foo"); - t.join(); } @Test public void binarySubscribe() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo"), - SafeEncoder.encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.subscribe(new BinaryJedisPubSub() { public void onMessage(byte[] channel, byte[] message) { assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); @@ -266,6 +201,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { public void onSubscribe(byte[] channel, int subscribedChannels) { assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); assertEquals(1, subscribedChannels); + publishOne(SafeEncoder.encode(channel), "exit"); } public void onUnsubscribe(byte[] channel, int subscribedChannels) { @@ -283,35 +219,18 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { byte[] message) { } }, SafeEncoder.encode("foo")); - t.join(); } @Test public void binarySubscribeMany() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo"), - SafeEncoder.encode("exit")); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("bar"), - SafeEncoder.encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.subscribe(new BinaryJedisPubSub() { public void onMessage(byte[] channel, byte[] message) { unsubscribe(channel); } public void onSubscribe(byte[] channel, int subscribedChannels) { + publishOne(SafeEncoder.encode(channel), "exit"); } public void onUnsubscribe(byte[] channel, int subscribedChannels) { @@ -327,26 +246,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { byte[] message) { } }, SafeEncoder.encode("foo"), SafeEncoder.encode("bar")); - t.join(); } @Test public void binaryPsubscribe() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo.bar"), - SafeEncoder.encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.psubscribe(new BinaryJedisPubSub() { public void onMessage(byte[] channel, byte[] message) { } @@ -360,6 +264,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { public void onPSubscribe(byte[] pattern, int subscribedChannels) { assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); assertEquals(1, subscribedChannels); + publishOne(SafeEncoder.encode(pattern).replace("*", "bar"), "exit"); } public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { @@ -376,29 +281,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { punsubscribe(); } }, SafeEncoder.encode("foo.*")); - t.join(); } @Test public void binaryPsubscribeMany() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo.123"), - SafeEncoder.encode("exit")); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("bar.123"), - SafeEncoder.encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.psubscribe(new BinaryJedisPubSub() { public void onMessage(byte[] channel, byte[] message) { } @@ -410,6 +297,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } public void onPSubscribe(byte[] pattern, int subscribedChannels) { + publishOne(SafeEncoder.encode(pattern).replace("*", "123"), "exit"); } public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { @@ -420,7 +308,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { punsubscribe(pattern); } }, SafeEncoder.encode("foo.*"), SafeEncoder.encode("bar.*")); - t.join(); } @Test @@ -432,12 +319,19 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } public void onSubscribe(byte[] channel, int subscribedChannels) { + publishOne(SafeEncoder.encode(channel), "exit"); + + if(!SafeEncoder.encode(channel).equals("bar")) { + this.subscribe(SafeEncoder.encode("bar")); + this.psubscribe(SafeEncoder.encode("bar.*")); + } } public void onUnsubscribe(byte[] channel, int subscribedChannels) { } public void onPSubscribe(byte[] pattern, int subscribedChannels) { + publishOne(SafeEncoder.encode(pattern).replace("*", "123"), "exit"); } public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { @@ -449,61 +343,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } }; - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - pubsub.subscribe(SafeEncoder.encode("bar")); - pubsub.psubscribe(SafeEncoder.encode("bar.*")); - j.publish(SafeEncoder.encode("foo"), - SafeEncoder.encode("exit")); - j.publish(SafeEncoder.encode("bar"), - SafeEncoder.encode("exit")); - j.publish(SafeEncoder.encode("bar.123"), - SafeEncoder.encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.subscribe(pubsub, SafeEncoder.encode("foo")); - t.join(); - } - - @Test - @Ignore - public void subscribeWithoutConnecting() { - try { - Jedis jedis = new Jedis(hnp.getHost(), hnp.getPort()); - jedis.subscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - } - - public void onPMessage(String pattern, String channel, - String message) { - } - - public void onSubscribe(String channel, int subscribedChannels) { - } - - public void onUnsubscribe(String channel, int subscribedChannels) { - } - - public void onPUnsubscribe(String pattern, - int subscribedChannels) { - } - - public void onPSubscribe(String pattern, int subscribedChannels) { - } - }, "foo"); - } catch (NullPointerException ex) { - fail(); - } catch (JedisDataException ex) { - // this is OK because we are not sending AUTH command - } } @Test(expected = JedisConnectionException.class) @@ -570,7 +410,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { // client-output-buffer exceed // System.out.println("channel - " + channel + // " / message - " + message); - Thread.sleep(500); + Thread.sleep(100); } catch (Exception e) { try { t.join(); @@ -598,13 +438,12 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { String message) { } }, "foo"); - t.join(); } finally { // exit the publisher thread. if exception is thrown, thread might // still keep publishing things. exit.set(true); if (t.isAlive()) { - t.wait(); + t.join(); } } }