Merge branch 'fix-jedis-cluster-failover-to-slave' of https://github.com/HeartSaVioR/jedis into HeartSaVioR-fix-jedis-cluster-failover-to-slave

This commit is contained in:
Marcos Nils
2014-03-03 18:49:43 -03:00
5 changed files with 200 additions and 41 deletions

View File

@@ -27,7 +27,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
public JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, public JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout,
int maxRedirections) { int maxRedirections) {
this.connectionHandler = new JedisSlotBasedConnectionHandler( this.connectionHandler = new JedisSlotBasedConnectionGuaranteedConnectionHandler(
jedisClusterNode); jedisClusterNode);
this.timeout = timeout; this.timeout = timeout;
this.maxRedirections = maxRedirections; this.maxRedirections = maxRedirections;

View File

@@ -3,19 +3,18 @@ package redis.clients.jedis;
import redis.clients.jedis.exceptions.JedisAskDataException; import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException; import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException; import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException; import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16; import redis.clients.util.JedisClusterCRC16;
public abstract class JedisClusterCommand<T> { public abstract class JedisClusterCommand<T> {
private boolean asking = false;
private JedisClusterConnectionHandler connectionHandler; private JedisClusterConnectionHandler connectionHandler;
private int commandTimeout; private int commandTimeout;
private int redirections; private int redirections;
// private boolean asking = false;
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler,
int timeout, int maxRedirections) { int timeout, int maxRedirections) {
this.connectionHandler = connectionHandler; this.connectionHandler = connectionHandler;
@@ -26,40 +25,80 @@ public abstract class JedisClusterCommand<T> {
public abstract T execute(Jedis connection); public abstract T execute(Jedis connection);
public T run(String key) { public T run(String key) {
if (key == null) {
throw new JedisClusterException(
"No way to dispatch this command to Redis Cluster.");
}
return runWithRetries(key, this.redirections, false, false);
}
private T runWithRetries(String key, int redirections,
boolean tryRandomNode, boolean asking) {
if (redirections <= 0) {
throw new JedisClusterMaxRedirectionsException(
"Too many Cluster redirections?");
}
Jedis connection = null; Jedis connection = null;
try { try {
if (tryRandomNode) {
if (key == null) { connection = connectionHandler.getConnection();
throw new JedisClusterException( } else {
"No way to dispatch this command to Redis Cluster."); connection = connectionHandler
} else if (redirections == 0) { .getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
throw new JedisClusterMaxRedirectionsException(
"Too many Cluster redirections?");
} }
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16
.getSlot(key));
if (asking) { if (asking) {
// TODO: Pipeline asking with the original command to make it // TODO: Pipeline asking with the original command to make it
// faster.... // faster....
connection.asking(); connection.asking();
// if asking success, reset asking flag
asking = false;
} }
return execute(connection); return execute(connection);
} catch (JedisConnectionException jce) {
if (tryRandomNode) {
// maybe all connection is down
throw jce;
}
releaseConnection(connection, true);
connection = null;
// retry with random connection
return runWithRetries(key, redirections--, true, asking);
} catch (JedisRedirectionException jre) { } catch (JedisRedirectionException jre) {
return handleRedirection(jre, key); if (jre instanceof JedisAskDataException) {
asking = true;
} else if (jre instanceof JedisMovedDataException) {
// TODO : In antirez's redis-rb-cluster implementation,
// it rebuilds cluster's slot and node cache
}
this.connectionHandler.assignSlotToNode(jre.getSlot(),
jre.getTargetNode());
releaseConnection(connection, false);
connection = null;
return runWithRetries(key, redirections - 1, false, asking);
} finally { } finally {
if (connection != null) { releaseConnection(connection, false);
}
}
private void releaseConnection(Jedis connection, boolean broken) {
if (connection != null) {
if (broken) {
connectionHandler.returnBrokenConnection(connection);
} else {
connectionHandler.returnConnection(connection); connectionHandler.returnConnection(connection);
} }
} }
} }
private T handleRedirection(JedisRedirectionException jre, String key) {
if (jre instanceof JedisAskDataException) {
asking = true;
}
redirections--;
this.connectionHandler.assignSlotToNode(jre.getSlot(),
jre.getTargetNode());
return run(key);
}
} }

View File

@@ -5,17 +5,22 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import redis.clients.jedis.exceptions.JedisConnectionException;
public abstract class JedisClusterConnectionHandler { public abstract class JedisClusterConnectionHandler {
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>(); protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>(); protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
abstract Jedis getConnection(); abstract Jedis getConnection();
protected void returnConnection(Jedis connection) { protected void returnConnection(Jedis connection) {
nodes.get( nodes.get(getNodeKey(connection.getClient()))
connection.getClient().getHost() .returnResource(connection);
+ connection.getClient().getPort()).returnResource( }
public void returnBrokenConnection(Jedis connection) {
nodes.get(getNodeKey(connection.getClient())).returnBrokenResource(
connection); connection);
} }
@@ -29,29 +34,57 @@ public abstract class JedisClusterConnectionHandler {
return nodes; return nodes;
} }
private void initializeSlotsCache(Set<HostAndPort> nodes) { private void initializeSlotsCache(Set<HostAndPort> startNodes) {
for (HostAndPort hostAndPort : nodes) { for (HostAndPort hostAndPort : startNodes) {
JedisPool jp = new JedisPool(hostAndPort.getHost(), JedisPool jp = new JedisPool(hostAndPort.getHost(),
hostAndPort.getPort()); hostAndPort.getPort());
this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp);
Jedis jedis = jp.getResource(); this.nodes.clear();
this.slots.clear();
Jedis jedis = null;
try { try {
jedis = jp.getResource();
discoverClusterNodesAndSlots(jedis); discoverClusterNodesAndSlots(jedis);
break;
} catch (JedisConnectionException e) {
if (jedis != null) {
jp.returnBrokenResource(jedis);
jedis = null;
}
// try next nodes
} finally { } finally {
jp.returnResource(jedis); if (jedis != null) {
jp.returnResource(jedis);
}
} }
} }
}
for (HostAndPort node : startNodes) {
setNodeIfNotExist(node);
}
}
private void discoverClusterNodesAndSlots(Jedis jedis) { private void discoverClusterNodesAndSlots(Jedis jedis) {
String localNodes = jedis.clusterNodes(); String localNodes = jedis.clusterNodes();
for (String nodeInfo : localNodes.split("\n")) { for (String nodeInfo : localNodes.split("\n")) {
HostAndPort node = getHostAndPortFromNodeLine(nodeInfo, jedis); HostAndPort node = getHostAndPortFromNodeLine(nodeInfo, jedis);
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort()); setNodeIfNotExist(node);
this.nodes.put(node.getHost() + node.getPort(), nodePool);
JedisPool nodePool = nodes.get(getNodeKey(node));
populateNodeSlots(nodeInfo, nodePool); populateNodeSlots(nodeInfo, nodePool);
} }
} }
private void setNodeIfNotExist(HostAndPort node) {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey))
return;
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
nodes.put(nodeKey, nodePool);
}
private void populateNodeSlots(String nodeInfo, JedisPool nodePool) { private void populateNodeSlots(String nodeInfo, JedisPool nodePool) {
String[] nodeInfoArray = nodeInfo.split(" "); String[] nodeInfoArray = nodeInfo.split(" ");
@@ -74,7 +107,8 @@ public abstract class JedisClusterConnectionHandler {
} }
} }
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo, Jedis currentConnection) { private HostAndPort getHostAndPortFromNodeLine(String nodeInfo,
Jedis currentConnection) {
String stringHostAndPort = nodeInfo.split(" ", 3)[1]; String stringHostAndPort = nodeInfo.split(" ", 3)[1];
if (":0".equals(stringHostAndPort)) { if (":0".equals(stringHostAndPort)) {
return new HostAndPort(currentConnection.getClient().getHost(), return new HostAndPort(currentConnection.getClient().getHost(),
@@ -86,9 +120,16 @@ public abstract class JedisClusterConnectionHandler {
} }
public void assignSlotToNode(int slot, HostAndPort targetNode) { public void assignSlotToNode(int slot, HostAndPort targetNode) {
JedisPool targetPool = nodes.get(targetNode.getHost() JedisPool targetPool = nodes.get(getNodeKey(targetNode));
+ targetNode.getPort());
slots.put(slot, targetPool); if (targetPool != null) {
slots.put(slot, targetPool);
} else {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
slots.put(slot, targetPool);
}
} }
protected JedisPool getRandomConnection() { protected JedisPool getRandomConnection() {
@@ -96,4 +137,11 @@ public abstract class JedisClusterConnectionHandler {
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]); return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
} }
protected String getNodeKey(HostAndPort hnp) {
return hnp.getHost() + ":" + hnp.getPort();
}
protected String getNodeKey(Client client) {
return client.getHost() + ":" + client.getPort();
}
} }

View File

@@ -0,0 +1,68 @@
package redis.clients.jedis;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import redis.clients.jedis.exceptions.JedisConnectionException;
public class JedisSlotBasedConnectionGuaranteedConnectionHandler extends
JedisSlotBasedConnectionHandler {
public JedisSlotBasedConnectionGuaranteedConnectionHandler(
Set<HostAndPort> nodes) {
super(nodes);
}
public Jedis getConnection() {
// In antirez's redis-rb-cluster implementation,
// getRandomConnection always return valid connection (able to ping-pong)
// or exception if all connections are invalid
List<JedisPool> pools = getShuffledNodesPool();
for (JedisPool pool : pools) {
Jedis jedis = null;
try {
jedis = pool.getResource();
if (jedis == null) {
continue;
}
String result = jedis.ping();
if (result.equalsIgnoreCase("pong"))
return jedis;
pool.returnBrokenResource(jedis);
} catch (JedisConnectionException ex) {
if (jedis != null) {
pool.returnBrokenResource(jedis);
}
}
}
throw new JedisConnectionException("no reachable node in cluster");
}
@Override
public Jedis getConnectionFromSlot(int slot) {
JedisPool connectionPool = slots.get(slot);
if (connectionPool != null) {
// It can't guaranteed to get valid connection because of node assignment
return connectionPool.getResource();
} else {
return getConnection();
}
}
private List<JedisPool> getShuffledNodesPool() {
List<JedisPool> pools = new ArrayList<JedisPool>();
pools.addAll(nodes.values());
Collections.shuffle(pools);
return pools;
}
}

View File

@@ -5,6 +5,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import redis.clients.jedis.exceptions.JedisAskDataException; import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.jedis.exceptions.JedisMovedDataException;
@@ -16,6 +17,7 @@ public final class Protocol {
private static final String ASK_RESPONSE = "ASK"; private static final String ASK_RESPONSE = "ASK";
private static final String MOVED_RESPONSE = "MOVED"; private static final String MOVED_RESPONSE = "MOVED";
private static final String CLUSTERDOWN_RESPONSE = "CLUSTERDOWN";
public static final int DEFAULT_PORT = 6379; public static final int DEFAULT_PORT = 6379;
public static final int DEFAULT_SENTINEL_PORT = 26379; public static final int DEFAULT_SENTINEL_PORT = 26379;
public static final int DEFAULT_TIMEOUT = 2000; public static final int DEFAULT_TIMEOUT = 2000;
@@ -96,6 +98,8 @@ public final class Protocol {
throw new JedisAskDataException(message, new HostAndPort( throw new JedisAskDataException(message, new HostAndPort(
askInfo[1], Integer.valueOf(askInfo[2])), askInfo[1], Integer.valueOf(askInfo[2])),
Integer.valueOf(askInfo[0])); Integer.valueOf(askInfo[0]));
} else if (message.startsWith(CLUSTERDOWN_RESPONSE)) {
throw new JedisClusterException(message);
} }
throw new JedisDataException(message); throw new JedisDataException(message);
} }