Remove Adaptor and update their own class (PubSub)

This commit is contained in:
Jungtaek Lim
2014-11-07 23:11:43 +09:00
parent 4ecca016a5
commit 8a33ab6e9a
7 changed files with 30 additions and 99 deletions

View File

@@ -16,18 +16,17 @@ public abstract class BinaryJedisPubSub {
private int subscribedChannels = 0; private int subscribedChannels = 0;
private Client client; private Client client;
public abstract void onMessage(byte[] channel, byte[] message); public void onMessage(byte[] channel, byte[] message) {}
public abstract void onPMessage(byte[] pattern, byte[] channel, public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {}
byte[] message);
public abstract void onSubscribe(byte[] channel, int subscribedChannels); public void onSubscribe(byte[] channel, int subscribedChannels) {}
public abstract void onUnsubscribe(byte[] channel, int subscribedChannels); public void onUnsubscribe(byte[] channel, int subscribedChannels) {}
public abstract void onPUnsubscribe(byte[] pattern, int subscribedChannels); public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {}
public abstract void onPSubscribe(byte[] pattern, int subscribedChannels); public void onPSubscribe(byte[] pattern, int subscribedChannels) {}
public void unsubscribe() { public void unsubscribe() {
client.unsubscribe(); client.unsubscribe();

View File

@@ -1,33 +0,0 @@
package redis.clients.jedis;
public class BinaryJedisPubSubAdaptor extends BinaryJedisPubSub {
@Override
public void onMessage(byte[] channel, byte[] message) {
}
@Override
public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
}
@Override
public void onSubscribe(byte[] channel, int subscribedChannels) {
}
@Override
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
}
@Override
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
}
@Override
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
}
}

View File

