|
|
|
|
@@ -10,9 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
|
|
|
|
import org.junit.Test;
|
|
|
|
|
|
|
|
|
|
import redis.clients.jedis.BinaryJedisPubSub;
|
|
|
|
|
import redis.clients.jedis.Jedis;
|
|
|
|
|
import redis.clients.jedis.JedisPubSub;
|
|
|
|
|
import redis.clients.jedis.*;
|
|
|
|
|
import redis.clients.jedis.exceptions.JedisConnectionException;
|
|
|
|
|
import redis.clients.util.SafeEncoder;
|
|
|
|
|
|
|
|
|
|
@@ -33,7 +31,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void subscribe() throws InterruptedException {
|
|
|
|
|
jedis.subscribe(new JedisPubSub() {
|
|
|
|
|
jedis.subscribe(new JedisPubSubAdaptor() {
|
|
|
|
|
public void onMessage(String channel, String message) {
|
|
|
|
|
assertEquals("foo", channel);
|
|
|
|
|
assertEquals("exit", message);
|
|
|
|
|
@@ -52,16 +50,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
assertEquals("foo", channel);
|
|
|
|
|
assertEquals(0, subscribedChannels);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPSubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPUnsubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPMessage(String pattern, String channel,
|
|
|
|
|
String message) {
|
|
|
|
|
}
|
|
|
|
|
}, "foo");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -69,13 +57,9 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
public void pubSubChannels() {
|
|
|
|
|
final List<String> expectedActiveChannels = Arrays.asList("testchan1",
|
|
|
|
|
"testchan2", "testchan3");
|
|
|
|
|
jedis.subscribe(new JedisPubSub() {
|
|
|
|
|
jedis.subscribe(new JedisPubSubAdaptor() {
|
|
|
|
|
private int count = 0;
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onUnsubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onSubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
count++;
|
|
|
|
|
@@ -89,43 +73,14 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
unsubscribe();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onPUnsubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onPSubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onPMessage(String pattern, String channel,
|
|
|
|
|
String message) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onMessage(String channel, String message) {
|
|
|
|
|
}
|
|
|
|
|
}, "testchan1", "testchan2", "testchan3");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void pubSubNumPat() {
|
|
|
|
|
jedis.psubscribe(new JedisPubSub() {
|
|
|
|
|
jedis.psubscribe(new JedisPubSubAdaptor() {
|
|
|
|
|
private int count = 0;
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onUnsubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onSubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onPUnsubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onPSubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
count++;
|
|
|
|
|
@@ -137,14 +92,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onPMessage(String pattern, String channel,
|
|
|
|
|
String message) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onMessage(String channel, String message) {
|
|
|
|
|
}
|
|
|
|
|
}, "test*", "test*", "chan*");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -153,13 +100,9 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
final Map<String, String> expectedNumSub = new HashMap<String, String>();
|
|
|
|
|
expectedNumSub.put("testchannel2", "1");
|
|
|
|
|
expectedNumSub.put("testchannel1", "1");
|
|
|
|
|
jedis.subscribe(new JedisPubSub() {
|
|
|
|
|
jedis.subscribe(new JedisPubSubAdaptor() {
|
|
|
|
|
private int count = 0;
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onUnsubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onSubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
count++;
|
|
|
|
|
@@ -171,30 +114,13 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
unsubscribe();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onPUnsubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onPSubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onPMessage(String pattern, String channel,
|
|
|
|
|
String message) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onMessage(String channel, String message) {
|
|
|
|
|
}
|
|
|
|
|
}, "testchannel1", "testchannel2");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void subscribeMany() throws UnknownHostException, IOException,
|
|
|
|
|
InterruptedException {
|
|
|
|
|
jedis.subscribe(new JedisPubSub() {
|
|
|
|
|
jedis.subscribe(new JedisPubSubAdaptor() {
|
|
|
|
|
public void onMessage(String channel, String message) {
|
|
|
|
|
unsubscribe(channel);
|
|
|
|
|
}
|
|
|
|
|
@@ -203,34 +129,13 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
publishOne(channel, "exit");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onUnsubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPSubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPUnsubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPMessage(String pattern, String channel,
|
|
|
|
|
String message) {
|
|
|
|
|
}
|
|
|
|
|
}, "foo", "bar");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void psubscribe() throws UnknownHostException, IOException,
|
|
|
|
|
InterruptedException {
|
|
|
|
|
jedis.psubscribe(new JedisPubSub() {
|
|
|
|
|
public void onMessage(String channel, String message) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onSubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onUnsubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
jedis.psubscribe(new JedisPubSubAdaptor() {
|
|
|
|
|
public void onPSubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
assertEquals("foo.*", pattern);
|
|
|
|
|
assertEquals(1, subscribedChannels);
|
|
|
|
|
@@ -256,23 +161,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
@Test
|
|
|
|
|
public void psubscribeMany() throws UnknownHostException, IOException,
|
|
|
|
|
InterruptedException {
|
|
|
|
|
jedis.psubscribe(new JedisPubSub() {
|
|
|
|
|
public void onMessage(String channel, String message) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onSubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onUnsubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
jedis.psubscribe(new JedisPubSubAdaptor() {
|
|
|
|
|
public void onPSubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
publishOne(pattern.replace("*", "123"), "exit");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPUnsubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPMessage(String pattern, String channel,
|
|
|
|
|
String message) {
|
|
|
|
|
punsubscribe(pattern);
|
|
|
|
|
@@ -283,7 +176,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
@Test
|
|
|
|
|
public void subscribeLazily() throws UnknownHostException, IOException,
|
|
|
|
|
InterruptedException {
|
|
|
|
|
final JedisPubSub pubsub = new JedisPubSub() {
|
|
|
|
|
final JedisPubSub pubsub = new JedisPubSubAdaptor() {
|
|
|
|
|
public void onMessage(String channel, String message) {
|
|
|
|
|
unsubscribe(channel);
|
|
|
|
|
}
|
|
|
|
|
@@ -296,16 +189,10 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onUnsubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPSubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
publishOne(pattern.replace("*", "123"), "exit");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPUnsubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPMessage(String pattern, String channel,
|
|
|
|
|
String message) {
|
|
|
|
|
punsubscribe(pattern);
|
|
|
|
|
@@ -318,7 +205,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
@Test
|
|
|
|
|
public void binarySubscribe() throws UnknownHostException, IOException,
|
|
|
|
|
InterruptedException {
|
|
|
|
|
jedis.subscribe(new BinaryJedisPubSub() {
|
|
|
|
|
jedis.subscribe(new BinaryJedisPubSubAdaptor() {
|
|
|
|
|
public void onMessage(byte[] channel, byte[] message) {
|
|
|
|
|
assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel));
|
|
|
|
|
assertTrue(Arrays.equals(SafeEncoder.encode("exit"), message));
|
|
|
|
|
@@ -335,23 +222,13 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel));
|
|
|
|
|
assertEquals(0, subscribedChannels);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPMessage(byte[] pattern, byte[] channel,
|
|
|
|
|
byte[] message) {
|
|
|
|
|
}
|
|
|
|
|
}, SafeEncoder.encode("foo"));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void binarySubscribeMany() throws UnknownHostException, IOException,
|
|
|
|
|
InterruptedException {
|
|
|
|
|
jedis.subscribe(new BinaryJedisPubSub() {
|
|
|
|
|
jedis.subscribe(new BinaryJedisPubSubAdaptor() {
|
|
|
|
|
public void onMessage(byte[] channel, byte[] message) {
|
|
|
|
|
unsubscribe(channel);
|
|
|
|
|
}
|
|
|
|
|
@@ -359,35 +236,13 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
public void onSubscribe(byte[] channel, int subscribedChannels) {
|
|
|
|
|
publishOne(SafeEncoder.encode(channel), "exit");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPMessage(byte[] pattern, byte[] channel,
|
|
|
|
|
byte[] message) {
|
|
|
|
|
}
|
|
|
|
|
}, SafeEncoder.encode("foo"), SafeEncoder.encode("bar"));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void binaryPsubscribe() throws UnknownHostException, IOException,
|
|
|
|
|
InterruptedException {
|
|
|
|
|
jedis.psubscribe(new BinaryJedisPubSub() {
|
|
|
|
|
public void onMessage(byte[] channel, byte[] message) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onSubscribe(byte[] channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
jedis.psubscribe(new BinaryJedisPubSubAdaptor() {
|
|
|
|
|
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
|
|
|
|
|
assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern));
|
|
|
|
|
assertEquals(1, subscribedChannels);
|
|
|
|
|
@@ -414,24 +269,12 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
@Test
|
|
|
|
|
public void binaryPsubscribeMany() throws UnknownHostException,
|
|
|
|
|
IOException, InterruptedException {
|
|
|
|
|
jedis.psubscribe(new BinaryJedisPubSub() {
|
|
|
|
|
public void onMessage(byte[] channel, byte[] message) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onSubscribe(byte[] channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
jedis.psubscribe(new BinaryJedisPubSubAdaptor() {
|
|
|
|
|
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
|
|
|
|
|
publishOne(SafeEncoder.encode(pattern).replace("*", "123"),
|
|
|
|
|
"exit");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPMessage(byte[] pattern, byte[] channel,
|
|
|
|
|
byte[] message) {
|
|
|
|
|
punsubscribe(pattern);
|
|
|
|
|
@@ -442,7 +285,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
@Test
|
|
|
|
|
public void binarySubscribeLazily() throws UnknownHostException,
|
|
|
|
|
IOException, InterruptedException {
|
|
|
|
|
final BinaryJedisPubSub pubsub = new BinaryJedisPubSub() {
|
|
|
|
|
final BinaryJedisPubSub pubsub = new BinaryJedisPubSubAdaptor() {
|
|
|
|
|
public void onMessage(byte[] channel, byte[] message) {
|
|
|
|
|
unsubscribe(channel);
|
|
|
|
|
}
|
|
|
|
|
@@ -456,17 +299,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
|
|
|
|
|
publishOne(SafeEncoder.encode(pattern).replace("*", "123"),
|
|
|
|
|
"exit");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPMessage(byte[] pattern, byte[] channel,
|
|
|
|
|
byte[] message) {
|
|
|
|
|
punsubscribe(pattern);
|
|
|
|
|
@@ -478,26 +315,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
|
|
|
|
|
@Test(expected = JedisConnectionException.class)
|
|
|
|
|
public void unsubscribeWhenNotSusbscribed() throws InterruptedException {
|
|
|
|
|
JedisPubSub pubsub = new JedisPubSub() {
|
|
|
|
|
public void onMessage(String channel, String message) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPMessage(String pattern, String channel,
|
|
|
|
|
String message) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onSubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onUnsubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPUnsubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPSubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
JedisPubSub pubsub = new JedisPubSubAdaptor();
|
|
|
|
|
pubsub.unsubscribe();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -533,7 +351,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
});
|
|
|
|
|
t.start();
|
|
|
|
|
try {
|
|
|
|
|
jedis.subscribe(new JedisPubSub() {
|
|
|
|
|
jedis.subscribe(new JedisPubSubAdaptor() {
|
|
|
|
|
public void onMessage(String channel, String message) {
|
|
|
|
|
try {
|
|
|
|
|
// wait 0.5 secs to slow down subscribe and
|
|
|
|
|
@@ -550,23 +368,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
|
|
|
|
|
fail(e.getMessage());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onSubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onUnsubscribe(String channel, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPSubscribe(String pattern, int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPUnsubscribe(String pattern,
|
|
|
|
|
int subscribedChannels) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onPMessage(String pattern, String channel,
|
|
|
|
|
String message) {
|
|
|
|
|
}
|
|
|
|
|
}, "foo");
|
|
|
|
|
} finally {
|
|
|
|
|
// exit the publisher thread. if exception is thrown, thread might
|
|
|
|
|
|