diff --git a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java index 94f4c75..e6eb01f 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java @@ -1,13 +1,13 @@ package redis.clients.jedis; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.Set; +import java.util.*; import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.util.ClusterNodeInformation; +import redis.clients.util.ClusterNodeInformationParser; public abstract class JedisClusterConnectionHandler { + public static ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser(); protected Map nodes = new HashMap(); protected Map slots = new HashMap(); @@ -65,69 +65,40 @@ public abstract class JedisClusterConnectionHandler { setNodeIfNotExist(node); } } - + private void discoverClusterNodesAndSlots(Jedis jedis) { String localNodes = jedis.clusterNodes(); for (String nodeInfo : localNodes.split("\n")) { - HostAndPort node = getHostAndPortFromNodeLine(nodeInfo, jedis); - setNodeIfNotExist(node); - - JedisPool nodePool = nodes.get(getNodeKey(node)); - populateNodeSlots(nodeInfo, nodePool); - } - } - - private void setNodeIfNotExist(HostAndPort node) { - String nodeKey = getNodeKey(node); - if (nodes.containsKey(nodeKey)) - return; - - JedisPool nodePool = new JedisPool(node.getHost(), node.getPort()); - nodes.put(nodeKey, nodePool); - } + ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse( + nodeInfo, new HostAndPort(jedis.getClient().getHost(), + jedis.getClient().getPort())); - private void populateNodeSlots(String nodeInfo, JedisPool nodePool) { - String[] nodeInfoArray = nodeInfo.split(" "); - if (nodeInfoArray.length > 7) { - for (int i = 8; i < nodeInfoArray.length; i++) { - processSlot(nodeInfoArray[i], nodePool); - } + HostAndPort targetNode = clusterNodeInfo.getNode(); + setNodeIfNotExist(targetNode); + assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode); } } - private void processSlot(String slot, JedisPool nodePool) { - if (slot.contains("-")) { - String[] slotRange = slot.split("-"); - for (int i = Integer.valueOf(slotRange[0]); i <= Integer - .valueOf(slotRange[1]); i++) { - slots.put(i, nodePool); - } - } else { - slots.put(Integer.valueOf(slot), nodePool); - } - } - - private HostAndPort getHostAndPortFromNodeLine(String nodeInfo, - Jedis currentConnection) { - String stringHostAndPort = nodeInfo.split(" ", 3)[1]; - if (":0".equals(stringHostAndPort)) { - return new HostAndPort(currentConnection.getClient().getHost(), - currentConnection.getClient().getPort()); - } - String[] arrayHostAndPort = stringHostAndPort.split(":"); - return new HostAndPort(arrayHostAndPort[0], - Integer.valueOf(arrayHostAndPort[1])); - } - public void assignSlotToNode(int slot, HostAndPort targetNode) { JedisPool targetPool = nodes.get(getNodeKey(targetNode)); - if (targetPool != null) { - slots.put(slot, targetPool); - } else { + if (targetPool == null) { setNodeIfNotExist(targetNode); - targetPool = nodes.get(getNodeKey(targetNode)); + } + slots.put(slot, targetPool); + } + + public void assignSlotsToNode(List targetSlots, + HostAndPort targetNode) { + JedisPool targetPool = nodes.get(getNodeKey(targetNode)); + + if (targetPool == null) { + setNodeIfNotExist(targetNode); + targetPool = nodes.get(getNodeKey(targetNode)); + } + + for (Integer slot : targetSlots) { slots.put(slot, targetPool); } } @@ -144,4 +115,13 @@ public abstract class JedisClusterConnectionHandler { protected String getNodeKey(Client client) { return client.getHost() + ":" + client.getPort(); } + + private void setNodeIfNotExist(HostAndPort node) { + String nodeKey = getNodeKey(node); + if (nodes.containsKey(nodeKey)) + return; + + JedisPool nodePool = new JedisPool(node.getHost(), node.getPort()); + nodes.put(nodeKey, nodePool); + } } diff --git a/src/main/java/redis/clients/util/ClusterNodeInformation.java b/src/main/java/redis/clients/util/ClusterNodeInformation.java new file mode 100644 index 0000000..a3833d0 --- /dev/null +++ b/src/main/java/redis/clients/util/ClusterNodeInformation.java @@ -0,0 +1,48 @@ +package redis.clients.util; + +import redis.clients.jedis.HostAndPort; + +import java.util.ArrayList; +import java.util.List; + +public class ClusterNodeInformation { + private HostAndPort node; + private List availableSlots; + private List slotsBeingImported; + private List slotsBeingMigrated; + + public ClusterNodeInformation(HostAndPort node) { + this.node = node; + this.availableSlots = new ArrayList(); + this.slotsBeingImported = new ArrayList(); + this.slotsBeingMigrated = new ArrayList(); + } + + public void addAvailableSlot(int slot) { + availableSlots.add(slot); + } + + public void addSlotBeingImported(int slot) { + slotsBeingImported.add(slot); + } + + public void addSlotBeingMigrated(int slot) { + slotsBeingMigrated.add(slot); + } + + public HostAndPort getNode() { + return node; + } + + public List getAvailableSlots() { + return availableSlots; + } + + public List getSlotsBeingImported() { + return slotsBeingImported; + } + + public List getSlotsBeingMigrated() { + return slotsBeingMigrated; + } +} diff --git a/src/main/java/redis/clients/util/ClusterNodeInformationParser.java b/src/main/java/redis/clients/util/ClusterNodeInformationParser.java new file mode 100644 index 0000000..995df6f --- /dev/null +++ b/src/main/java/redis/clients/util/ClusterNodeInformationParser.java @@ -0,0 +1,81 @@ +package redis.clients.util; + +import redis.clients.jedis.HostAndPort; + +public class ClusterNodeInformationParser { + private static final String HOST_MYSELF_IDENTIFIER = ":0"; + private static final String SLOT_IMPORT_IDENTIFIER = "-<-"; + private static final String SLOT_IN_TRANSITION_IDENTIFIER = "["; + public static final int SLOT_INFORMATIONS_START_INDEX = 8; + public static final int HOST_AND_PORT_INDEX = 1; + + public ClusterNodeInformation parse(String nodeInfo, HostAndPort current) { + String[] nodeInfoPartArray = nodeInfo.split(" "); + + HostAndPort node = getHostAndPortFromNodeLine(nodeInfoPartArray, + current); + ClusterNodeInformation info = new ClusterNodeInformation(node); + + if (nodeInfoPartArray.length >= SLOT_INFORMATIONS_START_INDEX) { + String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray); + fillSlotInformation(slotInfoPartArray, info); + } + + return info; + } + + private String[] extractSlotParts(String[] nodeInfoPartArray) { + String[] slotInfoPartArray = new String[nodeInfoPartArray.length + - SLOT_INFORMATIONS_START_INDEX]; + for (int i = SLOT_INFORMATIONS_START_INDEX; i < nodeInfoPartArray.length; i++) { + slotInfoPartArray[i - SLOT_INFORMATIONS_START_INDEX] = nodeInfoPartArray[i]; + } + return slotInfoPartArray; + } + + public HostAndPort getHostAndPortFromNodeLine(String[] nodeInfoPartArray, + HostAndPort current) { + String stringHostAndPort = nodeInfoPartArray[HOST_AND_PORT_INDEX]; + if (HOST_MYSELF_IDENTIFIER.equals(stringHostAndPort)) { + return current; + } + + String[] arrayHostAndPort = stringHostAndPort.split(":"); + return new HostAndPort(arrayHostAndPort[0], + Integer.valueOf(arrayHostAndPort[1])); + } + + private void fillSlotInformation(String[] slotInfoPartArray, + ClusterNodeInformation info) { + for (String slotRange : slotInfoPartArray) { + fillSlotInformationFromSlotRange(slotRange, info); + } + } + + private void fillSlotInformationFromSlotRange(String slotRange, + ClusterNodeInformation info) { + if (slotRange.startsWith(SLOT_IN_TRANSITION_IDENTIFIER)) { + // slot is in transition + int slot = Integer.parseInt(slotRange.substring(1).split("-")[0]); + + if (slotRange.contains(SLOT_IMPORT_IDENTIFIER)) { + // import + info.addSlotBeingImported(slot); + } else { + // migrate (->-) + info.addSlotBeingMigrated(slot); + } + } else if (slotRange.contains("-")) { + // slot range + String[] slotRangePart = slotRange.split("-"); + for (int slot = Integer.valueOf(slotRangePart[0]); slot <= Integer + .valueOf(slotRangePart[1]); slot++) { + info.addAvailableSlot(slot); + } + } else { + // single slot + info.addAvailableSlot(Integer.valueOf(slotRange)); + } + } + +} diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterNodeInformationParserTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterNodeInformationParserTest.java new file mode 100644 index 0000000..bc0fd42 --- /dev/null +++ b/src/test/java/redis/clients/jedis/tests/JedisClusterNodeInformationParserTest.java @@ -0,0 +1,63 @@ +package redis.clients.jedis.tests; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.HostAndPort; +import redis.clients.util.ClusterNodeInformation; +import redis.clients.util.ClusterNodeInformationParser; + +public class JedisClusterNodeInformationParserTest extends Assert { + private ClusterNodeInformationParser parser; + + @Before + public void setUp() { + parser = new ClusterNodeInformationParser(); + } + + @Test + public void testParseNodeMyself() { + String nodeInfo = "9b0d2ab38ee31482c95fdb2c7847a0d40e88d518 :0 myself,master - 0 0 1 connected 0-5460"; + HostAndPort current = new HostAndPort("localhost", 7379); + ClusterNodeInformation clusterNodeInfo = parser + .parse(nodeInfo, current); + assertEquals(clusterNodeInfo.getNode(), current); + } + + @Test + public void testParseNormalState() { + String nodeInfo = "5f4a2236d00008fba7ac0dd24b95762b446767bd 192.168.0.3:7380 master - 0 1400598804016 2 connected 5461-10922"; + HostAndPort current = new HostAndPort("localhost", 7379); + ClusterNodeInformation clusterNodeInfo = parser + .parse(nodeInfo, current); + assertNotEquals(clusterNodeInfo.getNode(), current); + assertEquals(clusterNodeInfo.getNode(), new HostAndPort("192.168.0.3", + 7380)); + + for (int slot = 5461; slot <= 10922; slot++) { + assertTrue(clusterNodeInfo.getAvailableSlots().contains(slot)); + } + + assertTrue(clusterNodeInfo.getSlotsBeingImported().isEmpty()); + assertTrue(clusterNodeInfo.getSlotsBeingMigrated().isEmpty()); + } + + @Test + public void testParseSlotBeingMigrated() { + String nodeInfo = "5f4a2236d00008fba7ac0dd24b95762b446767bd :0 myself,master - 0 0 1 connected 0-5459 [5460->-5f4a2236d00008fba7ac0dd24b95762b446767bd] [5461-<-5f4a2236d00008fba7ac0dd24b95762b446767bd]"; + HostAndPort current = new HostAndPort("localhost", 7379); + ClusterNodeInformation clusterNodeInfo = parser + .parse(nodeInfo, current); + assertEquals(clusterNodeInfo.getNode(), current); + + for (int slot = 0; slot <= 5459; slot++) { + assertTrue(clusterNodeInfo.getAvailableSlots().contains(slot)); + } + + assertEquals(1, clusterNodeInfo.getSlotsBeingMigrated().size()); + assertTrue(clusterNodeInfo.getSlotsBeingMigrated().contains(5460)); + assertEquals(1, clusterNodeInfo.getSlotsBeingImported().size()); + assertTrue(clusterNodeInfo.getSlotsBeingImported().contains(5461)); + } + +}