@@ -18,18 +18,17 @@ public abstract class JedisPubSub {
private int subscribedChannels = 0; private int subscribedChannels = 0;
private volatile Client client; private volatile Client client;
public abstract void onMessage(String channel, String message); public void onMessage(String channel, String message) {}
public abstract void onPMessage(String pattern, String channel, public void onPMessage(String pattern, String channel, String message) {}
String message);
public abstract void onSubscribe(String channel, int subscribedChannels); public void onSubscribe(String channel, int subscribedChannels) {}
public abstract void onUnsubscribe(String channel, int subscribedChannels); public void onUnsubscribe(String channel, int subscribedChannels) {}
public abstract void onPUnsubscribe(String pattern, int subscribedChannels); public void onPUnsubscribe(String pattern, int subscribedChannels) {}
public abstract void onPSubscribe(String pattern, int subscribedChannels); public void onPSubscribe(String pattern, int subscribedChannels) {}
public void unsubscribe() { public void unsubscribe() {
if (client == null) { if (client == null) {

View File

@@ -1,34 +0,0 @@
package redis.clients.jedis;
public class JedisPubSubAdaptor extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
}
@Override
public void onPMessage(String pattern, String channel, String message) {
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
}
}

View File

@@ -255,7 +255,7 @@ public class JedisSentinelPool extends Pool<Jedis> {
j = new Jedis(host, port); j = new Jedis(host, port);
try { try {
j.subscribe(new JedisPubSubAdaptor() { j.subscribe(new JedisPubSub() {
@Override @Override
public void onMessage(String channel, String message) { public void onMessage(String channel, String message) {
log.fine("Sentinel " + host + ":" + port log.fine("Sentinel " + host + ":" + port

View File

@@ -31,7 +31,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void subscribe() throws InterruptedException { public void subscribe() throws InterruptedException {
jedis.subscribe(new JedisPubSubAdaptor() { jedis.subscribe(new JedisPubSub() {
public void onMessage(String channel, String message) { public void onMessage(String channel, String message) {
assertEquals("foo", channel); assertEquals("foo", channel);
assertEquals("exit", message); assertEquals("exit", message);
@@ -57,7 +57,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
public void pubSubChannels() { public void pubSubChannels() {
final List<String> expectedActiveChannels = Arrays.asList("testchan1", final List<String> expectedActiveChannels = Arrays.asList("testchan1",
"testchan2", "testchan3"); "testchan2", "testchan3");
jedis.subscribe(new JedisPubSubAdaptor() { jedis.subscribe(new JedisPubSub() {
private int count = 0; private int count = 0;
@Override @Override
@@ -78,7 +78,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void pubSubNumPat() { public void pubSubNumPat() {
jedis.psubscribe(new JedisPubSubAdaptor() { jedis.psubscribe(new JedisPubSub() {
private int count = 0; private int count = 0;
@Override @Override
@@ -100,7 +100,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
final Map<String, String> expectedNumSub = new HashMap<String, String>(); final Map<String, String> expectedNumSub = new HashMap<String, String>();
expectedNumSub.put("testchannel2", "1"); expectedNumSub.put("testchannel2", "1");
expectedNumSub.put("testchannel1", "1"); expectedNumSub.put("testchannel1", "1");
jedis.subscribe(new JedisPubSubAdaptor() { jedis.subscribe(new JedisPubSub() {
private int count = 0; private int count = 0;
@Override @Override
@@ -120,7 +120,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void subscribeMany() throws UnknownHostException, IOException, public void subscribeMany() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
jedis.subscribe(new JedisPubSubAdaptor() { jedis.subscribe(new JedisPubSub() {
public void onMessage(String channel, String message) { public void onMessage(String channel, String message) {
unsubscribe(channel); unsubscribe(channel);
} }
@@ -135,7 +135,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void psubscribe() throws UnknownHostException, IOException, public void psubscribe() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
jedis.psubscribe(new JedisPubSubAdaptor() { jedis.psubscribe(new JedisPubSub() {
public void onPSubscribe(String pattern, int subscribedChannels) { public void onPSubscribe(String pattern, int subscribedChannels) {
assertEquals("foo.*", pattern); assertEquals("foo.*", pattern);
assertEquals(1, subscribedChannels); assertEquals(1, subscribedChannels);
@@ -161,7 +161,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void psubscribeMany() throws UnknownHostException, IOException, public void psubscribeMany() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
jedis.psubscribe(new JedisPubSubAdaptor() { jedis.psubscribe(new JedisPubSub() {
public void onPSubscribe(String pattern, int subscribedChannels) { public void onPSubscribe(String pattern, int subscribedChannels) {
publishOne(pattern.replace("*", "123"), "exit"); publishOne(pattern.replace("*", "123"), "exit");
} }
@@ -176,7 +176,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void subscribeLazily() throws UnknownHostException, IOException, public void subscribeLazily() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
final JedisPubSub pubsub = new JedisPubSubAdaptor() { final JedisPubSub pubsub = new JedisPubSub() {
public void onMessage(String channel, String message) { public void onMessage(String channel, String message) {
unsubscribe(channel); unsubscribe(channel);
} }
@@ -205,7 +205,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void binarySubscribe() throws UnknownHostException, IOException, public void binarySubscribe() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
jedis.subscribe(new BinaryJedisPubSubAdaptor() { jedis.subscribe(new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) { public void onMessage(byte[] channel, byte[] message) {
assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel)); assertTrue(Arrays.equals(SafeEncoder.encode("foo"), channel));
assertTrue(Arrays.equals(SafeEncoder.encode("exit"), message)); assertTrue(Arrays.equals(SafeEncoder.encode("exit"), message));
@@ -228,7 +228,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void binarySubscribeMany() throws UnknownHostException, IOException, public void binarySubscribeMany() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
jedis.subscribe(new BinaryJedisPubSubAdaptor() { jedis.subscribe(new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) { public void onMessage(byte[] channel, byte[] message) {
unsubscribe(channel); unsubscribe(channel);
} }
@@ -242,7 +242,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void binaryPsubscribe() throws UnknownHostException, IOException, public void binaryPsubscribe() throws UnknownHostException, IOException,
InterruptedException { InterruptedException {
jedis.psubscribe(new BinaryJedisPubSubAdaptor() { jedis.psubscribe(new BinaryJedisPubSub() {
public void onPSubscribe(byte[] pattern, int subscribedChannels) { public void onPSubscribe(byte[] pattern, int subscribedChannels) {
assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern)); assertTrue(Arrays.equals(SafeEncoder.encode("foo.*"), pattern));
assertEquals(1, subscribedChannels); assertEquals(1, subscribedChannels);
@@ -269,7 +269,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void binaryPsubscribeMany() throws UnknownHostException, public void binaryPsubscribeMany() throws UnknownHostException,
IOException, InterruptedException { IOException, InterruptedException {
jedis.psubscribe(new BinaryJedisPubSubAdaptor() { jedis.psubscribe(new BinaryJedisPubSub() {
public void onPSubscribe(byte[] pattern, int subscribedChannels) { public void onPSubscribe(byte[] pattern, int subscribedChannels) {
publishOne(SafeEncoder.encode(pattern).replace("*", "123"), publishOne(SafeEncoder.encode(pattern).replace("*", "123"),
"exit"); "exit");
@@ -285,7 +285,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test @Test
public void binarySubscribeLazily() throws UnknownHostException, public void binarySubscribeLazily() throws UnknownHostException,
IOException, InterruptedException { IOException, InterruptedException {
final BinaryJedisPubSub pubsub = new BinaryJedisPubSubAdaptor() { final BinaryJedisPubSub pubsub = new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) { public void onMessage(byte[] channel, byte[] message) {
unsubscribe(channel); unsubscribe(channel);
} }
@@ -315,7 +315,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
@Test(expected = JedisConnectionException.class) @Test(expected = JedisConnectionException.class)
public void unsubscribeWhenNotSusbscribed() throws InterruptedException { public void unsubscribeWhenNotSusbscribed() throws InterruptedException {
JedisPubSub pubsub = new JedisPubSubAdaptor(); JedisPubSub pubsub = new JedisPubSub() {};
pubsub.unsubscribe(); pubsub.unsubscribe();
} }
@@ -351,7 +351,7 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase {
}); });
t.start(); t.start();
try { try {
jedis.subscribe(new JedisPubSubAdaptor() { jedis.subscribe(new JedisPubSub() {
public void onMessage(String channel, String message) { public void onMessage(String channel, String message) {
try { try {
// wait 0.5 secs to slow down subscribe and // wait 0.5 secs to slow down subscribe and

View File

@@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicReference;
import redis.clients.jedis.HostAndPort; import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSubAdaptor; import redis.clients.jedis.JedisPubSub;
public class JedisSentinelTestUtil { public class JedisSentinelTestUtil {
public static HostAndPort waitForNewPromotedMaster(final String masterName, public static HostAndPort waitForNewPromotedMaster(final String masterName,
@@ -14,7 +14,7 @@ public class JedisSentinelTestUtil {
final AtomicReference<String> newmaster = new AtomicReference<String>( final AtomicReference<String> newmaster = new AtomicReference<String>(
""); "");
sentinelJedis.psubscribe(new JedisPubSubAdaptor() { sentinelJedis.psubscribe(new JedisPubSub() {
@Override @Override
public void onPMessage(String pattern, String channel, public void onPMessage(String pattern, String channel,