Make Jedis Cluster more likely to antirez's redis-rb-cluster
JedisClusterCommand * improvements on connection error handling ** if based on slot connection throws connection related exception, retry to random node ** if we retry with random node, but all nodes are unreachable, throw JedisConnectionException without retry ** try to release connection whether connection is broken or not * bug fix : if asking flag is on, and success this time, set asking flag to off JedisClusterConnectionHandler * have flexibility on initializing slots cache ** allow some nodes connection failure - skip ** if current node is success initializing slots cache, skip other nodes ** if current node failed to initialize slots cache, discard all discovered nodes and slots * set nodes if node does not exist in nodes ** it restricts JedisPool to replace - prevent IllegalStateException : Returned object not currently part of this pool JedisSlotBasedConnectionGuaranteedConnectionHandler * getConnection (random connection) ** check all connections by random sequence ** always return valid connection (able to ping-pong) ** throw exception if all connections are invalid * some refactoring
This commit is contained in:
@@ -27,7 +27,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
||||
|
||||
public JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout,
|
||||
int maxRedirections) {
|
||||
this.connectionHandler = new JedisSlotBasedConnectionHandler(
|
||||
this.connectionHandler = new JedisSlotBasedConnectionGuaranteedConnectionHandler(
|
||||
jedisClusterNode);
|
||||
this.timeout = timeout;
|
||||
this.maxRedirections = maxRedirections;
|
||||
|
||||
@@ -3,19 +3,18 @@ package redis.clients.jedis;
|
||||
import redis.clients.jedis.exceptions.JedisAskDataException;
|
||||
import redis.clients.jedis.exceptions.JedisClusterException;
|
||||
import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException;
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
import redis.clients.jedis.exceptions.JedisException;
|
||||
import redis.clients.jedis.exceptions.JedisMovedDataException;
|
||||
import redis.clients.jedis.exceptions.JedisRedirectionException;
|
||||
import redis.clients.util.JedisClusterCRC16;
|
||||
|
||||
public abstract class JedisClusterCommand<T> {
|
||||
|
||||
private boolean asking = false;
|
||||
|
||||
private JedisClusterConnectionHandler connectionHandler;
|
||||
private int commandTimeout;
|
||||
private int redirections;
|
||||
|
||||
// private boolean asking = false;
|
||||
|
||||
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler,
|
||||
int timeout, int maxRedirections) {
|
||||
this.connectionHandler = connectionHandler;
|
||||
@@ -26,40 +25,80 @@ public abstract class JedisClusterCommand<T> {
|
||||
public abstract T execute(Jedis connection);
|
||||
|
||||
public T run(String key) {
|
||||
if (key == null) {
|
||||
throw new JedisClusterException(
|
||||
"No way to dispatch this command to Redis Cluster.");
|
||||
}
|
||||
|
||||
return runWithRetries(key, this.redirections, false, false);
|
||||
}
|
||||
|
||||
private T runWithRetries(String key, int redirections,
|
||||
boolean tryRandomNode, boolean asking) {
|
||||
if (redirections <= 0) {
|
||||
throw new JedisClusterMaxRedirectionsException(
|
||||
"Too many Cluster redirections?");
|
||||
}
|
||||
|
||||
Jedis connection = null;
|
||||
try {
|
||||
|
||||
if (key == null) {
|
||||
throw new JedisClusterException(
|
||||
"No way to dispatch this command to Redis Cluster.");
|
||||
} else if (redirections == 0) {
|
||||
throw new JedisClusterMaxRedirectionsException(
|
||||
"Too many Cluster redirections?");
|
||||
if (tryRandomNode) {
|
||||
connection = connectionHandler.getConnection();
|
||||
} else {
|
||||
connection = connectionHandler
|
||||
.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
|
||||
}
|
||||
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16
|
||||
.getSlot(key));
|
||||
|
||||
if (asking) {
|
||||
// TODO: Pipeline asking with the original command to make it
|
||||
// faster....
|
||||
connection.asking();
|
||||
|
||||
// if asking success, reset asking flag
|
||||
asking = false;
|
||||
}
|
||||
|
||||
return execute(connection);
|
||||
} catch (JedisConnectionException jce) {
|
||||
if (tryRandomNode) {
|
||||
// maybe all connection is down
|
||||
throw jce;
|
||||
}
|
||||
|
||||
releaseConnection(connection, true);
|
||||
connection = null;
|
||||
|
||||
// retry with random connection
|
||||
return runWithRetries(key, redirections--, true, asking);
|
||||
} catch (JedisRedirectionException jre) {
|
||||
return handleRedirection(jre, key);
|
||||
if (jre instanceof JedisAskDataException) {
|
||||
asking = true;
|
||||
} else if (jre instanceof JedisMovedDataException) {
|
||||
// TODO : In antirez's redis-rb-cluster implementation,
|
||||
// it rebuilds cluster's slot and node cache
|
||||
}
|
||||
|
||||
this.connectionHandler.assignSlotToNode(jre.getSlot(),
|
||||
jre.getTargetNode());
|
||||
|
||||
releaseConnection(connection, false);
|
||||
connection = null;
|
||||
|
||||
return runWithRetries(key, redirections - 1, false, asking);
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
releaseConnection(connection, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void releaseConnection(Jedis connection, boolean broken) {
|
||||
if (connection != null) {
|
||||
if (broken) {
|
||||
connectionHandler.returnBrokenConnection(connection);
|
||||
} else {
|
||||
connectionHandler.returnConnection(connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private T handleRedirection(JedisRedirectionException jre, String key) {
|
||||
if (jre instanceof JedisAskDataException) {
|
||||
asking = true;
|
||||
}
|
||||
redirections--;
|
||||
this.connectionHandler.assignSlotToNode(jre.getSlot(),
|
||||
jre.getTargetNode());
|
||||
return run(key);
|
||||
}
|
||||
}
|
||||
@@ -5,17 +5,22 @@ import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
|
||||
public abstract class JedisClusterConnectionHandler {
|
||||
|
||||
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
||||
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
|
||||
|
||||
abstract Jedis getConnection();
|
||||
|
||||
|
||||
protected void returnConnection(Jedis connection) {
|
||||
nodes.get(
|
||||
connection.getClient().getHost()
|
||||
+ connection.getClient().getPort()).returnResource(
|
||||
nodes.get(getNodeKey(connection.getClient()))
|
||||
.returnResource(connection);
|
||||
}
|
||||
|
||||
public void returnBrokenConnection(Jedis connection) {
|
||||
nodes.get(getNodeKey(connection.getClient())).returnBrokenResource(
|
||||
connection);
|
||||
}
|
||||
|
||||
@@ -29,29 +34,57 @@ public abstract class JedisClusterConnectionHandler {
|
||||
return nodes;
|
||||
}
|
||||
|
||||
private void initializeSlotsCache(Set<HostAndPort> nodes) {
|
||||
for (HostAndPort hostAndPort : nodes) {
|
||||
private void initializeSlotsCache(Set<HostAndPort> startNodes) {
|
||||
for (HostAndPort hostAndPort : startNodes) {
|
||||
JedisPool jp = new JedisPool(hostAndPort.getHost(),
|
||||
hostAndPort.getPort());
|
||||
this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp);
|
||||
Jedis jedis = jp.getResource();
|
||||
|
||||
this.nodes.clear();
|
||||
this.slots.clear();
|
||||
|
||||
Jedis jedis = null;
|
||||
try {
|
||||
jedis = jp.getResource();
|
||||
discoverClusterNodesAndSlots(jedis);
|
||||
break;
|
||||
} catch (JedisConnectionException e) {
|
||||
if (jedis != null) {
|
||||
jp.returnBrokenResource(jedis);
|
||||
jedis = null;
|
||||
}
|
||||
|
||||
// try next nodes
|
||||
} finally {
|
||||
jp.returnResource(jedis);
|
||||
if (jedis != null) {
|
||||
jp.returnResource(jedis);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (HostAndPort node : startNodes) {
|
||||
setNodeIfNotExist(node);
|
||||
}
|
||||
}
|
||||
|
||||
private void discoverClusterNodesAndSlots(Jedis jedis) {
|
||||
String localNodes = jedis.clusterNodes();
|
||||
for (String nodeInfo : localNodes.split("\n")) {
|
||||
HostAndPort node = getHostAndPortFromNodeLine(nodeInfo, jedis);
|
||||
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
||||
this.nodes.put(node.getHost() + node.getPort(), nodePool);
|
||||
setNodeIfNotExist(node);
|
||||
|
||||
JedisPool nodePool = nodes.get(getNodeKey(node));
|
||||
populateNodeSlots(nodeInfo, nodePool);
|
||||
}
|
||||
}
|
||||
|
||||
private void setNodeIfNotExist(HostAndPort node) {
|
||||
String nodeKey = getNodeKey(node);
|
||||
if (nodes.containsKey(nodeKey))
|
||||
return;
|
||||
|
||||
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
||||
nodes.put(nodeKey, nodePool);
|
||||
}
|
||||
|
||||
private void populateNodeSlots(String nodeInfo, JedisPool nodePool) {
|
||||
String[] nodeInfoArray = nodeInfo.split(" ");
|
||||
@@ -74,7 +107,8 @@ public abstract class JedisClusterConnectionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo, Jedis currentConnection) {
|
||||
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo,
|
||||
Jedis currentConnection) {
|
||||
String stringHostAndPort = nodeInfo.split(" ", 3)[1];
|
||||
if (":0".equals(stringHostAndPort)) {
|
||||
return new HostAndPort(currentConnection.getClient().getHost(),
|
||||
@@ -86,9 +120,16 @@ public abstract class JedisClusterConnectionHandler {
|
||||
}
|
||||
|
||||
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
||||
JedisPool targetPool = nodes.get(targetNode.getHost()
|
||||
+ targetNode.getPort());
|
||||
slots.put(slot, targetPool);
|
||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||
|
||||
if (targetPool != null) {
|
||||
slots.put(slot, targetPool);
|
||||
} else {
|
||||
setNodeIfNotExist(targetNode);
|
||||
|
||||
targetPool = nodes.get(getNodeKey(targetNode));
|
||||
slots.put(slot, targetPool);
|
||||
}
|
||||
}
|
||||
|
||||
protected JedisPool getRandomConnection() {
|
||||
@@ -96,4 +137,11 @@ public abstract class JedisClusterConnectionHandler {
|
||||
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
|
||||
}
|
||||
|
||||
protected String getNodeKey(HostAndPort hnp) {
|
||||
return hnp.getHost() + ":" + hnp.getPort();
|
||||
}
|
||||
|
||||
protected String getNodeKey(Client client) {
|
||||
return client.getHost() + ":" + client.getPort();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
|
||||
public class JedisSlotBasedConnectionGuaranteedConnectionHandler extends
|
||||
JedisSlotBasedConnectionHandler {
|
||||
|
||||
public JedisSlotBasedConnectionGuaranteedConnectionHandler(
|
||||
Set<HostAndPort> nodes) {
|
||||
super(nodes);
|
||||
}
|
||||
|
||||
public Jedis getConnection() {
|
||||
// In antirez's redis-rb-cluster implementation,
|
||||
// getRandomConnection always return valid connection (able to ping-pong)
|
||||
// or exception if all connections are invalid
|
||||
|
||||
List<JedisPool> pools = getShuffledNodesPool();
|
||||
|
||||
for (JedisPool pool : pools) {
|
||||
Jedis jedis = null;
|
||||
try {
|
||||
jedis = pool.getResource();
|
||||
|
||||
if (jedis == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String result = jedis.ping();
|
||||
|
||||
if (result.equalsIgnoreCase("pong"))
|
||||
return jedis;
|
||||
|
||||
pool.returnBrokenResource(jedis);
|
||||
} catch (JedisConnectionException ex) {
|
||||
if (jedis != null) {
|
||||
pool.returnBrokenResource(jedis);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
throw new JedisConnectionException("no reachable node in cluster");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Jedis getConnectionFromSlot(int slot) {
|
||||
JedisPool connectionPool = slots.get(slot);
|
||||
if (connectionPool != null) {
|
||||
// It can't guaranteed to get valid connection because of node assignment
|
||||
return connectionPool.getResource();
|
||||
} else {
|
||||
return getConnection();
|
||||
}
|
||||
}
|
||||
|
||||
private List<JedisPool> getShuffledNodesPool() {
|
||||
List<JedisPool> pools = new ArrayList<JedisPool>();
|
||||
pools.addAll(nodes.values());
|
||||
Collections.shuffle(pools);
|
||||
return pools;
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user