Refactor redis cluster connection handler and add JedisAskDataException handling
This commit is contained in:
@@ -15,7 +15,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
private JedisClusterConnectionHandler connectionHandler;
|
private JedisClusterConnectionHandler connectionHandler;
|
||||||
|
|
||||||
public JedisCluster(Set<HostAndPort> nodes, int timeout) {
|
public JedisCluster(Set<HostAndPort> nodes, int timeout) {
|
||||||
connectionHandler = new JedisClusterConnectionHandler(nodes);
|
connectionHandler = new JedisRandomConnectionHandler(nodes);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -31,7 +31,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<String>(connectionHandler) {
|
return new JedisClusterCommand<String>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public String execute() {
|
public String execute() {
|
||||||
return connectionHandler.getRandomConnection().set(key, value);
|
return connectionHandler.getConnection().set(key, value);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -41,7 +41,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<String>(connectionHandler) {
|
return new JedisClusterCommand<String>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public String execute() {
|
public String execute() {
|
||||||
return connectionHandler.getRandomConnection().get(key);
|
return connectionHandler.getConnection().get(key);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -51,7 +51,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public Boolean execute() {
|
public Boolean execute() {
|
||||||
return connectionHandler.getRandomConnection().exists(key);
|
return connectionHandler.getConnection().exists(key);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -61,7 +61,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<Long>(connectionHandler) {
|
return new JedisClusterCommand<Long>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public Long execute() {
|
public Long execute() {
|
||||||
return connectionHandler.getRandomConnection().persist(key);
|
return connectionHandler.getConnection().persist(key);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -71,7 +71,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<String>(connectionHandler) {
|
return new JedisClusterCommand<String>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public String execute() {
|
public String execute() {
|
||||||
return connectionHandler.getRandomConnection().type(key);
|
return connectionHandler.getConnection().type(key);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -81,7 +81,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<Long>(connectionHandler) {
|
return new JedisClusterCommand<Long>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public Long execute() {
|
public Long execute() {
|
||||||
return connectionHandler.getRandomConnection().expire(key, seconds);
|
return connectionHandler.getConnection().expire(key, seconds);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -91,7 +91,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<Long>(connectionHandler) {
|
return new JedisClusterCommand<Long>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public Long execute() {
|
public Long execute() {
|
||||||
return connectionHandler.getRandomConnection().expireAt(key, unixTime);
|
return connectionHandler.getConnection().expireAt(key, unixTime);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -101,7 +101,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<Long>(connectionHandler) {
|
return new JedisClusterCommand<Long>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public Long execute() {
|
public Long execute() {
|
||||||
return connectionHandler.getRandomConnection().ttl(key);
|
return connectionHandler.getConnection().ttl(key);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -111,7 +111,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public Boolean execute() {
|
public Boolean execute() {
|
||||||
return connectionHandler.getRandomConnection().setbit(key, offset, value);
|
return connectionHandler.getConnection().setbit(key, offset, value);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -121,7 +121,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public Boolean execute() {
|
public Boolean execute() {
|
||||||
return connectionHandler.getRandomConnection().setbit(key, offset, value);
|
return connectionHandler.getConnection().setbit(key, offset, value);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -131,7 +131,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public Boolean execute() {
|
public Boolean execute() {
|
||||||
return connectionHandler.getRandomConnection().getbit(key, offset);
|
return connectionHandler.getConnection().getbit(key, offset);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -141,7 +141,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<Long>(connectionHandler) {
|
return new JedisClusterCommand<Long>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public Long execute() {
|
public Long execute() {
|
||||||
return connectionHandler.getRandomConnection().setrange(key, offset, value);
|
return connectionHandler.getConnection().setrange(key, offset, value);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
@@ -151,7 +151,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
return new JedisClusterCommand<String>(connectionHandler) {
|
return new JedisClusterCommand<String>(connectionHandler) {
|
||||||
@Override
|
@Override
|
||||||
public String execute() {
|
public String execute() {
|
||||||
return connectionHandler.getRandomConnection().getrange(key, startOffset, endOffset);
|
return connectionHandler.getConnection().getrange(key, startOffset, endOffset);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,33 +1,8 @@
|
|||||||
package redis.clients.jedis;
|
package redis.clients.jedis;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import redis.clients.util.Pool;
|
public interface JedisClusterConnectionHandler {
|
||||||
|
|
||||||
public class JedisClusterConnectionHandler {
|
|
||||||
|
|
||||||
private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
|
||||||
|
|
||||||
public JedisClusterConnectionHandler(Set<HostAndPort> nodes) {
|
Jedis getConnection();
|
||||||
initializeSlotsCache(nodes);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void initializeSlotsCache(Set<HostAndPort> nodes) {
|
|
||||||
for (HostAndPort hostAndPort : nodes) {
|
|
||||||
JedisPool jp = new JedisPool(hostAndPort.getHost(),
|
|
||||||
hostAndPort.getPort());
|
|
||||||
this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public Jedis getRandomConnection() {
|
|
||||||
Object[] nodeArray = nodes.values().toArray();
|
|
||||||
return ((Pool<Jedis>) nodeArray[new Random().nextInt(nodeArray.length)]).getResource();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,33 @@
|
|||||||
|
package redis.clients.jedis;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import redis.clients.util.Pool;
|
||||||
|
|
||||||
|
public class JedisRandomConnectionHandler implements JedisClusterConnectionHandler {
|
||||||
|
|
||||||
|
private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
||||||
|
|
||||||
|
public JedisRandomConnectionHandler(Set<HostAndPort> nodes) {
|
||||||
|
initializeSlotsCache(nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeSlotsCache(Set<HostAndPort> nodes) {
|
||||||
|
for (HostAndPort hostAndPort : nodes) {
|
||||||
|
JedisPool jp = new JedisPool(hostAndPort.getHost(),
|
||||||
|
hostAndPort.getPort());
|
||||||
|
this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public Jedis getConnection() {
|
||||||
|
Object[] nodeArray = nodes.values().toArray();
|
||||||
|
return ((Pool<Jedis>) nodeArray[new Random().nextInt(nodeArray.length)]).getResource();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ import java.io.IOException;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import redis.clients.jedis.exceptions.JedisAskDataException;
|
||||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||||
import redis.clients.jedis.exceptions.JedisDataException;
|
import redis.clients.jedis.exceptions.JedisDataException;
|
||||||
import redis.clients.jedis.exceptions.JedisMovedDataException;
|
import redis.clients.jedis.exceptions.JedisMovedDataException;
|
||||||
@@ -13,6 +14,7 @@ import redis.clients.util.SafeEncoder;
|
|||||||
|
|
||||||
public final class Protocol {
|
public final class Protocol {
|
||||||
|
|
||||||
|
private static final String ASK_RESPONSE = "ASK";
|
||||||
private static final String MOVED_RESPONSE = "MOVED";
|
private static final String MOVED_RESPONSE = "MOVED";
|
||||||
public static final int DEFAULT_PORT = 6379;
|
public static final int DEFAULT_PORT = 6379;
|
||||||
public static final int DEFAULT_SENTINEL_PORT = 26379;
|
public static final int DEFAULT_SENTINEL_PORT = 26379;
|
||||||
@@ -75,9 +77,12 @@ public final class Protocol {
|
|||||||
|
|
||||||
private static void processError(final RedisInputStream is) {
|
private static void processError(final RedisInputStream is) {
|
||||||
String message = is.readLine();
|
String message = is.readLine();
|
||||||
//TODO: Read only first 5 bytes?
|
//TODO: I'm not sure if this is the best way to do this.
|
||||||
|
//Maybe Read only first 5 bytes instead?
|
||||||
if (message.contains(MOVED_RESPONSE)) {
|
if (message.contains(MOVED_RESPONSE)) {
|
||||||
throw new JedisMovedDataException(message);
|
throw new JedisMovedDataException(message);
|
||||||
|
} else if (message.contains(ASK_RESPONSE)) {
|
||||||
|
throw new JedisAskDataException(message);
|
||||||
}
|
}
|
||||||
throw new JedisDataException(message);
|
throw new JedisDataException(message);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -77,14 +77,13 @@ public class JedisClusterTest extends Assert {
|
|||||||
node2.get("foo");
|
node2.get("foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Test(expected=JedisAskDataException.class)
|
@Test(expected=JedisAskDataException.class)
|
||||||
// public void ask() {
|
public void ask() {
|
||||||
// node1.set("foo", "bar");
|
int keySlot = RedisSlot.getSlot("test");
|
||||||
// int keySlot = RedisSlot.getSlot("foo");
|
String node2Id = getNodeId(node2.clusterNodes());
|
||||||
// String node2Id = getNodeId(node2.clusterNodes());
|
node1.clusterSetSlotMigrating(keySlot, node2Id);
|
||||||
// node1.clusterSetSlotMigrating(keySlot, node2Id);
|
node1.get("test");
|
||||||
// node1.get("foo");
|
}
|
||||||
// }
|
|
||||||
|
|
||||||
private String getNodeId(String infoOutput) {
|
private String getNodeId(String infoOutput) {
|
||||||
for (String infoLine : infoOutput.split("\n")) {
|
for (String infoLine : infoOutput.split("\n")) {
|
||||||
|
|||||||
@@ -22,4 +22,8 @@ public class RedisSlot {
|
|||||||
return crc &= 0xffff % 16384;
|
return crc &= 0xffff % 16384;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
System.out.println(getSlot("test"));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user