From 882d662470351d08d106006821c837d76b5ddaac Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 25 Feb 2014 18:29:09 +0900 Subject: [PATCH] Make Jedis Cluster more likely to antirez's redis-rb-cluster JedisClusterCommand * improvements on connection error handling ** if based on slot connection throws connection related exception, retry to random node ** if we retry with random node, but all nodes are unreachable, throw JedisConnectionException without retry ** try to release connection whether connection is broken or not * bug fix : if asking flag is on, and success this time, set asking flag to off JedisClusterConnectionHandler * have flexibility on initializing slots cache ** allow some nodes connection failure - skip ** if current node is success initializing slots cache, skip other nodes ** if current node failed to initialize slots cache, discard all discovered nodes and slots * set nodes if node does not exist in nodes ** it restricts JedisPool to replace - prevent IllegalStateException : Returned object not currently part of this pool JedisSlotBasedConnectionGuaranteedConnectionHandler * getConnection (random connection) ** check all connections by random sequence ** always return valid connection (able to ping-pong) ** throw exception if all connections are invalid * some refactoring --- .../redis/clients/jedis/JedisCluster.java | 2 +- .../clients/jedis/JedisClusterCommand.java | 87 ++++++++++++++----- .../jedis/JedisClusterConnectionHandler.java | 80 +++++++++++++---- ...ConnectionGuaranteedConnectionHandler.java | 68 +++++++++++++++ 4 files changed, 196 insertions(+), 41 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/JedisSlotBasedConnectionGuaranteedConnectionHandler.java diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index 1f645ea..afa0109 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -27,7 +27,7 @@ public class JedisCluster implements JedisCommands, BasicCommands { public JedisCluster(Set jedisClusterNode, int timeout, int maxRedirections) { - this.connectionHandler = new JedisSlotBasedConnectionHandler( + this.connectionHandler = new JedisSlotBasedConnectionGuaranteedConnectionHandler( jedisClusterNode); this.timeout = timeout; this.maxRedirections = maxRedirections; diff --git a/src/main/java/redis/clients/jedis/JedisClusterCommand.java b/src/main/java/redis/clients/jedis/JedisClusterCommand.java index 6e110bc..c1be912 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterCommand.java +++ b/src/main/java/redis/clients/jedis/JedisClusterCommand.java @@ -3,19 +3,18 @@ package redis.clients.jedis; import redis.clients.jedis.exceptions.JedisAskDataException; import redis.clients.jedis.exceptions.JedisClusterException; import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.jedis.exceptions.JedisRedirectionException; import redis.clients.util.JedisClusterCRC16; public abstract class JedisClusterCommand { - private boolean asking = false; - private JedisClusterConnectionHandler connectionHandler; private int commandTimeout; private int redirections; - // private boolean asking = false; - public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int timeout, int maxRedirections) { this.connectionHandler = connectionHandler; @@ -26,40 +25,80 @@ public abstract class JedisClusterCommand { public abstract T execute(Jedis connection); public T run(String key) { + if (key == null) { + throw new JedisClusterException( + "No way to dispatch this command to Redis Cluster."); + } + + return runWithRetries(key, this.redirections, false, false); + } + + private T runWithRetries(String key, int redirections, + boolean tryRandomNode, boolean asking) { + if (redirections <= 0) { + throw new JedisClusterMaxRedirectionsException( + "Too many Cluster redirections?"); + } + Jedis connection = null; try { - - if (key == null) { - throw new JedisClusterException( - "No way to dispatch this command to Redis Cluster."); - } else if (redirections == 0) { - throw new JedisClusterMaxRedirectionsException( - "Too many Cluster redirections?"); + if (tryRandomNode) { + connection = connectionHandler.getConnection(); + } else { + connection = connectionHandler + .getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } - connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16 - .getSlot(key)); + if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection.asking(); + + // if asking success, reset asking flag + asking = false; } + return execute(connection); + } catch (JedisConnectionException jce) { + if (tryRandomNode) { + // maybe all connection is down + throw jce; + } + + releaseConnection(connection, true); + connection = null; + + // retry with random connection + return runWithRetries(key, redirections--, true, asking); } catch (JedisRedirectionException jre) { - return handleRedirection(jre, key); + if (jre instanceof JedisAskDataException) { + asking = true; + } else if (jre instanceof JedisMovedDataException) { + // TODO : In antirez's redis-rb-cluster implementation, + // it rebuilds cluster's slot and node cache + } + + this.connectionHandler.assignSlotToNode(jre.getSlot(), + jre.getTargetNode()); + + releaseConnection(connection, false); + connection = null; + + return runWithRetries(key, redirections - 1, false, asking); } finally { - if (connection != null) { + releaseConnection(connection, false); + } + + } + + private void releaseConnection(Jedis connection, boolean broken) { + if (connection != null) { + if (broken) { + connectionHandler.returnBrokenConnection(connection); + } else { connectionHandler.returnConnection(connection); } } } - private T handleRedirection(JedisRedirectionException jre, String key) { - if (jre instanceof JedisAskDataException) { - asking = true; - } - redirections--; - this.connectionHandler.assignSlotToNode(jre.getSlot(), - jre.getTargetNode()); - return run(key); - } } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java index 28e22f9..94f4c75 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java @@ -5,17 +5,22 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import redis.clients.jedis.exceptions.JedisConnectionException; + public abstract class JedisClusterConnectionHandler { protected Map nodes = new HashMap(); protected Map slots = new HashMap(); abstract Jedis getConnection(); - + protected void returnConnection(Jedis connection) { - nodes.get( - connection.getClient().getHost() - + connection.getClient().getPort()).returnResource( + nodes.get(getNodeKey(connection.getClient())) + .returnResource(connection); + } + + public void returnBrokenConnection(Jedis connection) { + nodes.get(getNodeKey(connection.getClient())).returnBrokenResource( connection); } @@ -29,29 +34,57 @@ public abstract class JedisClusterConnectionHandler { return nodes; } - private void initializeSlotsCache(Set nodes) { - for (HostAndPort hostAndPort : nodes) { + private void initializeSlotsCache(Set startNodes) { + for (HostAndPort hostAndPort : startNodes) { JedisPool jp = new JedisPool(hostAndPort.getHost(), hostAndPort.getPort()); - this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp); - Jedis jedis = jp.getResource(); + + this.nodes.clear(); + this.slots.clear(); + + Jedis jedis = null; try { + jedis = jp.getResource(); discoverClusterNodesAndSlots(jedis); + break; + } catch (JedisConnectionException e) { + if (jedis != null) { + jp.returnBrokenResource(jedis); + jedis = null; + } + + // try next nodes } finally { - jp.returnResource(jedis); + if (jedis != null) { + jp.returnResource(jedis); + } } } - } + for (HostAndPort node : startNodes) { + setNodeIfNotExist(node); + } + } + private void discoverClusterNodesAndSlots(Jedis jedis) { String localNodes = jedis.clusterNodes(); for (String nodeInfo : localNodes.split("\n")) { HostAndPort node = getHostAndPortFromNodeLine(nodeInfo, jedis); - JedisPool nodePool = new JedisPool(node.getHost(), node.getPort()); - this.nodes.put(node.getHost() + node.getPort(), nodePool); + setNodeIfNotExist(node); + + JedisPool nodePool = nodes.get(getNodeKey(node)); populateNodeSlots(nodeInfo, nodePool); } } + + private void setNodeIfNotExist(HostAndPort node) { + String nodeKey = getNodeKey(node); + if (nodes.containsKey(nodeKey)) + return; + + JedisPool nodePool = new JedisPool(node.getHost(), node.getPort()); + nodes.put(nodeKey, nodePool); + } private void populateNodeSlots(String nodeInfo, JedisPool nodePool) { String[] nodeInfoArray = nodeInfo.split(" "); @@ -74,7 +107,8 @@ public abstract class JedisClusterConnectionHandler { } } - private HostAndPort getHostAndPortFromNodeLine(String nodeInfo, Jedis currentConnection) { + private HostAndPort getHostAndPortFromNodeLine(String nodeInfo, + Jedis currentConnection) { String stringHostAndPort = nodeInfo.split(" ", 3)[1]; if (":0".equals(stringHostAndPort)) { return new HostAndPort(currentConnection.getClient().getHost(), @@ -86,9 +120,16 @@ public abstract class JedisClusterConnectionHandler { } public void assignSlotToNode(int slot, HostAndPort targetNode) { - JedisPool targetPool = nodes.get(targetNode.getHost() - + targetNode.getPort()); - slots.put(slot, targetPool); + JedisPool targetPool = nodes.get(getNodeKey(targetNode)); + + if (targetPool != null) { + slots.put(slot, targetPool); + } else { + setNodeIfNotExist(targetNode); + + targetPool = nodes.get(getNodeKey(targetNode)); + slots.put(slot, targetPool); + } } protected JedisPool getRandomConnection() { @@ -96,4 +137,11 @@ public abstract class JedisClusterConnectionHandler { return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]); } + protected String getNodeKey(HostAndPort hnp) { + return hnp.getHost() + ":" + hnp.getPort(); + } + + protected String getNodeKey(Client client) { + return client.getHost() + ":" + client.getPort(); + } } diff --git a/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionGuaranteedConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionGuaranteedConnectionHandler.java new file mode 100644 index 0000000..0fe2cec --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionGuaranteedConnectionHandler.java @@ -0,0 +1,68 @@ +package redis.clients.jedis; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import redis.clients.jedis.exceptions.JedisConnectionException; + +public class JedisSlotBasedConnectionGuaranteedConnectionHandler extends + JedisSlotBasedConnectionHandler { + + public JedisSlotBasedConnectionGuaranteedConnectionHandler( + Set nodes) { + super(nodes); + } + + public Jedis getConnection() { + // In antirez's redis-rb-cluster implementation, + // getRandomConnection always return valid connection (able to ping-pong) + // or exception if all connections are invalid + + List pools = getShuffledNodesPool(); + + for (JedisPool pool : pools) { + Jedis jedis = null; + try { + jedis = pool.getResource(); + + if (jedis == null) { + continue; + } + + String result = jedis.ping(); + + if (result.equalsIgnoreCase("pong")) + return jedis; + + pool.returnBrokenResource(jedis); + } catch (JedisConnectionException ex) { + if (jedis != null) { + pool.returnBrokenResource(jedis); + } + } + } + + throw new JedisConnectionException("no reachable node in cluster"); + } + + @Override + public Jedis getConnectionFromSlot(int slot) { + JedisPool connectionPool = slots.get(slot); + if (connectionPool != null) { + // It can't guaranteed to get valid connection because of node assignment + return connectionPool.getResource(); + } else { + return getConnection(); + } + } + + private List getShuffledNodesPool() { + List pools = new ArrayList(); + pools.addAll(nodes.values()); + Collections.shuffle(pools); + return pools; + } + +}