From efbb710343b5ca63f679d08213e5265ffda5ee64 Mon Sep 17 00:00:00 2001 From: Jonathan Leibiusky Date: Tue, 3 Dec 2013 08:31:24 -0500 Subject: [PATCH] Change string size to make test fail faster --- .../PublishSubscribeCommandsTest.java | 950 +++++++++--------- 1 file changed, 484 insertions(+), 466 deletions(-) diff --git a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java index 37fb2a1..de60084 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java @@ -3,6 +3,7 @@ package redis.clients.jedis.tests.commands; import java.io.IOException; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Ignore; import org.junit.Test; @@ -17,585 +18,602 @@ import redis.clients.util.SafeEncoder; public class PublishSubscribeCommandsTest extends JedisCommandTestBase { @Test public void subscribe() throws InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); - jedis.subscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - assertEquals("foo", channel); - assertEquals("exit", message); - unsubscribe(); - } + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish("foo", "exit"); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + assertEquals("foo", channel); + assertEquals("exit", message); + unsubscribe(); + } - public void onSubscribe(String channel, int subscribedChannels) { - assertEquals("foo", channel); - assertEquals(1, subscribedChannels); - } + public void onSubscribe(String channel, int subscribedChannels) { + assertEquals("foo", channel); + assertEquals(1, subscribedChannels); + } - public void onUnsubscribe(String channel, int subscribedChannels) { - assertEquals("foo", channel); - assertEquals(0, subscribedChannels); - } + public void onUnsubscribe(String channel, int subscribedChannels) { + assertEquals("foo", channel); + assertEquals(0, subscribedChannels); + } - public void onPSubscribe(String pattern, int subscribedChannels) { - } + public void onPSubscribe(String pattern, int subscribedChannels) { + } - public void onPUnsubscribe(String pattern, int subscribedChannels) { - } + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } - public void onPMessage(String pattern, String channel, - String message) { - } - }, "foo"); - t.join(); + public void onPMessage(String pattern, String channel, + String message) { + } + }, "foo"); + t.join(); } @Test public void subscribeMany() throws UnknownHostException, IOException, - InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo", "exit"); - Thread.sleep(1000); - j.publish("bar", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); - jedis.subscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - unsubscribe(channel); - } + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish("foo", "exit"); + Thread.sleep(1000); + j.publish("bar", "exit"); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + unsubscribe(channel); + } - public void onSubscribe(String channel, int subscribedChannels) { - } + public void onSubscribe(String channel, int subscribedChannels) { + } - public void onUnsubscribe(String channel, int subscribedChannels) { - } + public void onUnsubscribe(String channel, int subscribedChannels) { + } - public void onPSubscribe(String pattern, int subscribedChannels) { - } + public void onPSubscribe(String pattern, int subscribedChannels) { + } - public void onPUnsubscribe(String pattern, int subscribedChannels) { - } + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } - public void onPMessage(String pattern, String channel, - String message) { - } - }, "foo", "bar"); - t.join(); + public void onPMessage(String pattern, String channel, + String message) { + } + }, "foo", "bar"); + t.join(); } @Test public void psubscribe() throws UnknownHostException, IOException, - InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo.bar", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); - jedis.psubscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - } + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish("foo.bar", "exit"); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.psubscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + } - public void onSubscribe(String channel, int subscribedChannels) { - } + public void onSubscribe(String channel, int subscribedChannels) { + } - public void onUnsubscribe(String channel, int subscribedChannels) { - } + public void onUnsubscribe(String channel, int subscribedChannels) { + } - public void onPSubscribe(String pattern, int subscribedChannels) { - assertEquals("foo.*", pattern); - assertEquals(1, subscribedChannels); - } + public void onPSubscribe(String pattern, int subscribedChannels) { + assertEquals("foo.*", pattern); + assertEquals(1, subscribedChannels); + } - public void onPUnsubscribe(String pattern, int subscribedChannels) { - assertEquals("foo.*", pattern); - assertEquals(0, subscribedChannels); - } + public void onPUnsubscribe(String pattern, int subscribedChannels) { + assertEquals("foo.*", pattern); + assertEquals(0, subscribedChannels); + } - public void onPMessage(String pattern, String channel, - String message) { - assertEquals("foo.*", pattern); - assertEquals("foo.bar", channel); - assertEquals("exit", message); - punsubscribe(); - } - }, "foo.*"); - t.join(); + public void onPMessage(String pattern, String channel, + String message) { + assertEquals("foo.*", pattern); + assertEquals("foo.bar", channel); + assertEquals("exit", message); + punsubscribe(); + } + }, "foo.*"); + t.join(); } @Test public void psubscribeMany() throws UnknownHostException, IOException, - InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo.123", "exit"); - Thread.sleep(1000); - j.publish("bar.123", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); - jedis.psubscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - } + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish("foo.123", "exit"); + Thread.sleep(1000); + j.publish("bar.123", "exit"); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.psubscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + } - public void onSubscribe(String channel, int subscribedChannels) { - } + public void onSubscribe(String channel, int subscribedChannels) { + } - public void onUnsubscribe(String channel, int subscribedChannels) { - } + public void onUnsubscribe(String channel, int subscribedChannels) { + } - public void onPSubscribe(String pattern, int subscribedChannels) { - } + public void onPSubscribe(String pattern, int subscribedChannels) { + } - public void onPUnsubscribe(String pattern, int subscribedChannels) { - } + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } - public void onPMessage(String pattern, String channel, - String message) { - punsubscribe(pattern); - } - }, "foo.*", "bar.*"); - t.join(); + public void onPMessage(String pattern, String channel, + String message) { + punsubscribe(pattern); + } + }, "foo.*", "bar.*"); + t.join(); } @Test public void subscribeLazily() throws UnknownHostException, IOException, - InterruptedException { - final JedisPubSub pubsub = new JedisPubSub() { - public void onMessage(String channel, String message) { - unsubscribe(channel); - } + InterruptedException { + final JedisPubSub pubsub = new JedisPubSub() { + public void onMessage(String channel, String message) { + unsubscribe(channel); + } - public void onSubscribe(String channel, int subscribedChannels) { - } + public void onSubscribe(String channel, int subscribedChannels) { + } - public void onUnsubscribe(String channel, int subscribedChannels) { - } + public void onUnsubscribe(String channel, int subscribedChannels) { + } - public void onPSubscribe(String pattern, int subscribedChannels) { - } + public void onPSubscribe(String pattern, int subscribedChannels) { + } - public void onPUnsubscribe(String pattern, int subscribedChannels) { - } + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } - public void onPMessage(String pattern, String channel, - String message) { - punsubscribe(pattern); - } - }; + public void onPMessage(String pattern, String channel, + String message) { + punsubscribe(pattern); + } + }; - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - pubsub.subscribe("bar"); - pubsub.psubscribe("bar.*"); - j.publish("foo", "exit"); - j.publish("bar", "exit"); - j.publish("bar.123", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); - jedis.subscribe(pubsub, "foo"); - t.join(); + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + pubsub.subscribe("bar"); + pubsub.psubscribe("bar.*"); + j.publish("foo", "exit"); + j.publish("bar", "exit"); + j.publish("bar.123", "exit"); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(pubsub, "foo"); + t.join(); } @Test public void binarySubscribe() throws UnknownHostException, IOException, - InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo"), SafeEncoder - .encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); - jedis.subscribe(new BinaryJedisPubSub() { - public void onMessage(byte[] channel, byte[] message) { - assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); - assertTrue(Arrays.equals(SafeEncoder.encode("exit"), message)); - unsubscribe(); - } + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("foo"), + SafeEncoder.encode("exit")); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(new BinaryJedisPubSub() { + public void onMessage(byte[] channel, byte[] message) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); + assertTrue(Arrays.equals(SafeEncoder.encode("exit"), message)); + unsubscribe(); + } - public void onSubscribe(byte[] channel, int subscribedChannels) { - assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); - assertEquals(1, subscribedChannels); - } + public void onSubscribe(byte[] channel, int subscribedChannels) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); + assertEquals(1, subscribedChannels); + } - public void onUnsubscribe(byte[] channel, int subscribedChannels) { - assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); - assertEquals(0, subscribedChannels); - } + public void onUnsubscribe(byte[] channel, int subscribedChannels) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); + assertEquals(0, subscribedChannels); + } - public void onPSubscribe(byte[] pattern, int subscribedChannels) { - } + public void onPSubscribe(byte[] pattern, int subscribedChannels) { + } - public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { - } + public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { + } - public void onPMessage(byte[] pattern, byte[] channel, - byte[] message) { - } - }, SafeEncoder.encode("foo")); - t.join(); + public void onPMessage(byte[] pattern, byte[] channel, + byte[] message) { + } + }, SafeEncoder.encode("foo")); + t.join(); } @Test public void binarySubscribeMany() throws UnknownHostException, IOException, - InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo"), SafeEncoder - .encode("exit")); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("bar"), SafeEncoder - .encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); - jedis.subscribe(new BinaryJedisPubSub() { - public void onMessage(byte[] channel, byte[] message) { - unsubscribe(channel); - } + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("foo"), + SafeEncoder.encode("exit")); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("bar"), + SafeEncoder.encode("exit")); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(new BinaryJedisPubSub() { + public void onMessage(byte[] channel, byte[] message) { + unsubscribe(channel); + } - public void onSubscribe(byte[] channel, int subscribedChannels) { - } + public void onSubscribe(byte[] channel, int subscribedChannels) { + } - public void onUnsubscribe(byte[] channel, int subscribedChannels) { - } + public void onUnsubscribe(byte[] channel, int subscribedChannels) { + } - public void onPSubscribe(byte[] pattern, int subscribedChannels) { - } + public void onPSubscribe(byte[] pattern, int subscribedChannels) { + } - public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { - } + public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { + } - public void onPMessage(byte[] pattern, byte[] channel, - byte[] message) { - } - }, SafeEncoder.encode("foo"), SafeEncoder.encode("bar")); - t.join(); + public void onPMessage(byte[] pattern, byte[] channel, + byte[] message) { + } + }, SafeEncoder.encode("foo"), SafeEncoder.encode("bar")); + t.join(); } @Test public void binaryPsubscribe() throws UnknownHostException, IOException, - InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo.bar"), SafeEncoder - .encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); - jedis.psubscribe(new BinaryJedisPubSub() { - public void onMessage(byte[] channel, byte[] message) { - } + InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("foo.bar"), + SafeEncoder.encode("exit")); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.psubscribe(new BinaryJedisPubSub() { + public void onMessage(byte[] channel, byte[] message) { + } - public void onSubscribe(byte[] channel, int subscribedChannels) { - } + public void onSubscribe(byte[] channel, int subscribedChannels) { + } - public void onUnsubscribe(byte[] channel, int subscribedChannels) { - } + public void onUnsubscribe(byte[] channel, int subscribedChannels) { + } - public void onPSubscribe(byte[] pattern, int subscribedChannels) { - assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); - assertEquals(1, subscribedChannels); - } + public void onPSubscribe(byte[] pattern, int subscribedChannels) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); + assertEquals(1, subscribedChannels); + } - public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { - assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); - assertEquals(0, subscribedChannels); - } + public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); + assertEquals(0, subscribedChannels); + } - public void onPMessage(byte[] pattern, byte[] channel, - byte[] message) { - assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); - assertTrue(Arrays - .equals(SafeEncoder.encode("foo.bar"), channel)); - assertTrue(Arrays.equals(SafeEncoder.encode("exit"), message)); - punsubscribe(); - } - }, SafeEncoder.encode("foo.*")); - t.join(); + public void onPMessage(byte[] pattern, byte[] channel, + byte[] message) { + assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); + assertTrue(Arrays + .equals(SafeEncoder.encode("foo.bar"), channel)); + assertTrue(Arrays.equals(SafeEncoder.encode("exit"), message)); + punsubscribe(); + } + }, SafeEncoder.encode("foo.*")); + t.join(); } @Test public void binaryPsubscribeMany() throws UnknownHostException, - IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo.123"), SafeEncoder - .encode("exit")); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("bar.123"), SafeEncoder - .encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); - jedis.psubscribe(new BinaryJedisPubSub() { - public void onMessage(byte[] channel, byte[] message) { - } + IOException, InterruptedException { + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("foo.123"), + SafeEncoder.encode("exit")); + Thread.sleep(1000); + j.publish(SafeEncoder.encode("bar.123"), + SafeEncoder.encode("exit")); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.psubscribe(new BinaryJedisPubSub() { + public void onMessage(byte[] channel, byte[] message) { + } - public void onSubscribe(byte[] channel, int subscribedChannels) { - } + public void onSubscribe(byte[] channel, int subscribedChannels) { + } - public void onUnsubscribe(byte[] channel, int subscribedChannels) { - } + public void onUnsubscribe(byte[] channel, int subscribedChannels) { + } - public void onPSubscribe(byte[] pattern, int subscribedChannels) { - } + public void onPSubscribe(byte[] pattern, int subscribedChannels) { + } - public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { - } + public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { + } - public void onPMessage(byte[] pattern, byte[] channel, - byte[] message) { - punsubscribe(pattern); - } - }, SafeEncoder.encode("foo.*"), SafeEncoder.encode("bar.*")); - t.join(); + public void onPMessage(byte[] pattern, byte[] channel, + byte[] message) { + punsubscribe(pattern); + } + }, SafeEncoder.encode("foo.*"), SafeEncoder.encode("bar.*")); + t.join(); } @Test public void binarySubscribeLazily() throws UnknownHostException, - IOException, InterruptedException { - final BinaryJedisPubSub pubsub = new BinaryJedisPubSub() { - public void onMessage(byte[] channel, byte[] message) { - unsubscribe(channel); - } + IOException, InterruptedException { + final BinaryJedisPubSub pubsub = new BinaryJedisPubSub() { + public void onMessage(byte[] channel, byte[] message) { + unsubscribe(channel); + } - public void onSubscribe(byte[] channel, int subscribedChannels) { - } + public void onSubscribe(byte[] channel, int subscribedChannels) { + } - public void onUnsubscribe(byte[] channel, int subscribedChannels) { - } + public void onUnsubscribe(byte[] channel, int subscribedChannels) { + } - public void onPSubscribe(byte[] pattern, int subscribedChannels) { - } + public void onPSubscribe(byte[] pattern, int subscribedChannels) { + } - public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { - } + public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { + } - public void onPMessage(byte[] pattern, byte[] channel, - byte[] message) { - punsubscribe(pattern); - } - }; + public void onPMessage(byte[] pattern, byte[] channel, + byte[] message) { + punsubscribe(pattern); + } + }; - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - pubsub.subscribe(SafeEncoder.encode("bar")); - pubsub.psubscribe(SafeEncoder.encode("bar.*")); - j.publish(SafeEncoder.encode("foo"), SafeEncoder - .encode("exit")); - j.publish(SafeEncoder.encode("bar"), SafeEncoder - .encode("exit")); - j.publish(SafeEncoder.encode("bar.123"), SafeEncoder - .encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); - jedis.subscribe(pubsub, SafeEncoder.encode("foo")); - t.join(); + Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + Thread.sleep(1000); + pubsub.subscribe(SafeEncoder.encode("bar")); + pubsub.psubscribe(SafeEncoder.encode("bar.*")); + j.publish(SafeEncoder.encode("foo"), + SafeEncoder.encode("exit")); + j.publish(SafeEncoder.encode("bar"), + SafeEncoder.encode("exit")); + j.publish(SafeEncoder.encode("bar.123"), + SafeEncoder.encode("exit")); + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(pubsub, SafeEncoder.encode("foo")); + t.join(); } - @Test @Ignore + @Test + @Ignore public void subscribeWithoutConnecting() { - try { - Jedis jedis = new Jedis(hnp.getHost(), hnp.getPort()); - jedis.subscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - } + try { + Jedis jedis = new Jedis(hnp.getHost(), hnp.getPort()); + jedis.subscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + } - public void onPMessage(String pattern, String channel, - String message) { - } + public void onPMessage(String pattern, String channel, + String message) { + } - public void onSubscribe(String channel, int subscribedChannels) { - } + public void onSubscribe(String channel, int subscribedChannels) { + } - public void onUnsubscribe(String channel, int subscribedChannels) { - } + public void onUnsubscribe(String channel, int subscribedChannels) { + } - public void onPUnsubscribe(String pattern, - int subscribedChannels) { - } + public void onPUnsubscribe(String pattern, + int subscribedChannels) { + } - public void onPSubscribe(String pattern, int subscribedChannels) { - } - }, "foo"); - } catch (NullPointerException ex) { - fail(); - } catch (JedisDataException ex) { - // this is OK because we are not sending AUTH command - } + public void onPSubscribe(String pattern, int subscribedChannels) { + } + }, "foo"); + } catch (NullPointerException ex) { + fail(); + } catch (JedisDataException ex) { + // this is OK because we are not sending AUTH command + } } @Test(expected = JedisConnectionException.class) public void unsubscribeWhenNotSusbscribed() throws InterruptedException { - JedisPubSub pubsub = new JedisPubSub() { - public void onMessage(String channel, String message) { - } + JedisPubSub pubsub = new JedisPubSub() { + public void onMessage(String channel, String message) { + } - public void onPMessage(String pattern, String channel, - String message) { - } + public void onPMessage(String pattern, String channel, + String message) { + } - public void onSubscribe(String channel, int subscribedChannels) { - } + public void onSubscribe(String channel, int subscribedChannels) { + } - public void onUnsubscribe(String channel, int subscribedChannels) { - } + public void onUnsubscribe(String channel, int subscribedChannels) { + } - public void onPUnsubscribe(String pattern, int subscribedChannels) { - } + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } - public void onPSubscribe(String pattern, int subscribedChannels) { - } - }; - pubsub.unsubscribe(); + public void onPSubscribe(String pattern, int subscribedChannels) { + } + }; + pubsub.unsubscribe(); } - + @Test(expected = JedisConnectionException.class) - public void handleClientOutputBufferLimitForSubscribeTooSlow() throws InterruptedException { - final Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - - Thread.sleep(1000); - - // we already set jedis1 config to client-output-buffer-limit pubsub 256k 128k 5 - // it means if subscriber delayed to receive over 256k or 128k continuously 5 sec, - // redis disconnects subscriber - - // we publish over 10M data for making situation for exceed client-output-buffer-limit - String veryLargeString = makeLargeString(1024 * 10); - - // 10K * 1024 = 10M - for (int i = 0 ; i < 1024 ; i++) - j.publish("foo", veryLargeString); // 1k - - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); - jedis.subscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - try { - // wait 0.5 secs to slow down subscribe and client-output-buffer exceed - //System.out.println("channel - " + channel + " / message - " + message); - Thread.sleep(500); - } catch (Exception e) { - try { - t.join(); - } catch (InterruptedException e1) { - } - - fail(e.getMessage()); - } - } + public void handleClientOutputBufferLimitForSubscribeTooSlow() + throws InterruptedException { + final Jedis j = createJedis(); + final AtomicBoolean exit = new AtomicBoolean(false); - public void onSubscribe(String channel, int subscribedChannels) { - } + final Thread t = new Thread(new Runnable() { + public void run() { + try { - public void onUnsubscribe(String channel, int subscribedChannels) { - } + // we already set jedis1 config to + // client-output-buffer-limit pubsub 256k 128k 5 + // it means if subscriber delayed to receive over 256k or + // 128k continuously 5 sec, + // redis disconnects subscriber - public void onPSubscribe(String pattern, int subscribedChannels) { - } + // we publish over 100M data for making situation for exceed + // client-output-buffer-limit + String veryLargeString = makeLargeString(10485760); - public void onPUnsubscribe(String pattern, int subscribedChannels) { - } + // 10M * 10 = 100M + for (int i = 0; i < 10 && !exit.get(); i++) { + j.publish("foo", veryLargeString); + } - public void onPMessage(String pattern, String channel, - String message) { - } - }, "foo"); - t.join(); + j.disconnect(); + } catch (Exception ex) { + } + } + }); + t.start(); + try { + jedis.subscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + try { + // wait 0.5 secs to slow down subscribe and + // client-output-buffer exceed + // System.out.println("channel - " + channel + + // " / message - " + message); + Thread.sleep(500); + } catch (Exception e) { + try { + t.join(); + } catch (InterruptedException e1) { + } + + fail(e.getMessage()); + } + } + + public void onSubscribe(String channel, int subscribedChannels) { + } + + public void onUnsubscribe(String channel, int subscribedChannels) { + } + + public void onPSubscribe(String pattern, int subscribedChannels) { + } + + public void onPUnsubscribe(String pattern, + int subscribedChannels) { + } + + public void onPMessage(String pattern, String channel, + String message) { + } + }, "foo"); + t.join(); + } finally { + // exit the publisher thread. if exception is thrown, thread might + // still keep publishing things. + exit.set(true); + if (t.isAlive()) { + t.wait(); + } + } } - - private String makeLargeString(int size) { - StringBuffer sb = new StringBuffer(); - for (int i = 0 ; i < size ; i++) - sb.append((char)('a' + i % 26)); - return sb.toString(); + private String makeLargeString(int size) { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < size; i++) + sb.append((char) ('a' + i % 26)); + + return sb.toString(); } }