Merging with upstream, necessary changes, shifting poolConfig as private instance variable from JedisClusterConnectionHandler to JedisClusterInfoCache due to design change in previous commits.
This commit is contained in:
@@ -1,131 +1,86 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
import redis.clients.util.ClusterNodeInformation;
|
||||
import redis.clients.util.ClusterNodeInformationParser;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import static redis.clients.jedis.JedisClusterInfoCache.getNodeKey;
|
||||
|
||||
public abstract class JedisClusterConnectionHandler {
|
||||
public static ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser();
|
||||
|
||||
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
||||
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
|
||||
final protected GenericObjectPoolConfig poolConfig;
|
||||
protected final JedisClusterInfoCache cache;
|
||||
|
||||
abstract Jedis getConnection();
|
||||
|
||||
protected void returnConnection(Jedis connection) {
|
||||
nodes.get(getNodeKey(connection.getClient()))
|
||||
.returnResource(connection);
|
||||
public void returnConnection(Jedis connection) {
|
||||
cache.getNode(getNodeKey(connection.getClient())).returnResource(
|
||||
connection);
|
||||
}
|
||||
|
||||
public void returnBrokenConnection(Jedis connection) {
|
||||
nodes.get(getNodeKey(connection.getClient())).returnBrokenResource(
|
||||
cache.getNode(getNodeKey(connection.getClient())).returnBrokenResource(
|
||||
connection);
|
||||
}
|
||||
|
||||
abstract Jedis getConnectionFromSlot(int slot);
|
||||
|
||||
public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig) {
|
||||
this.poolConfig = poolConfig;
|
||||
initializeSlotsCache(nodes);
|
||||
this.cache = new JedisClusterInfoCache(poolConfig);
|
||||
initializeSlotsCache(nodes, poolConfig);
|
||||
}
|
||||
|
||||
public Map<String, JedisPool> getNodes() {
|
||||
return nodes;
|
||||
return cache.getNodes();
|
||||
}
|
||||
|
||||
private void initializeSlotsCache(Set<HostAndPort> startNodes) {
|
||||
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
||||
cache.assignSlotToNode(slot, targetNode);
|
||||
}
|
||||
|
||||
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) {
|
||||
for (HostAndPort hostAndPort : startNodes) {
|
||||
JedisPool jp = new JedisPool(poolConfig, hostAndPort.getHost(),
|
||||
hostAndPort.getPort());
|
||||
|
||||
this.nodes.clear();
|
||||
this.slots.clear();
|
||||
|
||||
Jedis jedis = null;
|
||||
try {
|
||||
jedis = jp.getResource();
|
||||
discoverClusterNodesAndSlots(jedis);
|
||||
cache.discoverClusterNodesAndSlots(jedis);
|
||||
break;
|
||||
} catch (JedisConnectionException e) {
|
||||
if (jedis != null) {
|
||||
jp.returnBrokenResource(jedis);
|
||||
jedis = null;
|
||||
}
|
||||
|
||||
// try next nodes
|
||||
} finally {
|
||||
if (jedis != null) {
|
||||
jp.returnResource(jedis);
|
||||
jedis.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (HostAndPort node : startNodes) {
|
||||
setNodeIfNotExist(node);
|
||||
cache.setNodeIfNotExist(node);
|
||||
}
|
||||
}
|
||||
|
||||
private void discoverClusterNodesAndSlots(Jedis jedis) {
|
||||
String localNodes = jedis.clusterNodes();
|
||||
for (String nodeInfo : localNodes.split("\n")) {
|
||||
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(
|
||||
nodeInfo, new HostAndPort(jedis.getClient().getHost(),
|
||||
jedis.getClient().getPort()));
|
||||
|
||||
HostAndPort targetNode = clusterNodeInfo.getNode();
|
||||
setNodeIfNotExist(targetNode);
|
||||
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
|
||||
}
|
||||
}
|
||||
|
||||
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||
|
||||
if (targetPool == null) {
|
||||
setNodeIfNotExist(targetNode);
|
||||
targetPool = nodes.get(getNodeKey(targetNode));
|
||||
}
|
||||
slots.put(slot, targetPool);
|
||||
}
|
||||
|
||||
public void assignSlotsToNode(List<Integer> targetSlots,
|
||||
HostAndPort targetNode) {
|
||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||
|
||||
if (targetPool == null) {
|
||||
setNodeIfNotExist(targetNode);
|
||||
targetPool = nodes.get(getNodeKey(targetNode));
|
||||
}
|
||||
|
||||
for (Integer slot : targetSlots) {
|
||||
slots.put(slot, targetPool);
|
||||
public void renewSlotCache() {
|
||||
for (JedisPool jp : cache.getNodes().values()) {
|
||||
Jedis jedis = null;
|
||||
try {
|
||||
jedis = jp.getResource();
|
||||
cache.discoverClusterSlots(jedis);
|
||||
break;
|
||||
} finally {
|
||||
if (jedis != null) {
|
||||
jedis.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected JedisPool getRandomConnection() {
|
||||
Object[] nodeArray = nodes.values().toArray();
|
||||
Object[] nodeArray = cache.getNodes().values().toArray();
|
||||
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
|
||||
}
|
||||
|
||||
protected String getNodeKey(HostAndPort hnp) {
|
||||
return hnp.getHost() + ":" + hnp.getPort();
|
||||
}
|
||||
|
||||
protected String getNodeKey(Client client) {
|
||||
return client.getHost() + ":" + client.getPort();
|
||||
}
|
||||
|
||||
private void setNodeIfNotExist(HostAndPort node) {
|
||||
String nodeKey = getNodeKey(node);
|
||||
if (nodes.containsKey(nodeKey))
|
||||
return;
|
||||
|
||||
JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort());
|
||||
nodes.put(nodeKey, nodePool);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user