fix "cluster nodes" parse error when slot is in transition
* extract cluster nodes info. parser from JedisClusterConnectionHandler * unit test for migrating slot included
This commit is contained in:
@@ -1,13 +1,13 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
import redis.clients.util.ClusterNodeInformation;
|
||||
import redis.clients.util.ClusterNodeInformationParser;
|
||||
|
||||
public abstract class JedisClusterConnectionHandler {
|
||||
public static ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser();
|
||||
|
||||
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
||||
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
|
||||
@@ -65,69 +65,40 @@ public abstract class JedisClusterConnectionHandler {
|
||||
setNodeIfNotExist(node);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void discoverClusterNodesAndSlots(Jedis jedis) {
|
||||
String localNodes = jedis.clusterNodes();
|
||||
for (String nodeInfo : localNodes.split("\n")) {
|
||||
HostAndPort node = getHostAndPortFromNodeLine(nodeInfo, jedis);
|
||||
setNodeIfNotExist(node);
|
||||
|
||||
JedisPool nodePool = nodes.get(getNodeKey(node));
|
||||
populateNodeSlots(nodeInfo, nodePool);
|
||||
}
|
||||
}
|
||||
|
||||
private void setNodeIfNotExist(HostAndPort node) {
|
||||
String nodeKey = getNodeKey(node);
|
||||
if (nodes.containsKey(nodeKey))
|
||||
return;
|
||||
|
||||
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
||||
nodes.put(nodeKey, nodePool);
|
||||
}
|
||||
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(
|
||||
nodeInfo, new HostAndPort(jedis.getClient().getHost(),
|
||||
jedis.getClient().getPort()));
|
||||
|
||||
private void populateNodeSlots(String nodeInfo, JedisPool nodePool) {
|
||||
String[] nodeInfoArray = nodeInfo.split(" ");
|
||||
if (nodeInfoArray.length > 7) {
|
||||
for (int i = 8; i < nodeInfoArray.length; i++) {
|
||||
processSlot(nodeInfoArray[i], nodePool);
|
||||
}
|
||||
HostAndPort targetNode = clusterNodeInfo.getNode();
|
||||
setNodeIfNotExist(targetNode);
|
||||
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
|
||||
}
|
||||
}
|
||||
|
||||
private void processSlot(String slot, JedisPool nodePool) {
|
||||
if (slot.contains("-")) {
|
||||
String[] slotRange = slot.split("-");
|
||||
for (int i = Integer.valueOf(slotRange[0]); i <= Integer
|
||||
.valueOf(slotRange[1]); i++) {
|
||||
slots.put(i, nodePool);
|
||||
}
|
||||
} else {
|
||||
slots.put(Integer.valueOf(slot), nodePool);
|
||||
}
|
||||
}
|
||||
|
||||
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo,
|
||||
Jedis currentConnection) {
|
||||
String stringHostAndPort = nodeInfo.split(" ", 3)[1];
|
||||
if (":0".equals(stringHostAndPort)) {
|
||||
return new HostAndPort(currentConnection.getClient().getHost(),
|
||||
currentConnection.getClient().getPort());
|
||||
}
|
||||
String[] arrayHostAndPort = stringHostAndPort.split(":");
|
||||
return new HostAndPort(arrayHostAndPort[0],
|
||||
Integer.valueOf(arrayHostAndPort[1]));
|
||||
}
|
||||
|
||||
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||
|
||||
if (targetPool != null) {
|
||||
slots.put(slot, targetPool);
|
||||
} else {
|
||||
if (targetPool == null) {
|
||||
setNodeIfNotExist(targetNode);
|
||||
|
||||
targetPool = nodes.get(getNodeKey(targetNode));
|
||||
}
|
||||
slots.put(slot, targetPool);
|
||||
}
|
||||
|
||||
public void assignSlotsToNode(List<Integer> targetSlots,
|
||||
HostAndPort targetNode) {
|
||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||
|
||||
if (targetPool == null) {
|
||||
setNodeIfNotExist(targetNode);
|
||||
targetPool = nodes.get(getNodeKey(targetNode));
|
||||
}
|
||||
|
||||
for (Integer slot : targetSlots) {
|
||||
slots.put(slot, targetPool);
|
||||
}
|
||||
}
|
||||
@@ -144,4 +115,13 @@ public abstract class JedisClusterConnectionHandler {
|
||||
protected String getNodeKey(Client client) {
|
||||
return client.getHost() + ":" + client.getPort();
|
||||
}
|
||||
|
||||
private void setNodeIfNotExist(HostAndPort node) {
|
||||
String nodeKey = getNodeKey(node);
|
||||
if (nodes.containsKey(nodeKey))
|
||||
return;
|
||||
|
||||
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
||||
nodes.put(nodeKey, nodePool);
|
||||
}
|
||||
}
|
||||
|
||||
48
src/main/java/redis/clients/util/ClusterNodeInformation.java
Normal file
48
src/main/java/redis/clients/util/ClusterNodeInformation.java
Normal file
@@ -0,0 +1,48 @@
|
||||
package redis.clients.util;
|
||||
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class ClusterNodeInformation {
|
||||
private HostAndPort node;
|
||||
private List<Integer> availableSlots;
|
||||
private List<Integer> slotsBeingImported;
|
||||
private List<Integer> slotsBeingMigrated;
|
||||
|
||||
public ClusterNodeInformation(HostAndPort node) {
|
||||
this.node = node;
|
||||
this.availableSlots = new ArrayList<Integer>();
|
||||
this.slotsBeingImported = new ArrayList<Integer>();
|
||||
this.slotsBeingMigrated = new ArrayList<Integer>();
|
||||
}
|
||||
|
||||
public void addAvailableSlot(int slot) {
|
||||
availableSlots.add(slot);
|
||||
}
|
||||
|
||||
public void addSlotBeingImported(int slot) {
|
||||
slotsBeingImported.add(slot);
|
||||
}
|
||||
|
||||
public void addSlotBeingMigrated(int slot) {
|
||||
slotsBeingMigrated.add(slot);
|
||||
}
|
||||
|
||||
public HostAndPort getNode() {
|
||||
return node;
|
||||
}
|
||||
|
||||
public List<Integer> getAvailableSlots() {
|
||||
return availableSlots;
|
||||
}
|
||||
|
||||
public List<Integer> getSlotsBeingImported() {
|
||||
return slotsBeingImported;
|
||||
}
|
||||
|
||||
public List<Integer> getSlotsBeingMigrated() {
|
||||
return slotsBeingMigrated;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
package redis.clients.util;
|
||||
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
|
||||
public class ClusterNodeInformationParser {
|
||||
private static final String HOST_MYSELF_IDENTIFIER = ":0";
|
||||
private static final String SLOT_IMPORT_IDENTIFIER = "-<-";
|
||||
private static final String SLOT_IN_TRANSITION_IDENTIFIER = "[";
|
||||
public static final int SLOT_INFORMATIONS_START_INDEX = 8;
|
||||
public static final int HOST_AND_PORT_INDEX = 1;
|
||||
|
||||
public ClusterNodeInformation parse(String nodeInfo, HostAndPort current) {
|
||||
String[] nodeInfoPartArray = nodeInfo.split(" ");
|
||||
|
||||
HostAndPort node = getHostAndPortFromNodeLine(nodeInfoPartArray,
|
||||
current);
|
||||
ClusterNodeInformation info = new ClusterNodeInformation(node);
|
||||
|
||||
if (nodeInfoPartArray.length >= SLOT_INFORMATIONS_START_INDEX) {
|
||||
String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray);
|
||||
fillSlotInformation(slotInfoPartArray, info);
|
||||
}
|
||||
|
||||
return info;
|
||||
}
|
||||
|
||||
private String[] extractSlotParts(String[] nodeInfoPartArray) {
|
||||
String[] slotInfoPartArray = new String[nodeInfoPartArray.length
|
||||
- SLOT_INFORMATIONS_START_INDEX];
|
||||
for (int i = SLOT_INFORMATIONS_START_INDEX; i < nodeInfoPartArray.length; i++) {
|
||||
slotInfoPartArray[i - SLOT_INFORMATIONS_START_INDEX] = nodeInfoPartArray[i];
|
||||
}
|
||||
return slotInfoPartArray;
|
||||
}
|
||||
|
||||
public HostAndPort getHostAndPortFromNodeLine(String[] nodeInfoPartArray,
|
||||
HostAndPort current) {
|
||||
String stringHostAndPort = nodeInfoPartArray[HOST_AND_PORT_INDEX];
|
||||
if (HOST_MYSELF_IDENTIFIER.equals(stringHostAndPort)) {
|
||||
return current;
|
||||
}
|
||||
|
||||
String[] arrayHostAndPort = stringHostAndPort.split(":");
|
||||
return new HostAndPort(arrayHostAndPort[0],
|
||||
Integer.valueOf(arrayHostAndPort[1]));
|
||||
}
|
||||
|
||||
private void fillSlotInformation(String[] slotInfoPartArray,
|
||||
ClusterNodeInformation info) {
|
||||
for (String slotRange : slotInfoPartArray) {
|
||||
fillSlotInformationFromSlotRange(slotRange, info);
|
||||
}
|
||||
}
|
||||
|
||||
private void fillSlotInformationFromSlotRange(String slotRange,
|
||||
ClusterNodeInformation info) {
|
||||
if (slotRange.startsWith(SLOT_IN_TRANSITION_IDENTIFIER)) {
|
||||
// slot is in transition
|
||||
int slot = Integer.parseInt(slotRange.substring(1).split("-")[0]);
|
||||
|
||||
if (slotRange.contains(SLOT_IMPORT_IDENTIFIER)) {
|
||||
// import
|
||||
info.addSlotBeingImported(slot);
|
||||
} else {
|
||||
// migrate (->-)
|
||||
info.addSlotBeingMigrated(slot);
|
||||
}
|
||||
} else if (slotRange.contains("-")) {
|
||||
// slot range
|
||||
String[] slotRangePart = slotRange.split("-");
|
||||
for (int slot = Integer.valueOf(slotRangePart[0]); slot <= Integer
|
||||
.valueOf(slotRangePart[1]); slot++) {
|
||||
info.addAvailableSlot(slot);
|
||||
}
|
||||
} else {
|
||||
// single slot
|
||||
info.addAvailableSlot(Integer.valueOf(slotRange));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package redis.clients.jedis.tests;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
import redis.clients.util.ClusterNodeInformation;
|
||||
import redis.clients.util.ClusterNodeInformationParser;
|
||||
|
||||
public class JedisClusterNodeInformationParserTest extends Assert {
|
||||
private ClusterNodeInformationParser parser;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
parser = new ClusterNodeInformationParser();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseNodeMyself() {
|
||||
String nodeInfo = "9b0d2ab38ee31482c95fdb2c7847a0d40e88d518 :0 myself,master - 0 0 1 connected 0-5460";
|
||||
HostAndPort current = new HostAndPort("localhost", 7379);
|
||||
ClusterNodeInformation clusterNodeInfo = parser
|
||||
.parse(nodeInfo, current);
|
||||
assertEquals(clusterNodeInfo.getNode(), current);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseNormalState() {
|
||||
String nodeInfo = "5f4a2236d00008fba7ac0dd24b95762b446767bd 192.168.0.3:7380 master - 0 1400598804016 2 connected 5461-10922";
|
||||
HostAndPort current = new HostAndPort("localhost", 7379);
|
||||
ClusterNodeInformation clusterNodeInfo = parser
|
||||
.parse(nodeInfo, current);
|
||||
assertNotEquals(clusterNodeInfo.getNode(), current);
|
||||
assertEquals(clusterNodeInfo.getNode(), new HostAndPort("192.168.0.3",
|
||||
7380));
|
||||
|
||||
for (int slot = 5461; slot <= 10922; slot++) {
|
||||
assertTrue(clusterNodeInfo.getAvailableSlots().contains(slot));
|
||||
}
|
||||
|
||||
assertTrue(clusterNodeInfo.getSlotsBeingImported().isEmpty());
|
||||
assertTrue(clusterNodeInfo.getSlotsBeingMigrated().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseSlotBeingMigrated() {
|
||||
String nodeInfo = "5f4a2236d00008fba7ac0dd24b95762b446767bd :0 myself,master - 0 0 1 connected 0-5459 [5460->-5f4a2236d00008fba7ac0dd24b95762b446767bd] [5461-<-5f4a2236d00008fba7ac0dd24b95762b446767bd]";
|
||||
HostAndPort current = new HostAndPort("localhost", 7379);
|
||||
ClusterNodeInformation clusterNodeInfo = parser
|
||||
.parse(nodeInfo, current);
|
||||
assertEquals(clusterNodeInfo.getNode(), current);
|
||||
|
||||
for (int slot = 0; slot <= 5459; slot++) {
|
||||
assertTrue(clusterNodeInfo.getAvailableSlots().contains(slot));
|
||||
}
|
||||
|
||||
assertEquals(1, clusterNodeInfo.getSlotsBeingMigrated().size());
|
||||
assertTrue(clusterNodeInfo.getSlotsBeingMigrated().contains(5460));
|
||||
assertEquals(1, clusterNodeInfo.getSlotsBeingImported().size());
|
||||
assertTrue(clusterNodeInfo.getSlotsBeingImported().contains(5461));
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user