Merge pull request #637 from HeartSaVioR/fix-cluster-nodes-parse-error
fix "cluster nodes" parse error when slot is in transition (fixes #635)
This commit is contained in:
@@ -1,13 +1,13 @@
|
|||||||
package redis.clients.jedis;
|
package redis.clients.jedis;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.*;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||||
|
import redis.clients.util.ClusterNodeInformation;
|
||||||
|
import redis.clients.util.ClusterNodeInformationParser;
|
||||||
|
|
||||||
public abstract class JedisClusterConnectionHandler {
|
public abstract class JedisClusterConnectionHandler {
|
||||||
|
public static ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser();
|
||||||
|
|
||||||
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
||||||
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
|
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
|
||||||
@@ -69,65 +69,36 @@ public abstract class JedisClusterConnectionHandler {
|
|||||||
private void discoverClusterNodesAndSlots(Jedis jedis) {
|
private void discoverClusterNodesAndSlots(Jedis jedis) {
|
||||||
String localNodes = jedis.clusterNodes();
|
String localNodes = jedis.clusterNodes();
|
||||||
for (String nodeInfo : localNodes.split("\n")) {
|
for (String nodeInfo : localNodes.split("\n")) {
|
||||||
HostAndPort node = getHostAndPortFromNodeLine(nodeInfo, jedis);
|
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(
|
||||||
setNodeIfNotExist(node);
|
nodeInfo, new HostAndPort(jedis.getClient().getHost(),
|
||||||
|
jedis.getClient().getPort()));
|
||||||
|
|
||||||
JedisPool nodePool = nodes.get(getNodeKey(node));
|
HostAndPort targetNode = clusterNodeInfo.getNode();
|
||||||
populateNodeSlots(nodeInfo, nodePool);
|
setNodeIfNotExist(targetNode);
|
||||||
|
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setNodeIfNotExist(HostAndPort node) {
|
|
||||||
String nodeKey = getNodeKey(node);
|
|
||||||
if (nodes.containsKey(nodeKey))
|
|
||||||
return;
|
|
||||||
|
|
||||||
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
|
||||||
nodes.put(nodeKey, nodePool);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void populateNodeSlots(String nodeInfo, JedisPool nodePool) {
|
|
||||||
String[] nodeInfoArray = nodeInfo.split(" ");
|
|
||||||
if (nodeInfoArray.length > 7) {
|
|
||||||
for (int i = 8; i < nodeInfoArray.length; i++) {
|
|
||||||
processSlot(nodeInfoArray[i], nodePool);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void processSlot(String slot, JedisPool nodePool) {
|
|
||||||
if (slot.contains("-")) {
|
|
||||||
String[] slotRange = slot.split("-");
|
|
||||||
for (int i = Integer.valueOf(slotRange[0]); i <= Integer
|
|
||||||
.valueOf(slotRange[1]); i++) {
|
|
||||||
slots.put(i, nodePool);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
slots.put(Integer.valueOf(slot), nodePool);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo,
|
|
||||||
Jedis currentConnection) {
|
|
||||||
String stringHostAndPort = nodeInfo.split(" ", 3)[1];
|
|
||||||
if (":0".equals(stringHostAndPort)) {
|
|
||||||
return new HostAndPort(currentConnection.getClient().getHost(),
|
|
||||||
currentConnection.getClient().getPort());
|
|
||||||
}
|
|
||||||
String[] arrayHostAndPort = stringHostAndPort.split(":");
|
|
||||||
return new HostAndPort(arrayHostAndPort[0],
|
|
||||||
Integer.valueOf(arrayHostAndPort[1]));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
||||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||||
|
|
||||||
if (targetPool != null) {
|
if (targetPool == null) {
|
||||||
slots.put(slot, targetPool);
|
|
||||||
} else {
|
|
||||||
setNodeIfNotExist(targetNode);
|
setNodeIfNotExist(targetNode);
|
||||||
|
|
||||||
targetPool = nodes.get(getNodeKey(targetNode));
|
targetPool = nodes.get(getNodeKey(targetNode));
|
||||||
|
}
|
||||||
|
slots.put(slot, targetPool);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void assignSlotsToNode(List<Integer> targetSlots,
|
||||||
|
HostAndPort targetNode) {
|
||||||
|
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||||
|
|
||||||
|
if (targetPool == null) {
|
||||||
|
setNodeIfNotExist(targetNode);
|
||||||
|
targetPool = nodes.get(getNodeKey(targetNode));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Integer slot : targetSlots) {
|
||||||
slots.put(slot, targetPool);
|
slots.put(slot, targetPool);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -144,4 +115,13 @@ public abstract class JedisClusterConnectionHandler {
|
|||||||
protected String getNodeKey(Client client) {
|
protected String getNodeKey(Client client) {
|
||||||
return client.getHost() + ":" + client.getPort();
|
return client.getHost() + ":" + client.getPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setNodeIfNotExist(HostAndPort node) {
|
||||||
|
String nodeKey = getNodeKey(node);
|
||||||
|
if (nodes.containsKey(nodeKey))
|
||||||
|
return;
|
||||||
|
|
||||||
|
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
||||||
|
nodes.put(nodeKey, nodePool);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
48
src/main/java/redis/clients/util/ClusterNodeInformation.java
Normal file
48
src/main/java/redis/clients/util/ClusterNodeInformation.java
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
package redis.clients.util;
|
||||||
|
|
||||||
|
import redis.clients.jedis.HostAndPort;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ClusterNodeInformation {
|
||||||
|
private HostAndPort node;
|
||||||
|
private List<Integer> availableSlots;
|
||||||
|
private List<Integer> slotsBeingImported;
|
||||||
|
private List<Integer> slotsBeingMigrated;
|
||||||
|
|
||||||
|
public ClusterNodeInformation(HostAndPort node) {
|
||||||
|
this.node = node;
|
||||||
|
this.availableSlots = new ArrayList<Integer>();
|
||||||
|
this.slotsBeingImported = new ArrayList<Integer>();
|
||||||
|
this.slotsBeingMigrated = new ArrayList<Integer>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addAvailableSlot(int slot) {
|
||||||
|
availableSlots.add(slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSlotBeingImported(int slot) {
|
||||||
|
slotsBeingImported.add(slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSlotBeingMigrated(int slot) {
|
||||||
|
slotsBeingMigrated.add(slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HostAndPort getNode() {
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Integer> getAvailableSlots() {
|
||||||
|
return availableSlots;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Integer> getSlotsBeingImported() {
|
||||||
|
return slotsBeingImported;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Integer> getSlotsBeingMigrated() {
|
||||||
|
return slotsBeingMigrated;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,81 @@
|
|||||||
|
package redis.clients.util;
|
||||||
|
|
||||||
|
import redis.clients.jedis.HostAndPort;
|
||||||
|
|
||||||
|
public class ClusterNodeInformationParser {
|
||||||
|
private static final String HOST_MYSELF_IDENTIFIER = ":0";
|
||||||
|
private static final String SLOT_IMPORT_IDENTIFIER = "-<-";
|
||||||
|
private static final String SLOT_IN_TRANSITION_IDENTIFIER = "[";
|
||||||
|
public static final int SLOT_INFORMATIONS_START_INDEX = 8;
|
||||||
|
public static final int HOST_AND_PORT_INDEX = 1;
|
||||||
|
|
||||||
|
public ClusterNodeInformation parse(String nodeInfo, HostAndPort current) {
|
||||||
|
String[] nodeInfoPartArray = nodeInfo.split(" ");
|
||||||
|
|
||||||
|
HostAndPort node = getHostAndPortFromNodeLine(nodeInfoPartArray,
|
||||||
|
current);
|
||||||
|
ClusterNodeInformation info = new ClusterNodeInformation(node);
|
||||||
|
|
||||||
|
if (nodeInfoPartArray.length >= SLOT_INFORMATIONS_START_INDEX) {
|
||||||
|
String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray);
|
||||||
|
fillSlotInformation(slotInfoPartArray, info);
|
||||||
|
}
|
||||||
|
|
||||||
|
return info;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String[] extractSlotParts(String[] nodeInfoPartArray) {
|
||||||
|
String[] slotInfoPartArray = new String[nodeInfoPartArray.length
|
||||||
|
- SLOT_INFORMATIONS_START_INDEX];
|
||||||
|
for (int i = SLOT_INFORMATIONS_START_INDEX; i < nodeInfoPartArray.length; i++) {
|
||||||
|
slotInfoPartArray[i - SLOT_INFORMATIONS_START_INDEX] = nodeInfoPartArray[i];
|
||||||
|
}
|
||||||
|
return slotInfoPartArray;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HostAndPort getHostAndPortFromNodeLine(String[] nodeInfoPartArray,
|
||||||
|
HostAndPort current) {
|
||||||
|
String stringHostAndPort = nodeInfoPartArray[HOST_AND_PORT_INDEX];
|
||||||
|
if (HOST_MYSELF_IDENTIFIER.equals(stringHostAndPort)) {
|
||||||
|
return current;
|
||||||
|
}
|
||||||
|
|
||||||
|
String[] arrayHostAndPort = stringHostAndPort.split(":");
|
||||||
|
return new HostAndPort(arrayHostAndPort[0],
|
||||||
|
Integer.valueOf(arrayHostAndPort[1]));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fillSlotInformation(String[] slotInfoPartArray,
|
||||||
|
ClusterNodeInformation info) {
|
||||||
|
for (String slotRange : slotInfoPartArray) {
|
||||||
|
fillSlotInformationFromSlotRange(slotRange, info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fillSlotInformationFromSlotRange(String slotRange,
|
||||||
|
ClusterNodeInformation info) {
|
||||||
|
if (slotRange.startsWith(SLOT_IN_TRANSITION_IDENTIFIER)) {
|
||||||
|
// slot is in transition
|
||||||
|
int slot = Integer.parseInt(slotRange.substring(1).split("-")[0]);
|
||||||
|
|
||||||
|
if (slotRange.contains(SLOT_IMPORT_IDENTIFIER)) {
|
||||||
|
// import
|
||||||
|
info.addSlotBeingImported(slot);
|
||||||
|
} else {
|
||||||
|
// migrate (->-)
|
||||||
|
info.addSlotBeingMigrated(slot);
|
||||||
|
}
|
||||||
|
} else if (slotRange.contains("-")) {
|
||||||
|
// slot range
|
||||||
|
String[] slotRangePart = slotRange.split("-");
|
||||||
|
for (int slot = Integer.valueOf(slotRangePart[0]); slot <= Integer
|
||||||
|
.valueOf(slotRangePart[1]); slot++) {
|
||||||
|
info.addAvailableSlot(slot);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// single slot
|
||||||
|
info.addAvailableSlot(Integer.valueOf(slotRange));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,63 @@
|
|||||||
|
package redis.clients.jedis.tests;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import redis.clients.jedis.HostAndPort;
|
||||||
|
import redis.clients.util.ClusterNodeInformation;
|
||||||
|
import redis.clients.util.ClusterNodeInformationParser;
|
||||||
|
|
||||||
|
public class JedisClusterNodeInformationParserTest extends Assert {
|
||||||
|
private ClusterNodeInformationParser parser;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
parser = new ClusterNodeInformationParser();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParseNodeMyself() {
|
||||||
|
String nodeInfo = "9b0d2ab38ee31482c95fdb2c7847a0d40e88d518 :0 myself,master - 0 0 1 connected 0-5460";
|
||||||
|
HostAndPort current = new HostAndPort("localhost", 7379);
|
||||||
|
ClusterNodeInformation clusterNodeInfo = parser
|
||||||
|
.parse(nodeInfo, current);
|
||||||
|
assertEquals(clusterNodeInfo.getNode(), current);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParseNormalState() {
|
||||||
|
String nodeInfo = "5f4a2236d00008fba7ac0dd24b95762b446767bd 192.168.0.3:7380 master - 0 1400598804016 2 connected 5461-10922";
|
||||||
|
HostAndPort current = new HostAndPort("localhost", 7379);
|
||||||
|
ClusterNodeInformation clusterNodeInfo = parser
|
||||||
|
.parse(nodeInfo, current);
|
||||||
|
assertNotEquals(clusterNodeInfo.getNode(), current);
|
||||||
|
assertEquals(clusterNodeInfo.getNode(), new HostAndPort("192.168.0.3",
|
||||||
|
7380));
|
||||||
|
|
||||||
|
for (int slot = 5461; slot <= 10922; slot++) {
|
||||||
|
assertTrue(clusterNodeInfo.getAvailableSlots().contains(slot));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(clusterNodeInfo.getSlotsBeingImported().isEmpty());
|
||||||
|
assertTrue(clusterNodeInfo.getSlotsBeingMigrated().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParseSlotBeingMigrated() {
|
||||||
|
String nodeInfo = "5f4a2236d00008fba7ac0dd24b95762b446767bd :0 myself,master - 0 0 1 connected 0-5459 [5460->-5f4a2236d00008fba7ac0dd24b95762b446767bd] [5461-<-5f4a2236d00008fba7ac0dd24b95762b446767bd]";
|
||||||
|
HostAndPort current = new HostAndPort("localhost", 7379);
|
||||||
|
ClusterNodeInformation clusterNodeInfo = parser
|
||||||
|
.parse(nodeInfo, current);
|
||||||
|
assertEquals(clusterNodeInfo.getNode(), current);
|
||||||
|
|
||||||
|
for (int slot = 0; slot <= 5459; slot++) {
|
||||||
|
assertTrue(clusterNodeInfo.getAvailableSlots().contains(slot));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(1, clusterNodeInfo.getSlotsBeingMigrated().size());
|
||||||
|
assertTrue(clusterNodeInfo.getSlotsBeingMigrated().contains(5460));
|
||||||
|
assertEquals(1, clusterNodeInfo.getSlotsBeingImported().size());
|
||||||
|
assertTrue(clusterNodeInfo.getSlotsBeingImported().contains(5461));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user