Ready to be merged back to master ...
Last buggy U tests fixed.
This commit is contained in:
@@ -1,15 +1,19 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import static redis.clients.jedis.Protocol.toByteArray;
|
||||
import static redis.clients.jedis.Protocol.Command.*;
|
||||
import static redis.clients.jedis.Protocol.Keyword.LIMIT;
|
||||
import static redis.clients.jedis.Protocol.Keyword.NO;
|
||||
import static redis.clients.jedis.Protocol.Keyword.ONE;
|
||||
import static redis.clients.jedis.Protocol.Keyword.STORE;
|
||||
import static redis.clients.jedis.Protocol.Keyword.WITHSCORES;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import redis.clients.jedis.Protocol.Command;
|
||||
import redis.clients.jedis.Protocol.Keyword;
|
||||
import static redis.clients.jedis.Protocol.toByteArray;
|
||||
|
||||
import static redis.clients.jedis.Protocol.Command.*;
|
||||
import static redis.clients.jedis.Protocol.Keyword.*;
|
||||
public class BinaryClient extends Connection {
|
||||
public enum LIST_POSITION {
|
||||
BEFORE, AFTER;
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static redis.clients.jedis.Protocol.Keyword.*;
|
||||
|
||||
public abstract class JedisPubSub {
|
||||
private int subscribedChannels = 0;
|
||||
private Client client;
|
||||
@@ -62,26 +65,78 @@ public abstract class JedisPubSub {
|
||||
private void process(Client client) {
|
||||
do {
|
||||
List<Object> reply = client.getObjectMultiBulkReply();
|
||||
if (reply.get(0).equals("subscribe")) {
|
||||
final Object firstObj = reply.get(0);
|
||||
if (!(firstObj instanceof byte[])) {
|
||||
throw
|
||||
new JedisException("Unknown message type: "+ firstObj);
|
||||
}
|
||||
final byte[] resp = (byte[]) firstObj;
|
||||
if(Arrays.equals(SUBSCRIBE.raw, resp)) {
|
||||
subscribedChannels = ((Integer) reply.get(2)).intValue();
|
||||
onSubscribe((String) reply.get(1), subscribedChannels);
|
||||
} else if (reply.get(0).equals("unsubscribe")) {
|
||||
final byte[] bchannel = (byte[]) reply.get(1);
|
||||
final String strchannel =
|
||||
(bchannel == null) ?
|
||||
null :
|
||||
new String(bchannel, Protocol.UTF8);
|
||||
onSubscribe(strchannel, subscribedChannels);
|
||||
} else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
|
||||
subscribedChannels = ((Integer) reply.get(2)).intValue();
|
||||
onUnsubscribe((String) reply.get(1), subscribedChannels);
|
||||
} else if (reply.get(0).equals("message")) {
|
||||
onMessage((String) reply.get(1), (String) reply.get(2));
|
||||
} else if (reply.get(0).equals("pmessage")) {
|
||||
onPMessage((String) reply.get(1), (String) reply.get(2),
|
||||
(String) reply.get(3));
|
||||
} else if (reply.get(0).equals("psubscribe")) {
|
||||
final byte[] bchannel = (byte[]) reply.get(1);
|
||||
final String strchannel =
|
||||
(bchannel == null) ?
|
||||
null :
|
||||
new String(bchannel, Protocol.UTF8);
|
||||
onUnsubscribe(strchannel, subscribedChannels);
|
||||
} else if (Arrays.equals(MESSAGE.raw, resp)) {
|
||||
final byte[] bchannel = (byte[]) reply.get(1);
|
||||
final byte[] bmesg = (byte[]) reply.get(2);
|
||||
final String strchannel =
|
||||
(bchannel == null) ?
|
||||
null :
|
||||
new String(bchannel, Protocol.UTF8);
|
||||
final String strmesg =
|
||||
(bmesg == null) ?
|
||||
null :
|
||||
new String(bmesg, Protocol.UTF8);
|
||||
onMessage(strchannel, strmesg);
|
||||
} else if (Arrays.equals(PMESSAGE.raw, resp)) {
|
||||
final byte[] bpattern = (byte[]) reply.get(1);
|
||||
final byte[] bchannel = (byte[]) reply.get(2);
|
||||
final byte[] bmesg = (byte[]) reply.get(3);
|
||||
final String strpattern =
|
||||
(bpattern == null) ?
|
||||
null :
|
||||
new String(bpattern, Protocol.UTF8);
|
||||
final String strchannel =
|
||||
(bchannel == null) ?
|
||||
null :
|
||||
new String(bchannel, Protocol.UTF8);
|
||||
final String strmesg =
|
||||
(bmesg == null) ?
|
||||
null :
|
||||
new String(bmesg, Protocol.UTF8);
|
||||
onPMessage(
|
||||
strpattern,
|
||||
strchannel,
|
||||
strmesg);
|
||||
} else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
|
||||
subscribedChannels = ((Integer) reply.get(2)).intValue();
|
||||
onPSubscribe((String) reply.get(1), subscribedChannels);
|
||||
} else if (reply.get(0).equals("punsubscribe")) {
|
||||
final byte[] bpattern = (byte[]) reply.get(1);
|
||||
final String strpattern =
|
||||
(bpattern == null) ?
|
||||
null :
|
||||
new String(bpattern, Protocol.UTF8);
|
||||
onPSubscribe(strpattern, subscribedChannels);
|
||||
} else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
|
||||
subscribedChannels = ((Integer) reply.get(2)).intValue();
|
||||
onPUnsubscribe((String) reply.get(1), subscribedChannels);
|
||||
final byte[] bpattern = (byte[]) reply.get(1);
|
||||
final String strpattern =
|
||||
(bpattern == null) ?
|
||||
null :
|
||||
new String(bpattern, Protocol.UTF8);
|
||||
onPUnsubscribe(strpattern, subscribedChannels);
|
||||
} else {
|
||||
throw new JedisException("Unknown message type: "
|
||||
+ reply.get(0));
|
||||
throw new JedisException("Unknown message type: "+ firstObj);
|
||||
}
|
||||
} while (isSubscribed());
|
||||
}
|
||||
|
||||
@@ -286,18 +286,24 @@ public final class Protocol {
|
||||
BY,
|
||||
DESC,
|
||||
GET,
|
||||
LIMIT,
|
||||
MESSAGE,
|
||||
NO,
|
||||
NOSORT,
|
||||
PMESSAGE,
|
||||
PSUBSCRIBE,
|
||||
PUNSUBSCRIBE,
|
||||
ONE,
|
||||
LIMIT,
|
||||
SET,
|
||||
STORE,
|
||||
SUBSCRIBE,
|
||||
UNSUBSCRIBE,
|
||||
WEIGHTS,
|
||||
WITHSCORES;
|
||||
public final byte[] raw;
|
||||
|
||||
Keyword() {
|
||||
raw = this.name().getBytes(UTF8);
|
||||
raw = this.name().toLowerCase().getBytes(UTF8);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user