diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index 245a0c9..fb0cc7e 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1,15 +1,19 @@ package redis.clients.jedis; +import static redis.clients.jedis.Protocol.toByteArray; +import static redis.clients.jedis.Protocol.Command.*; +import static redis.clients.jedis.Protocol.Keyword.LIMIT; +import static redis.clients.jedis.Protocol.Keyword.NO; +import static redis.clients.jedis.Protocol.Keyword.ONE; +import static redis.clients.jedis.Protocol.Keyword.STORE; +import static redis.clients.jedis.Protocol.Keyword.WITHSCORES; + import java.util.ArrayList; import java.util.List; import java.util.Map; import redis.clients.jedis.Protocol.Command; import redis.clients.jedis.Protocol.Keyword; -import static redis.clients.jedis.Protocol.toByteArray; - -import static redis.clients.jedis.Protocol.Command.*; -import static redis.clients.jedis.Protocol.Keyword.*; public class BinaryClient extends Connection { public enum LIST_POSITION { BEFORE, AFTER; diff --git a/src/main/java/redis/clients/jedis/JedisPubSub.java b/src/main/java/redis/clients/jedis/JedisPubSub.java index ac6918d..e0c5dec 100644 --- a/src/main/java/redis/clients/jedis/JedisPubSub.java +++ b/src/main/java/redis/clients/jedis/JedisPubSub.java @@ -1,7 +1,10 @@ package redis.clients.jedis; +import java.util.Arrays; import java.util.List; +import static redis.clients.jedis.Protocol.Keyword.*; + public abstract class JedisPubSub { private int subscribedChannels = 0; private Client client; @@ -62,26 +65,78 @@ public abstract class JedisPubSub { private void process(Client client) { do { List reply = client.getObjectMultiBulkReply(); - if (reply.get(0).equals("subscribe")) { + final Object firstObj = reply.get(0); + if (!(firstObj instanceof byte[])) { + throw + new JedisException("Unknown message type: "+ firstObj); + } + final byte[] resp = (byte[]) firstObj; + if(Arrays.equals(SUBSCRIBE.raw, resp)) { subscribedChannels = ((Integer) reply.get(2)).intValue(); - onSubscribe((String) reply.get(1), subscribedChannels); - } else if (reply.get(0).equals("unsubscribe")) { + final byte[] bchannel = (byte[]) reply.get(1); + final String strchannel = + (bchannel == null) ? + null : + new String(bchannel, Protocol.UTF8); + onSubscribe(strchannel, subscribedChannels); + } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) { subscribedChannels = ((Integer) reply.get(2)).intValue(); - onUnsubscribe((String) reply.get(1), subscribedChannels); - } else if (reply.get(0).equals("message")) { - onMessage((String) reply.get(1), (String) reply.get(2)); - } else if (reply.get(0).equals("pmessage")) { - onPMessage((String) reply.get(1), (String) reply.get(2), - (String) reply.get(3)); - } else if (reply.get(0).equals("psubscribe")) { + final byte[] bchannel = (byte[]) reply.get(1); + final String strchannel = + (bchannel == null) ? + null : + new String(bchannel, Protocol.UTF8); + onUnsubscribe(strchannel, subscribedChannels); + } else if (Arrays.equals(MESSAGE.raw, resp)) { + final byte[] bchannel = (byte[]) reply.get(1); + final byte[] bmesg = (byte[]) reply.get(2); + final String strchannel = + (bchannel == null) ? + null : + new String(bchannel, Protocol.UTF8); + final String strmesg = + (bmesg == null) ? + null : + new String(bmesg, Protocol.UTF8); + onMessage(strchannel, strmesg); + } else if (Arrays.equals(PMESSAGE.raw, resp)) { + final byte[] bpattern = (byte[]) reply.get(1); + final byte[] bchannel = (byte[]) reply.get(2); + final byte[] bmesg = (byte[]) reply.get(3); + final String strpattern = + (bpattern == null) ? + null : + new String(bpattern, Protocol.UTF8); + final String strchannel = + (bchannel == null) ? + null : + new String(bchannel, Protocol.UTF8); + final String strmesg = + (bmesg == null) ? + null : + new String(bmesg, Protocol.UTF8); + onPMessage( + strpattern, + strchannel, + strmesg); + } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) { subscribedChannels = ((Integer) reply.get(2)).intValue(); - onPSubscribe((String) reply.get(1), subscribedChannels); - } else if (reply.get(0).equals("punsubscribe")) { + final byte[] bpattern = (byte[]) reply.get(1); + final String strpattern = + (bpattern == null) ? + null : + new String(bpattern, Protocol.UTF8); + onPSubscribe(strpattern, subscribedChannels); + } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) { subscribedChannels = ((Integer) reply.get(2)).intValue(); - onPUnsubscribe((String) reply.get(1), subscribedChannels); + final byte[] bpattern = (byte[]) reply.get(1); + final String strpattern = + (bpattern == null) ? + null : + new String(bpattern, Protocol.UTF8); + onPUnsubscribe(strpattern, subscribedChannels); } else { - throw new JedisException("Unknown message type: " - + reply.get(0)); + throw new JedisException("Unknown message type: "+ firstObj); } } while (isSubscribed()); } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index a6a89e4..7ae5b2a 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -286,18 +286,24 @@ public final class Protocol { BY, DESC, GET, + LIMIT, + MESSAGE, NO, NOSORT, + PMESSAGE, + PSUBSCRIBE, + PUNSUBSCRIBE, ONE, - LIMIT, SET, STORE, + SUBSCRIBE, + UNSUBSCRIBE, WEIGHTS, WITHSCORES; public final byte[] raw; Keyword() { - raw = this.name().getBytes(UTF8); + raw = this.name().toLowerCase().getBytes(UTF8); } }