add binary support for pubsub

This commit is contained in:
Jonathan Leibiusky
2011-01-24 13:24:08 -03:00
parent 7150c5feb3
commit 4d5e5a7c5d
6 changed files with 440 additions and 31 deletions

View File

@@ -2,12 +2,15 @@ package redis.clients.jedis.tests.commands;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Arrays;
import org.junit.Test;
import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisException;
import redis.clients.jedis.JedisPubSub;
import redis.clients.util.SafeEncoder;
public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test
@@ -233,30 +236,270 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
jedis.subscribe(pubsub, "foo");
t.join();
}
@Test
public void binarySubscribe() throws UnknownHostException, IOException,
InterruptedException {
Thread t = new Thread(new Runnable() {
public void run() {
try {
Jedis j = createJedis();
Thread.sleep(1000);
j.publish(SafeEncoder.encode("foo"), SafeEncoder
.encode("exit"));
j.disconnect();
} catch (Exception ex) {
fail(ex.getMessage());
}
}
});
t.start();
jedis.subscribe(new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) {
assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel));
assertTrue(Arrays.equals(SafeEncoder.encode("exit"), message));
unsubscribe();
}
public void onSubscribe(byte[] channel, int subscribedChannels) {
assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel));
assertEquals(1, subscribedChannels);
}
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel));
assertEquals(0, subscribedChannels);
}
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPMessage(byte[] pattern, byte[] channel,
byte[] message) {
}
}, SafeEncoder.encode("foo"));
t.join();
}
@Test
public void binarySubscribeMany() throws UnknownHostException, IOException,
InterruptedException {
Thread t = new Thread(new Runnable() {
public void run() {
try {
Jedis j = createJedis();
Thread.sleep(1000);
j.publish(SafeEncoder.encode("foo"), SafeEncoder
.encode("exit"));
Thread.sleep(1000);
j.publish(SafeEncoder.encode("bar"), SafeEncoder
.encode("exit"));
j.disconnect();
} catch (Exception ex) {
fail(ex.getMessage());
}
}
});
t.start();
jedis.subscribe(new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) {
unsubscribe(channel);
}
public void onSubscribe(byte[] channel, int subscribedChannels) {
}
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
}
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPMessage(byte[] pattern, byte[] channel,
byte[] message) {
}
}, SafeEncoder.encode("foo"), SafeEncoder.encode("bar"));
t.join();
}
@Test
public void binaryPsubscribe() throws UnknownHostException, IOException,
InterruptedException {
Thread t = new Thread(new Runnable() {
public void run() {
try {
Jedis j = createJedis();
Thread.sleep(1000);
j.publish(SafeEncoder.encode("foo.bar"), SafeEncoder
.encode("exit"));
j.disconnect();
} catch (Exception ex) {
fail(ex.getMessage());
}
}
});
t.start();
jedis.psubscribe(new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) {
}
public void onSubscribe(byte[] channel, int subscribedChannels) {
}
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
}
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern));
assertEquals(1, subscribedChannels);
}
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern));
assertEquals(0, subscribedChannels);
}
public void onPMessage(byte[] pattern, byte[] channel,
byte[] message) {
assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern));
assertTrue(Arrays
.equals(SafeEncoder.encode("foo.bar"), channel));
assertTrue(Arrays.equals(SafeEncoder.encode("exit"), message));
punsubscribe();
}
}, SafeEncoder.encode("foo.*"));
t.join();
}
@Test
public void binaryPsubscribeMany() throws UnknownHostException,
IOException, InterruptedException {
Thread t = new Thread(new Runnable() {
public void run() {
try {
Jedis j = createJedis();
Thread.sleep(1000);
j.publish(SafeEncoder.encode("foo.123"), SafeEncoder
.encode("exit"));
Thread.sleep(1000);
j.publish(SafeEncoder.encode("bar.123"), SafeEncoder
.encode("exit"));
j.disconnect();
} catch (Exception ex) {
fail(ex.getMessage());
}
}
});
t.start();
jedis.psubscribe(new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) {
}
public void onSubscribe(byte[] channel, int subscribedChannels) {
}
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
}
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPMessage(byte[] pattern, byte[] channel,
byte[] message) {
punsubscribe(pattern);
}
}, SafeEncoder.encode("foo.*"), SafeEncoder.encode("bar.*"));
t.join();
}
@Test
public void binarySubscribeLazily() throws UnknownHostException,
IOException, InterruptedException {
final BinaryJedisPubSub pubsub = new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) {
unsubscribe(channel);
}
public void onSubscribe(byte[] channel, int subscribedChannels) {
}
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
}
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPMessage(byte[] pattern, byte[] channel,
byte[] message) {
punsubscribe(pattern);
}
};
Thread t = new Thread(new Runnable() {
public void run() {
try {
Jedis j = createJedis();
Thread.sleep(1000);
pubsub.subscribe(SafeEncoder.encode("bar"));
pubsub.psubscribe(SafeEncoder.encode("bar.*"));
j.publish(SafeEncoder.encode("foo"), SafeEncoder
.encode("exit"));
j.publish(SafeEncoder.encode("bar"), SafeEncoder
.encode("exit"));
j.publish(SafeEncoder.encode("bar.123"), SafeEncoder
.encode("exit"));
j.disconnect();
} catch (Exception ex) {
fail(ex.getMessage());
}
}
});
t.start();
jedis.subscribe(pubsub, SafeEncoder.encode("foo"));
t.join();
}
@Test
public void subscribeWithoutConnecting() {
try {
Jedis jedis = new Jedis(hnp.host, hnp.port);
jedis.subscribe(new JedisPubSub() {
public void onMessage(String channel, String message) {
}
public void onPMessage(String pattern, String channel,
String message) {
}
public void onSubscribe(String channel, int subscribedChannels) {
}
public void onUnsubscribe(String channel, int subscribedChannels) {
}
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
public void onPSubscribe(String pattern, int subscribedChannels) {
}
}, "foo");
} catch(NullPointerException ex) {
fail();
} catch(JedisException ex) {
// this is OK because we are not sending AUTH command
}
try {
Jedis jedis = new Jedis(hnp.host, hnp.port);
jedis.subscribe(new JedisPubSub() {
public void onMessage(String channel, String message) {
}
public void onPMessage(String pattern, String channel,
String message) {
}
public void onSubscribe(String channel, int subscribedChannels) {
}
public void onUnsubscribe(String channel, int subscribedChannels) {
}
public void onPUnsubscribe(String pattern,
int subscribedChannels) {
}
public void onPSubscribe(String pattern, int subscribedChannels) {
}
}, "foo");
} catch (NullPointerException ex) {
fail();
} catch (JedisException ex) {
// this is OK because we are not sending AUTH command
}
}
}