diff --git a/src/main/java/redis/clients/jedis/JedisClusterCommand.java b/src/main/java/redis/clients/jedis/JedisClusterCommand.java index 051d5cd..c4d80fa 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterCommand.java +++ b/src/main/java/redis/clients/jedis/JedisClusterCommand.java @@ -72,13 +72,15 @@ public abstract class JedisClusterCommand { } catch (JedisRedirectionException jre) { if (jre instanceof JedisAskDataException) { asking = true; - } else if (jre instanceof JedisMovedDataException) { - // TODO : In antirez's redis-rb-cluster implementation, - // it rebuilds cluster's slot and node cache - } - - this.connectionHandler.assignSlotToNode(jre.getSlot(), - jre.getTargetNode()); + this.connectionHandler.assignSlotToNode(jre.getSlot(), + jre.getTargetNode()); + } else if (jre instanceof JedisMovedDataException) { + // it rebuilds cluster's slot cache + // recommended by Redis cluster specification + this.connectionHandler.renewSlotCache(); + } else { + throw new JedisClusterException(jre); + } releaseConnection(connection, false); connection = null; diff --git a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java index e6eb01f..102921e 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java @@ -1,27 +1,26 @@ package redis.clients.jedis; -import java.util.*; - import redis.clients.jedis.exceptions.JedisConnectionException; -import redis.clients.util.ClusterNodeInformation; -import redis.clients.util.ClusterNodeInformationParser; + +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import static redis.clients.jedis.JedisClusterInfoCache.getNodeKey; public abstract class JedisClusterConnectionHandler { - public static ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser(); - - protected Map nodes = new HashMap(); - protected Map slots = new HashMap(); + protected JedisClusterInfoCache cache = new JedisClusterInfoCache(); abstract Jedis getConnection(); - protected void returnConnection(Jedis connection) { - nodes.get(getNodeKey(connection.getClient())) - .returnResource(connection); + public void returnConnection(Jedis connection) { + cache.getNode(getNodeKey(connection.getClient())) + .returnResource(connection); } public void returnBrokenConnection(Jedis connection) { - nodes.get(getNodeKey(connection.getClient())).returnBrokenResource( - connection); + cache.getNode(getNodeKey(connection.getClient())) + .returnBrokenResource(connection); } abstract Jedis getConnectionFromSlot(int slot); @@ -31,7 +30,11 @@ public abstract class JedisClusterConnectionHandler { } public Map getNodes() { - return nodes; + return cache.getNodes(); + } + + public void assignSlotToNode(int slot, HostAndPort targetNode) { + cache.assignSlotToNode(slot, targetNode); } private void initializeSlotsCache(Set startNodes) { @@ -39,89 +42,43 @@ public abstract class JedisClusterConnectionHandler { JedisPool jp = new JedisPool(hostAndPort.getHost(), hostAndPort.getPort()); - this.nodes.clear(); - this.slots.clear(); - Jedis jedis = null; try { jedis = jp.getResource(); - discoverClusterNodesAndSlots(jedis); + cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { - if (jedis != null) { - jp.returnBrokenResource(jedis); - jedis = null; - } - // try next nodes } finally { - if (jedis != null) { - jp.returnResource(jedis); - } + if (jedis != null) { + jedis.close(); + } } } for (HostAndPort node : startNodes) { - setNodeIfNotExist(node); + cache.setNodeIfNotExist(node); } } - private void discoverClusterNodesAndSlots(Jedis jedis) { - String localNodes = jedis.clusterNodes(); - for (String nodeInfo : localNodes.split("\n")) { - ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse( - nodeInfo, new HostAndPort(jedis.getClient().getHost(), - jedis.getClient().getPort())); - - HostAndPort targetNode = clusterNodeInfo.getNode(); - setNodeIfNotExist(targetNode); - assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode); - } - } - - public void assignSlotToNode(int slot, HostAndPort targetNode) { - JedisPool targetPool = nodes.get(getNodeKey(targetNode)); - - if (targetPool == null) { - setNodeIfNotExist(targetNode); - targetPool = nodes.get(getNodeKey(targetNode)); - } - slots.put(slot, targetPool); - } - - public void assignSlotsToNode(List targetSlots, - HostAndPort targetNode) { - JedisPool targetPool = nodes.get(getNodeKey(targetNode)); - - if (targetPool == null) { - setNodeIfNotExist(targetNode); - targetPool = nodes.get(getNodeKey(targetNode)); - } - - for (Integer slot : targetSlots) { - slots.put(slot, targetPool); - } + public void renewSlotCache() { + for (JedisPool jp : cache.getNodes().values()) { + Jedis jedis = null; + try { + jedis = jp.getResource(); + cache.discoverClusterSlots(jedis); + break; + } finally { + if (jedis != null) { + jedis.close(); + } + } + } } protected JedisPool getRandomConnection() { - Object[] nodeArray = nodes.values().toArray(); + Object[] nodeArray = cache.getNodes().values().toArray(); return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]); } - protected String getNodeKey(HostAndPort hnp) { - return hnp.getHost() + ":" + hnp.getPort(); - } - - protected String getNodeKey(Client client) { - return client.getHost() + ":" + client.getPort(); - } - - private void setNodeIfNotExist(HostAndPort node) { - String nodeKey = getNodeKey(node); - if (nodes.containsKey(nodeKey)) - return; - - JedisPool nodePool = new JedisPool(node.getHost(), node.getPort()); - nodes.put(nodeKey, nodePool); - } } diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java new file mode 100644 index 0000000..7c1f491 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -0,0 +1,127 @@ +package redis.clients.jedis; + +import redis.clients.util.ClusterNodeInformation; +import redis.clients.util.ClusterNodeInformationParser; +import redis.clients.util.SafeEncoder; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class JedisClusterInfoCache { + public static final ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser(); + + private Map nodes = new HashMap(); + private Map slots = new HashMap(); + + public synchronized void discoverClusterNodesAndSlots(Jedis jedis) { + this.nodes.clear(); + this.slots.clear(); + + String localNodes = jedis.clusterNodes(); + for (String nodeInfo : localNodes.split("\n")) { + ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse( + nodeInfo, new HostAndPort(jedis.getClient().getHost(), + jedis.getClient().getPort())); + + HostAndPort targetNode = clusterNodeInfo.getNode(); + setNodeIfNotExist(targetNode); + assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode); + } + } + + public synchronized void discoverClusterSlots(Jedis jedis) { + this.slots.clear(); + + List slots = jedis.clusterSlots(); + + for (Object slotInfoObj : slots) { + List slotInfo = (List) slotInfoObj; + + if (slotInfo.size() <= 2) { + continue; + } + + // assigned slots + List slotNums = new ArrayList(); + for (int slot = ((Long) slotInfo.get(0)).intValue() ; + slot <= ((Long) slotInfo.get(1)).intValue() ; + slot++) { + slotNums.add(slot); + } + + // hostInfos + List hostInfos = (List) slotInfo.get(2); + if (hostInfos.size() <= 0) { + continue; + } + + // at this time, we just use master, discard slave information + HostAndPort targetNode = new HostAndPort( + SafeEncoder.encode((byte[]) hostInfos.get(0)), + ((Long) hostInfos.get(1)).intValue()); + + setNodeIfNotExist(targetNode); + assignSlotsToNode(slotNums, targetNode); + } + } + + public synchronized void setNodeIfNotExist(HostAndPort node) { + String nodeKey = getNodeKey(node); + if (nodes.containsKey(nodeKey)) + return; + + JedisPool nodePool = new JedisPool(node.getHost(), node.getPort()); + nodes.put(nodeKey, nodePool); + } + + public synchronized void assignSlotToNode(int slot, HostAndPort targetNode) { + JedisPool targetPool = nodes.get(getNodeKey(targetNode)); + + if (targetPool == null) { + setNodeIfNotExist(targetNode); + targetPool = nodes.get(getNodeKey(targetNode)); + } + slots.put(slot, targetPool); + } + + public synchronized void assignSlotsToNode(List targetSlots, + HostAndPort targetNode) { + JedisPool targetPool = nodes.get(getNodeKey(targetNode)); + + if (targetPool == null) { + setNodeIfNotExist(targetNode); + targetPool = nodes.get(getNodeKey(targetNode)); + } + + for (Integer slot : targetSlots) { + slots.put(slot, targetPool); + } + } + + public synchronized JedisPool getNode(String nodeKey) { + return nodes.get(nodeKey); + } + + public synchronized JedisPool getSlotPool(int slot) { + return slots.get(slot); + } + + public synchronized Map getNodes() { + return new HashMap(nodes); + } + + public static String getNodeKey(HostAndPort hnp) { + return hnp.getHost() + ":" + hnp.getPort(); + } + + public static String getNodeKey(Client client) { + return client.getHost() + ":" + client.getPort(); + } + + public static String getNodeKey(Jedis jedis) { + return getNodeKey(jedis.getClient()); + } + +} diff --git a/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java index 4cd4fc7..bc9c2d4 100644 --- a/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java @@ -46,14 +46,9 @@ public class JedisSlotBasedConnectionHandler extends throw new JedisConnectionException("no reachable node in cluster"); } - @Override - public void assignSlotToNode(int slot, HostAndPort targetNode) { - super.assignSlotToNode(slot, targetNode); - } - @Override public Jedis getConnectionFromSlot(int slot) { - JedisPool connectionPool = slots.get(slot); + JedisPool connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { // It can't guaranteed to get valid connection because of node assignment return connectionPool.getResource(); @@ -64,7 +59,7 @@ public class JedisSlotBasedConnectionHandler extends private List getShuffledNodesPool() { List pools = new ArrayList(); - pools.addAll(nodes.values()); + pools.addAll(cache.getNodes().values()); Collections.shuffle(pools); return pools; }