Merging upstream

This commit is contained in:
Mayank Dang
2014-05-29 20:44:40 +05:30
53 changed files with 2285 additions and 248 deletions

View File

@@ -2,14 +2,14 @@ package redis.clients.jedis;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.*;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.ClusterNodeInformation;
import redis.clients.util.ClusterNodeInformationParser;
public abstract class JedisClusterConnectionHandler {
public static ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser();
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
@@ -69,69 +69,40 @@ public abstract class JedisClusterConnectionHandler {
setNodeIfNotExist(node);
}
}
private void discoverClusterNodesAndSlots(Jedis jedis) {
String localNodes = jedis.clusterNodes();
for (String nodeInfo : localNodes.split("\n")) {
HostAndPort node = getHostAndPortFromNodeLine(nodeInfo, jedis);
setNodeIfNotExist(node);
String localNodes = jedis.clusterNodes();
for (String nodeInfo : localNodes.split("\n")) {
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(
nodeInfo, new HostAndPort(jedis.getClient().getHost(),
jedis.getClient().getPort()));
JedisPool nodePool = nodes.get(getNodeKey(node));
populateNodeSlots(nodeInfo, nodePool);
}
}
private void setNodeIfNotExist(HostAndPort node) {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey))
return;
JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort());
nodes.put(nodeKey, nodePool);
}
private void populateNodeSlots(String nodeInfo, JedisPool nodePool) {
String[] nodeInfoArray = nodeInfo.split(" ");
if (nodeInfoArray.length > 7) {
for (int i = 8; i < nodeInfoArray.length; i++) {
processSlot(nodeInfoArray[i], nodePool);
}
}
}
private void processSlot(String slot, JedisPool nodePool) {
if (slot.contains("-")) {
String[] slotRange = slot.split("-");
for (int i = Integer.valueOf(slotRange[0]); i <= Integer
.valueOf(slotRange[1]); i++) {
slots.put(i, nodePool);
}
} else {
slots.put(Integer.valueOf(slot), nodePool);
}
}
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo,
Jedis currentConnection) {
String stringHostAndPort = nodeInfo.split(" ", 3)[1];
if (":0".equals(stringHostAndPort)) {
return new HostAndPort(currentConnection.getClient().getHost(),
currentConnection.getClient().getPort());
}
String[] arrayHostAndPort = stringHostAndPort.split(":");
return new HostAndPort(arrayHostAndPort[0],
Integer.valueOf(arrayHostAndPort[1]));
HostAndPort targetNode = clusterNodeInfo.getNode();
setNodeIfNotExist(targetNode);
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
}
}
public void assignSlotToNode(int slot, HostAndPort targetNode) {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
if (targetPool != null) {
slots.put(slot, targetPool);
} else {
if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}
slots.put(slot, targetPool);
}
public void assignSlotsToNode(List<Integer> targetSlots,
HostAndPort targetNode) {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}
for (Integer slot : targetSlots) {
slots.put(slot, targetPool);
}
}
@@ -148,4 +119,13 @@ public abstract class JedisClusterConnectionHandler {
protected String getNodeKey(Client client) {
return client.getHost() + ":" + client.getPort();
}
private void setNodeIfNotExist(HostAndPort node) {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey))
return;
JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort());
nodes.put(nodeKey, nodePool);
}
}