Merge branch 'jedis-pubsub-adaptor' of github.com:HeartSaVioR/jedis into HeartSaVioR-jedis-pubsub-adaptor

This commit is contained in:
Jungtaek Lim
2014-11-11 08:00:32 +09:00
5 changed files with 15 additions and 258 deletions

View File

@@ -16,18 +16,17 @@ public abstract class BinaryJedisPubSub {
private int subscribedChannels = 0; private int subscribedChannels = 0;
private Client client; private Client client;
public abstract void onMessage(byte[] channel, byte[] message); public void onMessage(byte[] channel, byte[] message) {}
public abstract void onPMessage(byte[] pattern, byte[] channel, public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {}
byte[] message);
public abstract void onSubscribe(byte[] channel, int subscribedChannels); public void onSubscribe(byte[] channel, int subscribedChannels) {}
public abstract void onUnsubscribe(byte[] channel, int subscribedChannels); public void onUnsubscribe(byte[] channel, int subscribedChannels) {}
public abstract void onPUnsubscribe(byte[] pattern, int subscribedChannels); public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {}
public abstract void onPSubscribe(byte[] pattern, int subscribedChannels); public void onPSubscribe(byte[] pattern, int subscribedChannels) {}
public void unsubscribe() { public void unsubscribe() {
client.unsubscribe(); client.unsubscribe();

View File

@@ -18,18 +18,17 @@ public abstract class JedisPubSub {
private int subscribedChannels = 0; private int subscribedChannels = 0;
private volatile Client client; private volatile Client client;
public abstract void onMessage(String channel, String message); public void onMessage(String channel, String message) {}
public abstract void onPMessage(String pattern, String channel, public void onPMessage(String pattern, String channel, String message) {}
String message);
public abstract void onSubscribe(String channel, int subscribedChannels); public void onSubscribe(String channel, int subscribedChannels) {}
public abstract void onUnsubscribe(String channel, int subscribedChannels); public void onUnsubscribe(String channel, int subscribedChannels) {}
public abstract void onPUnsubscribe(String pattern, int subscribedChannels); public void onPUnsubscribe(String pattern, int subscribedChannels) {}
public abstract void onPSubscribe(String pattern, int subscribedChannels); public void onPSubscribe(String pattern, int subscribedChannels) {}
public void unsubscribe() { public void unsubscribe() {
if (client == null) { if (client == null) {

View File

@@ -222,32 +222,6 @@ public class JedisSentinelPool extends Pool<Jedis> {
} }
} }
protected class JedisPubSubAdapter extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
}
@Override
public void onPMessage(String pattern, String channel, String message) {
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
}
}
protected class MasterListener extends Thread { protected class MasterListener extends Thread {
protected String masterName; protected String masterName;
@@ -281,7 +255,7 @@ public class JedisSentinelPool extends Pool<Jedis> {
j = new Jedis(host, port); j = new Jedis(host, port);
try { try {
j.subscribe(new JedisPubSubAdapter() { j.subscribe(new JedisPubSub() {
@Override @Override
public void onMessage(String channel, String message) { public void onMessage(String channel, String message) {
log.fine("Sentinel " + host + ":" + port log.fine("Sentinel " + host + ":" + port

View File

@@ -10,9 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test; import org.junit.Test;
import redis.clients.jedis.BinaryJedisPubSub; import redis.clients.jedis.*;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.SafeEncoder; import redis.clients.util.SafeEncoder;
@@ -52,16 +50,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
assertEquals("foo", channel); assertEquals("foo", channel);
assertEquals(0, subscribedChannels); assertEquals(0, subscribedChannels);
} }
public void onPSubscribe(String pattern, int subscribedChannels) {
}
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
public void onPMessage(String pattern, String channel,
String message) {
}
}, "foo"); }, "foo");
} }
@@ -72,10 +60,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
jedis.subscribe(new JedisPubSub() { jedis.subscribe(new JedisPubSub() {
private int count = 0; private int count = 0;
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
}
@Override @Override
public void onSubscribe(String channel, int subscribedChannels) { public void onSubscribe(String channel, int subscribedChannels) {
count++; count++;
@@ -89,23 +73,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
unsubscribe(); unsubscribe();
} }
} }
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
}
@Override
public void onPMessage(String pattern, String channel,
String message) {
}
@Override
public void onMessage(String channel, String message) {
}
}, "testchan1", "testchan2", "testchan3"); }, "testchan1", "testchan2", "testchan3");
} }
@@ -114,18 +81,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
jedis.psubscribe(new JedisPubSub() { jedis.psubscribe(new JedisPubSub() {
private int count = 0; private int count = 0;
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
@Override @Override
public void onPSubscribe(String pattern, int subscribedChannels) { public void onPSubscribe(String pattern, int subscribedChannels) {
count++; count++;
@@ -137,14 +92,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
} }
} }
@Override
public void onPMessage(String pattern, String channel,
String message) {
}
@Override
public void onMessage(String channel, String message) {
}
}, "test*", "test*", "chan*"); }, "test*", "test*", "chan*");
} }
@@ -156,10 +103,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
jedis.subscribe(new JedisPubSub() { jedis.subscribe(new JedisPubSub() {
private int count = 0; private int count = 0;
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
}
@Override @Override
public void onSubscribe(String channel, int subscribedChannels) { public void onSubscribe(String channel, int subscribedChannels) {
count++; count++;
@@ -171,23 +114,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
unsubscribe(); unsubscribe();
} }
} }
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
}
@Override
public void onPMessage(String pattern, String channel,
String message) {
}
@Override
public void onMessage(String channel, String message) {
}
}, "testchannel1", "testchannel2"); }, "testchannel1", "testchannel2");
} }
@@ -203,18 +129,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
publishOne(channel, "exit"); publishOne(channel, "exit");
} }
public void onUnsubscribe(String channel, int subscribedChannels) {
}
public void onPSubscribe(String pattern, int subscribedChannels) {
}
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
public void onPMessage(String pattern, String channel,
String message) {
}
}, "foo", "bar"); }, "foo", "bar");
} }
@@ -222,15 +136,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
public void psubscribe() throws UnknownHostException, IOException, public void psubscribe() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
jedis.psubscribe(new JedisPubSub() { jedis.psubscribe(new JedisPubSub() {
public void onMessage(String channel, String message) {
}
public void onSubscribe(String channel, int subscribedChannels) {
}
public void onUnsubscribe(String channel, int subscribedChannels) {
}
public void onPSubscribe(String pattern, int subscribedChannels) { public void onPSubscribe(String pattern, int subscribedChannels) {
assertEquals("foo.*", pattern); assertEquals("foo.*", pattern);
assertEquals(1, subscribedChannels); assertEquals(1, subscribedChannels);
@@ -257,22 +162,10 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
public void psubscribeMany() throws UnknownHostException, IOException, public void psubscribeMany() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
jedis.psubscribe(new JedisPubSub() { jedis.psubscribe(new JedisPubSub() {
public void onMessage(String channel, String message) {
}
public void onSubscribe(String channel, int subscribedChannels) {
}
public void onUnsubscribe(String channel, int subscribedChannels) {
}
public void onPSubscribe(String pattern, int subscribedChannels) { public void onPSubscribe(String pattern, int subscribedChannels) {
publishOne(pattern.replace("*", "123"), "exit"); publishOne(pattern.replace("*", "123"), "exit");
} }
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
public void onPMessage(String pattern, String channel, public void onPMessage(String pattern, String channel,
String message) { String message) {
punsubscribe(pattern); punsubscribe(pattern);
@@ -296,16 +189,10 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
} }
} }
public void onUnsubscribe(String channel, int subscribedChannels) {
}
public void onPSubscribe(String pattern, int subscribedChannels) { public void onPSubscribe(String pattern, int subscribedChannels) {
publishOne(pattern.replace("*", "123"), "exit"); publishOne(pattern.replace("*", "123"), "exit");
} }
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
public void onPMessage(String pattern, String channel, public void onPMessage(String pattern, String channel,
String message) { String message) {
punsubscribe(pattern); punsubscribe(pattern);
@@ -335,16 +222,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel));
assertEquals(0, subscribedChannels); assertEquals(0, subscribedChannels);
} }
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPMessage(byte[] pattern, byte[] channel,
byte[] message) {
}
}, SafeEncoder.encode("foo")); }, SafeEncoder.encode("foo"));
} }
@@ -359,19 +236,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
public void onSubscribe(byte[] channel, int subscribedChannels) { public void onSubscribe(byte[] channel, int subscribedChannels) {
publishOne(SafeEncoder.encode(channel), "exit"); publishOne(SafeEncoder.encode(channel), "exit");
} }
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
}
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPMessage(byte[] pattern, byte[] channel,
byte[] message) {
}
}, SafeEncoder.encode("foo"), SafeEncoder.encode("bar")); }, SafeEncoder.encode("foo"), SafeEncoder.encode("bar"));
} }
@@ -379,15 +243,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
public void binaryPsubscribe() throws UnknownHostException, IOException, public void binaryPsubscribe() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
jedis.psubscribe(new BinaryJedisPubSub() { jedis.psubscribe(new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) {
}
public void onSubscribe(byte[] channel, int subscribedChannels) {
}
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
}
public void onPSubscribe(byte[] pattern, int subscribedChannels) { public void onPSubscribe(byte[] pattern, int subscribedChannels) {
assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern));
assertEquals(1, subscribedChannels); assertEquals(1, subscribedChannels);
@@ -415,23 +270,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
public void binaryPsubscribeMany() throws UnknownHostException, public void binaryPsubscribeMany() throws UnknownHostException,
IOException, InterruptedException { IOException, InterruptedException {
jedis.psubscribe(new BinaryJedisPubSub() { jedis.psubscribe(new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) {
}
public void onSubscribe(byte[] channel, int subscribedChannels) {
}
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
}
public void onPSubscribe(byte[] pattern, int subscribedChannels) { public void onPSubscribe(byte[] pattern, int subscribedChannels) {
publishOne(SafeEncoder.encode(pattern).replace("*", "123"), publishOne(SafeEncoder.encode(pattern).replace("*", "123"),
"exit"); "exit");
} }
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPMessage(byte[] pattern, byte[] channel, public void onPMessage(byte[] pattern, byte[] channel,
byte[] message) { byte[] message) {
punsubscribe(pattern); punsubscribe(pattern);
@@ -456,17 +299,11 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
} }
} }
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
}
public void onPSubscribe(byte[] pattern, int subscribedChannels) { public void onPSubscribe(byte[] pattern, int subscribedChannels) {
publishOne(SafeEncoder.encode(pattern).replace("*", "123"), publishOne(SafeEncoder.encode(pattern).replace("*", "123"),
"exit"); "exit");
} }
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
}
public void onPMessage(byte[] pattern, byte[] channel, public void onPMessage(byte[] pattern, byte[] channel,
byte[] message) { byte[] message) {
punsubscribe(pattern); punsubscribe(pattern);
@@ -478,26 +315,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test(expected = JedisConnectionException.class) @Test(expected = JedisConnectionException.class)
public void unsubscribeWhenNotSusbscribed() throws InterruptedException { public void unsubscribeWhenNotSusbscribed() throws InterruptedException {
JedisPubSub pubsub = new JedisPubSub() { JedisPubSub pubsub = new JedisPubSub() {};
public void onMessage(String channel, String message) {
}
public void onPMessage(String pattern, String channel,
String message) {
}
public void onSubscribe(String channel, int subscribedChannels) {
}
public void onUnsubscribe(String channel, int subscribedChannels) {
}
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
public void onPSubscribe(String pattern, int subscribedChannels) {
}
};
pubsub.unsubscribe(); pubsub.unsubscribe();
} }
@@ -550,23 +368,6 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
fail(e.getMessage()); fail(e.getMessage());
} }
} }
public void onSubscribe(String channel, int subscribedChannels) {
}
public void onUnsubscribe(String channel, int subscribedChannels) {
}
public void onPSubscribe(String pattern, int subscribedChannels) {
}
public void onPUnsubscribe(String pattern,
int subscribedChannels) {
}
public void onPMessage(String pattern, String channel,
String message) {
}
}, "foo"); }, "foo");
} finally { } finally {
// exit the publisher thread. if exception is thrown, thread might // exit the publisher thread. if exception is thrown, thread might

View File

@@ -16,10 +16,6 @@ public class JedisSentinelTestUtil {
sentinelJedis.psubscribe(new JedisPubSub() { sentinelJedis.psubscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
}
@Override @Override
public void onPMessage(String pattern, String channel, public void onPMessage(String pattern, String channel,
String message) { String message) {
@@ -34,18 +30,6 @@ public class JedisSentinelTestUtil {
} }
} }
@Override
public void onSubscribe(String channel, int subscribedChannels) {
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
@Override @Override
public void onPSubscribe(String pattern, int subscribedChannels) { public void onPSubscribe(String pattern, int subscribedChannels) {
commandJedis.sentinelFailover(masterName); commandJedis.sentinelFailover(masterName);