diff --git a/Makefile b/Makefile index 6df68af..a6e9912 100644 --- a/Makefile +++ b/Makefile @@ -38,6 +38,7 @@ pidfile /tmp/redis4.pid logfile /tmp/redis4.log save "" appendonly no +slaveof localhost 6381 endef define REDIS5_CONF @@ -49,6 +50,7 @@ pidfile /tmp/redis5.pid logfile /tmp/redis5.log save "" appendonly no +slaveof localhost 6381 endef define REDIS6_CONF @@ -60,6 +62,7 @@ pidfile /tmp/redis6.pid logfile /tmp/redis6.log save "" appendonly no +slaveof localhost 6379 endef define REDIS_SENTINEL1 diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index 385b1c0..597e305 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -2860,6 +2860,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKey */ public void monitor(final JedisMonitor jedisMonitor) { client.monitor(); + client.getStatusCodeReply(); jedisMonitor.proceed(client); } diff --git a/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java b/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java index 97a02ed..c3a72a0 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java @@ -1,9 +1,8 @@ package redis.clients.jedis.tests; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.junit.Before; @@ -12,8 +11,8 @@ import org.junit.Test; import redis.clients.jedis.DebugParams; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPubSub; import redis.clients.jedis.JedisSentinelPool; -import redis.clients.jedis.tests.utils.JedisSentinelTestUtil; public class JedisSentinelPoolTest extends JedisTestBase { private static final String MASTER_NAME = "mymaster"; @@ -29,43 +28,16 @@ public class JedisSentinelPoolTest extends JedisTestBase { protected static HostAndPort sentinel2 = HostAndPortUtil .getSentinelServers().get(2); - protected static Jedis masterJedis; - protected static Jedis slaveJedis1; - protected static Jedis slaveJedis2; protected static Jedis sentinelJedis1; - protected static Jedis sentinelJedis2; protected Set sentinels = new HashSet(); @Before public void setUp() throws Exception { - - // set up master and slaves - masterJedis = new Jedis(master.getHost(), master.getPort()); - masterJedis.auth("foobared"); - masterJedis.slaveofNoOne(); - - slaveJedis1 = new Jedis(slave1.getHost(), slave1.getPort()); - slaveJedis1.auth("foobared"); - slaveJedis1.slaveof(master.getHost(), master.getPort()); - - slaveJedis2 = new Jedis(slave2.getHost(), slave2.getPort()); - slaveJedis2.auth("foobared"); - slaveJedis2.slaveof(master.getHost(), master.getPort()); - sentinels.add(sentinel1.toString()); sentinels.add(sentinel2.toString()); - List slaves = new ArrayList(); - slaves.add(slave1); - slaves.add(slave2); - - JedisSentinelTestUtil.waitForSentinelRecognizeRedisReplication( - sentinel1, MASTER_NAME, master, slaves); - JedisSentinelTestUtil.waitForSentinelRecognizeRedisReplication( - sentinel2, MASTER_NAME, master, slaves); - - // No need to wait for sentinels to recognize each other + sentinelJedis1 = new Jedis(sentinel1.getHost(), sentinel1.getPort()); } @Test @@ -106,17 +78,62 @@ public class JedisSentinelPoolTest extends JedisTestBase { private void waitForFailover(JedisSentinelPool pool, HostAndPort oldMaster) throws InterruptedException { - HostAndPort newMaster = JedisSentinelTestUtil.waitForNewPromotedMaster( - sentinel1, MASTER_NAME, oldMaster); - JedisSentinelTestUtil.waitForNewPromotedMaster(sentinel2, MASTER_NAME, - oldMaster); - JedisSentinelTestUtil.waitForSentinelsRecognizeEachOthers(); - waitForJedisSentinelPoolRecognizeNewMaster(pool, newMaster); + waitForJedisSentinelPoolRecognizeNewMaster(pool); } private void waitForJedisSentinelPoolRecognizeNewMaster( - JedisSentinelPool pool, HostAndPort newMaster) - throws InterruptedException { + JedisSentinelPool pool) throws InterruptedException { + + final AtomicReference newmaster = new AtomicReference( + ""); + + sentinelJedis1.psubscribe(new JedisPubSub() { + + @Override + public void onMessage(String channel, String message) { + // TODO Auto-generated method stub + + } + + @Override + public void onPMessage(String pattern, String channel, + String message) { + if (channel.equals("+switch-master")) { + newmaster.set(message); + punsubscribe(); + } + // TODO Auto-generated method stub + + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + // TODO Auto-generated method stub + + } + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + // TODO Auto-generated method stub + + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) { + // TODO Auto-generated method stub + + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + // TODO Auto-generated method stub + + } + }, "*"); + + String[] chunks = newmaster.get().split(" "); + HostAndPort newMaster = new HostAndPort(chunks[3], + Integer.parseInt(chunks[4])); while (true) { String host = pool.getCurrentHostMaster().getHost(); @@ -128,7 +145,7 @@ public class JedisSentinelPoolTest extends JedisTestBase { System.out .println("JedisSentinelPool's master is not yet changed, sleep..."); - Thread.sleep(1000); + Thread.sleep(100); } } diff --git a/src/test/java/redis/clients/jedis/tests/JedisSentinelTest.java b/src/test/java/redis/clients/jedis/tests/JedisSentinelTest.java index 8912a1c..f8267c8 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisSentinelTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisSentinelTest.java @@ -1,6 +1,5 @@ package redis.clients.jedis.tests; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -10,15 +9,14 @@ import org.junit.Test; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; -import redis.clients.jedis.tests.utils.JedisSentinelTestUtil; public class JedisSentinelTest extends JedisTestBase { private static final String MASTER_NAME = "mymaster"; protected static HostAndPort master = HostAndPortUtil.getRedisServers() .get(0); - protected static HostAndPort slave = HostAndPortUtil.getRedisServers() - .get(5); + protected static HostAndPort slave = HostAndPortUtil.getRedisServers().get( + 5); protected static HostAndPort sentinel = HostAndPortUtil .getSentinelServers().get(0); @@ -28,25 +26,16 @@ public class JedisSentinelTest extends JedisTestBase { @Before public void setup() throws InterruptedException { - masterJedis = new Jedis(master.getHost(), master.getPort()); - slaveJedis = new Jedis(slave.getHost(), slave.getPort()); - slaveJedis.auth("foobared"); - slaveJedis.configSet("masterauth", "foobared"); - slaveJedis.slaveof(master.getHost(), master.getPort()); - - List slaves = new ArrayList(); - slaves.add(slave); - - JedisSentinelTestUtil.waitForSentinelRecognizeRedisReplication( - sentinel, MASTER_NAME, master, slaves); } @After public void clear() throws InterruptedException { - // New Sentinel (after 2.8.1) - // when slave promoted to master (slave of no one), New Sentinel force to restore it (demote) - // so, promote(slaveof) slave to master has no effect, not same to old Sentinel's behavior + // New Sentinel (after 2.8.1) + // when slave promoted to master (slave of no one), New Sentinel force + // to restore it (demote) + // so, promote(slaveof) slave to master has no effect, not same to old + // Sentinel's behavior } @Test @@ -59,13 +48,15 @@ public class JedisSentinelTest extends JedisTestBase { List masterHostAndPort = j .sentinelGetMasterAddrByName(masterName); - HostAndPort masterFromSentinel = new HostAndPort(masterHostAndPort.get(0), - Integer.parseInt(masterHostAndPort.get(1))); + HostAndPort masterFromSentinel = new HostAndPort( + masterHostAndPort.get(0), Integer.parseInt(masterHostAndPort + .get(1))); assertEquals(master, masterFromSentinel); List> slaves = j.sentinelSlaves(masterName); assertTrue(slaves.size() > 0); - assertEquals(master.getPort(), Integer.parseInt(slaves.get(0).get("master-port"))); + assertEquals(master.getPort(), + Integer.parseInt(slaves.get(0).get("master-port"))); // DO NOT RE-RUN TEST TOO FAST, RESET TAKES SOME TIME TO... RESET assertEquals(Long.valueOf(1), j.sentinelReset(masterName)); diff --git a/src/test/java/redis/clients/jedis/tests/JedisTest.java b/src/test/java/redis/clients/jedis/tests/JedisTest.java index bfa71ea..b123b05 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisTest.java @@ -52,9 +52,7 @@ public class JedisTest extends JedisCommandTestBase { jedis = new Jedis("localhost", 6379, 15000); jedis.auth("foobared"); jedis.configSet("timeout", "1"); - // we need to sleep a long time since redis check for idle connections - // every 10 seconds or so - Thread.sleep(20000); + Thread.sleep(2000); jedis.hmget("foobar", "foo"); } diff --git a/src/test/java/redis/clients/jedis/tests/commands/ControlCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/ControlCommandsTest.java index 4ae64df..00612bb 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/ControlCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/ControlCommandsTest.java @@ -45,18 +45,8 @@ public class ControlCommandsTest extends JedisCommandTestBase { @Test public void lastsave() throws InterruptedException { - long before = jedis.lastsave(); - String st = ""; - while (!st.equals("OK")) { - try { - Thread.sleep(1000); - st = jedis.save(); - } catch (JedisDataException e) { - - } - } - long after = jedis.lastsave(); - assertTrue((after - before) > 0); + long saved = jedis.lastsave(); + assertTrue(saved > 0); } @Test @@ -73,14 +63,9 @@ public class ControlCommandsTest extends JedisCommandTestBase { public void run() { Jedis j = new Jedis("localhost"); j.auth("foobared"); - for (int i = 0; i < 4; i++) { + for (int i = 0; i < 5; i++) { j.incr("foobared"); } - try { - Thread.sleep(2500); - } catch (InterruptedException e) { - } - j.incr("foobared"); j.disconnect(); } }).start(); diff --git a/src/test/java/redis/clients/jedis/tests/commands/ListCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/ListCommandsTest.java index 5653687..296877b 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/ListCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/ListCommandsTest.java @@ -421,41 +421,18 @@ public class ListCommandsTest extends JedisCommandTestBase { List result = jedis.blpop(1, "foo"); assertNull(result); - new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - j.lpush("foo", "bar"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }).start(); - + jedis.lpush("foo", "bar"); result = jedis.blpop(1, "foo"); + assertNotNull(result); assertEquals(2, result.size()); assertEquals("foo", result.get(0)); assertEquals("bar", result.get(1)); // Binary + jedis.lpush(bfoo, bbar); List bresult = jedis.blpop(1, bfoo); - assertNull(bresult); - new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - j.lpush(bfoo, bbar); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }).start(); - - bresult = jedis.blpop(1, bfoo); assertNotNull(bresult); assertEquals(2, bresult.size()); assertArrayEquals(bfoo, bresult.get(0)); @@ -468,18 +445,8 @@ public class ListCommandsTest extends JedisCommandTestBase { List result = jedis.brpop(1, "foo"); assertNull(result); - new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - j.lpush("foo", "bar"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }).start(); + jedis.lpush("foo", "bar"); result = jedis.brpop(1, "foo"); assertNotNull(result); assertEquals(2, result.size()); @@ -487,22 +454,9 @@ public class ListCommandsTest extends JedisCommandTestBase { assertEquals("bar", result.get(1)); // Binary + + jedis.lpush(bfoo, bbar); List bresult = jedis.brpop(1, bfoo); - assertNull(bresult); - - new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - j.lpush(bfoo, bbar); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }).start(); - - bresult = jedis.brpop(1, bfoo); assertNotNull(bresult); assertEquals(2, bresult.size()); assertArrayEquals(bfoo, bresult.get(0)); @@ -594,7 +548,7 @@ public class ListCommandsTest extends JedisCommandTestBase { (new Thread(new Runnable() { public void run() { try { - Thread.sleep(2000); + Thread.sleep(100); Jedis j = createJedis(); j.lpush("foo", "a"); } catch (InterruptedException e) { diff --git a/src/test/java/redis/clients/jedis/tests/commands/ObjectCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/ObjectCommandsTest.java index 0713753..1da4331 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/ObjectCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/ObjectCommandsTest.java @@ -36,14 +36,11 @@ public class ObjectCommandsTest extends JedisCommandTestBase { public void objectIdletime() throws InterruptedException { jedis.lpush(key, "hello world"); - // Wait a little bit more than 10 seconds so the idle time is 10 - // seconds. - Thread.sleep(10001); Long time = jedis.objectIdletime(key); - assertEquals(new Long(10), time); + assertEquals(new Long(0), time); // Binary time = jedis.objectIdletime(binaryKey); - assertEquals(new Long(10), time); + assertEquals(new Long(0), time); } } \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java index de60084..3ea14d5 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java @@ -16,21 +16,22 @@ import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.util.SafeEncoder; public class PublishSubscribeCommandsTest extends JedisCommandTestBase { - @Test - public void subscribe() throws InterruptedException { + private void publishOne(final String channel, final String message) { Thread t = new Thread(new Runnable() { public void run() { try { Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo", "exit"); + j.publish(channel, message); j.disconnect(); } catch (Exception ex) { - fail(ex.getMessage()); } } }); t.start(); + } + + @Test + public void subscribe() throws InterruptedException { jedis.subscribe(new JedisPubSub() { public void onMessage(String channel, String message) { assertEquals("foo", channel); @@ -41,6 +42,9 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { public void onSubscribe(String channel, int subscribedChannels) { assertEquals("foo", channel); assertEquals(1, subscribedChannels); + + //now that I'm subscribed... publish + publishOne("foo", "exit"); } public void onUnsubscribe(String channel, int subscribedChannels) { @@ -58,33 +62,18 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { String message) { } }, "foo"); - t.join(); } @Test public void subscribeMany() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo", "exit"); - Thread.sleep(1000); - j.publish("bar", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.subscribe(new JedisPubSub() { public void onMessage(String channel, String message) { unsubscribe(channel); } public void onSubscribe(String channel, int subscribedChannels) { + publishOne(channel, "exit"); } public void onUnsubscribe(String channel, int subscribedChannels) { @@ -100,25 +89,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { String message) { } }, "foo", "bar"); - t.join(); } @Test public void psubscribe() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo.bar", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.psubscribe(new JedisPubSub() { public void onMessage(String channel, String message) { } @@ -132,6 +107,8 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { public void onPSubscribe(String pattern, int subscribedChannels) { assertEquals("foo.*", pattern); assertEquals(1, subscribedChannels); + publishOne("foo.bar", "exit"); + } public void onPUnsubscribe(String pattern, int subscribedChannels) { @@ -147,27 +124,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { punsubscribe(); } }, "foo.*"); - t.join(); } @Test public void psubscribeMany() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish("foo.123", "exit"); - Thread.sleep(1000); - j.publish("bar.123", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.psubscribe(new JedisPubSub() { public void onMessage(String channel, String message) { } @@ -179,6 +140,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } public void onPSubscribe(String pattern, int subscribedChannels) { + publishOne(pattern.replace("*", "123"), "exit"); } public void onPUnsubscribe(String pattern, int subscribedChannels) { @@ -189,7 +151,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { punsubscribe(pattern); } }, "foo.*", "bar.*"); - t.join(); } @Test @@ -201,12 +162,18 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } public void onSubscribe(String channel, int subscribedChannels) { + publishOne(channel, "exit"); + if (!channel.equals("bar")) { + this.subscribe("bar"); + this.psubscribe("bar.*"); + } } public void onUnsubscribe(String channel, int subscribedChannels) { } public void onPSubscribe(String pattern, int subscribedChannels) { + publishOne(pattern.replace("*", "123"), "exit"); } public void onPUnsubscribe(String pattern, int subscribedChannels) { @@ -218,44 +185,12 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } }; - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - pubsub.subscribe("bar"); - pubsub.psubscribe("bar.*"); - j.publish("foo", "exit"); - j.publish("bar", "exit"); - j.publish("bar.123", "exit"); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.subscribe(pubsub, "foo"); - t.join(); } @Test public void binarySubscribe() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo"), - SafeEncoder.encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.subscribe(new BinaryJedisPubSub() { public void onMessage(byte[] channel, byte[] message) { assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); @@ -266,6 +201,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { public void onSubscribe(byte[] channel, int subscribedChannels) { assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); assertEquals(1, subscribedChannels); + publishOne(SafeEncoder.encode(channel), "exit"); } public void onUnsubscribe(byte[] channel, int subscribedChannels) { @@ -283,35 +219,18 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { byte[] message) { } }, SafeEncoder.encode("foo")); - t.join(); } @Test public void binarySubscribeMany() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo"), - SafeEncoder.encode("exit")); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("bar"), - SafeEncoder.encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.subscribe(new BinaryJedisPubSub() { public void onMessage(byte[] channel, byte[] message) { unsubscribe(channel); } public void onSubscribe(byte[] channel, int subscribedChannels) { + publishOne(SafeEncoder.encode(channel), "exit"); } public void onUnsubscribe(byte[] channel, int subscribedChannels) { @@ -327,26 +246,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { byte[] message) { } }, SafeEncoder.encode("foo"), SafeEncoder.encode("bar")); - t.join(); } @Test public void binaryPsubscribe() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo.bar"), - SafeEncoder.encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.psubscribe(new BinaryJedisPubSub() { public void onMessage(byte[] channel, byte[] message) { } @@ -360,6 +264,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { public void onPSubscribe(byte[] pattern, int subscribedChannels) { assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); assertEquals(1, subscribedChannels); + publishOne(SafeEncoder.encode(pattern).replace("*", "bar"), "exit"); } public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { @@ -376,29 +281,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { punsubscribe(); } }, SafeEncoder.encode("foo.*")); - t.join(); } @Test public void binaryPsubscribeMany() throws UnknownHostException, IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("foo.123"), - SafeEncoder.encode("exit")); - Thread.sleep(1000); - j.publish(SafeEncoder.encode("bar.123"), - SafeEncoder.encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.psubscribe(new BinaryJedisPubSub() { public void onMessage(byte[] channel, byte[] message) { } @@ -410,6 +297,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } public void onPSubscribe(byte[] pattern, int subscribedChannels) { + publishOne(SafeEncoder.encode(pattern).replace("*", "123"), "exit"); } public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { @@ -420,7 +308,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { punsubscribe(pattern); } }, SafeEncoder.encode("foo.*"), SafeEncoder.encode("bar.*")); - t.join(); } @Test @@ -432,12 +319,19 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } public void onSubscribe(byte[] channel, int subscribedChannels) { + publishOne(SafeEncoder.encode(channel), "exit"); + + if(!SafeEncoder.encode(channel).equals("bar")) { + this.subscribe(SafeEncoder.encode("bar")); + this.psubscribe(SafeEncoder.encode("bar.*")); + } } public void onUnsubscribe(byte[] channel, int subscribedChannels) { } public void onPSubscribe(byte[] pattern, int subscribedChannels) { + publishOne(SafeEncoder.encode(pattern).replace("*", "123"), "exit"); } public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { @@ -449,61 +343,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { } }; - Thread t = new Thread(new Runnable() { - public void run() { - try { - Jedis j = createJedis(); - Thread.sleep(1000); - pubsub.subscribe(SafeEncoder.encode("bar")); - pubsub.psubscribe(SafeEncoder.encode("bar.*")); - j.publish(SafeEncoder.encode("foo"), - SafeEncoder.encode("exit")); - j.publish(SafeEncoder.encode("bar"), - SafeEncoder.encode("exit")); - j.publish(SafeEncoder.encode("bar.123"), - SafeEncoder.encode("exit")); - j.disconnect(); - } catch (Exception ex) { - fail(ex.getMessage()); - } - } - }); - t.start(); jedis.subscribe(pubsub, SafeEncoder.encode("foo")); - t.join(); - } - - @Test - @Ignore - public void subscribeWithoutConnecting() { - try { - Jedis jedis = new Jedis(hnp.getHost(), hnp.getPort()); - jedis.subscribe(new JedisPubSub() { - public void onMessage(String channel, String message) { - } - - public void onPMessage(String pattern, String channel, - String message) { - } - - public void onSubscribe(String channel, int subscribedChannels) { - } - - public void onUnsubscribe(String channel, int subscribedChannels) { - } - - public void onPUnsubscribe(String pattern, - int subscribedChannels) { - } - - public void onPSubscribe(String pattern, int subscribedChannels) { - } - }, "foo"); - } catch (NullPointerException ex) { - fail(); - } catch (JedisDataException ex) { - // this is OK because we are not sending AUTH command - } } @Test(expected = JedisConnectionException.class) @@ -570,7 +410,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { // client-output-buffer exceed // System.out.println("channel - " + channel + // " / message - " + message); - Thread.sleep(500); + Thread.sleep(100); } catch (Exception e) { try { t.join(); @@ -598,13 +438,12 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { String message) { } }, "foo"); - t.join(); } finally { // exit the publisher thread. if exception is thrown, thread might // still keep publishing things. exit.set(true); if (t.isAlive()) { - t.wait(); + t.join(); } } }