Renew slots when MOVED occured during request to Cluster
* It's suggested by http://redis.io/topics/cluster-spec ** antirez/redis-rb-cluster implementation does it, too * Since Redis 3.0-beta 7 introduces CLUSTER SLOTS, it becomes easier * FIXME: It's fully synchronized, so it hurts performance (somewhat poor implementation) ** We can try Reader / Writer strategy to make lock waiting make shorter
This commit is contained in:
@@ -72,13 +72,15 @@ public abstract class JedisClusterCommand<T> {
|
|||||||
} catch (JedisRedirectionException jre) {
|
} catch (JedisRedirectionException jre) {
|
||||||
if (jre instanceof JedisAskDataException) {
|
if (jre instanceof JedisAskDataException) {
|
||||||
asking = true;
|
asking = true;
|
||||||
} else if (jre instanceof JedisMovedDataException) {
|
this.connectionHandler.assignSlotToNode(jre.getSlot(),
|
||||||
// TODO : In antirez's redis-rb-cluster implementation,
|
jre.getTargetNode());
|
||||||
// it rebuilds cluster's slot and node cache
|
} else if (jre instanceof JedisMovedDataException) {
|
||||||
}
|
// it rebuilds cluster's slot cache
|
||||||
|
// recommended by Redis cluster specification
|
||||||
this.connectionHandler.assignSlotToNode(jre.getSlot(),
|
this.connectionHandler.renewSlotCache();
|
||||||
jre.getTargetNode());
|
} else {
|
||||||
|
throw new JedisClusterException(jre);
|
||||||
|
}
|
||||||
|
|
||||||
releaseConnection(connection, false);
|
releaseConnection(connection, false);
|
||||||
connection = null;
|
connection = null;
|
||||||
|
|||||||
@@ -1,27 +1,26 @@
|
|||||||
package redis.clients.jedis;
|
package redis.clients.jedis;
|
||||||
|
|
||||||
import java.util.*;
|
|
||||||
|
|
||||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||||
import redis.clients.util.ClusterNodeInformation;
|
|
||||||
import redis.clients.util.ClusterNodeInformationParser;
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static redis.clients.jedis.JedisClusterInfoCache.getNodeKey;
|
||||||
|
|
||||||
public abstract class JedisClusterConnectionHandler {
|
public abstract class JedisClusterConnectionHandler {
|
||||||
public static ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser();
|
protected JedisClusterInfoCache cache = new JedisClusterInfoCache();
|
||||||
|
|
||||||
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
|
||||||
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
|
|
||||||
|
|
||||||
abstract Jedis getConnection();
|
abstract Jedis getConnection();
|
||||||
|
|
||||||
protected void returnConnection(Jedis connection) {
|
public void returnConnection(Jedis connection) {
|
||||||
nodes.get(getNodeKey(connection.getClient()))
|
cache.getNode(getNodeKey(connection.getClient()))
|
||||||
.returnResource(connection);
|
.returnResource(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void returnBrokenConnection(Jedis connection) {
|
public void returnBrokenConnection(Jedis connection) {
|
||||||
nodes.get(getNodeKey(connection.getClient())).returnBrokenResource(
|
cache.getNode(getNodeKey(connection.getClient()))
|
||||||
connection);
|
.returnBrokenResource(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract Jedis getConnectionFromSlot(int slot);
|
abstract Jedis getConnectionFromSlot(int slot);
|
||||||
@@ -31,7 +30,11 @@ public abstract class JedisClusterConnectionHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, JedisPool> getNodes() {
|
public Map<String, JedisPool> getNodes() {
|
||||||
return nodes;
|
return cache.getNodes();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
||||||
|
cache.assignSlotToNode(slot, targetNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initializeSlotsCache(Set<HostAndPort> startNodes) {
|
private void initializeSlotsCache(Set<HostAndPort> startNodes) {
|
||||||
@@ -39,89 +42,43 @@ public abstract class JedisClusterConnectionHandler {
|
|||||||
JedisPool jp = new JedisPool(hostAndPort.getHost(),
|
JedisPool jp = new JedisPool(hostAndPort.getHost(),
|
||||||
hostAndPort.getPort());
|
hostAndPort.getPort());
|
||||||
|
|
||||||
this.nodes.clear();
|
|
||||||
this.slots.clear();
|
|
||||||
|
|
||||||
Jedis jedis = null;
|
Jedis jedis = null;
|
||||||
try {
|
try {
|
||||||
jedis = jp.getResource();
|
jedis = jp.getResource();
|
||||||
discoverClusterNodesAndSlots(jedis);
|
cache.discoverClusterNodesAndSlots(jedis);
|
||||||
break;
|
break;
|
||||||
} catch (JedisConnectionException e) {
|
} catch (JedisConnectionException e) {
|
||||||
if (jedis != null) {
|
|
||||||
jp.returnBrokenResource(jedis);
|
|
||||||
jedis = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// try next nodes
|
// try next nodes
|
||||||
} finally {
|
} finally {
|
||||||
if (jedis != null) {
|
if (jedis != null) {
|
||||||
jp.returnResource(jedis);
|
jedis.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (HostAndPort node : startNodes) {
|
for (HostAndPort node : startNodes) {
|
||||||
setNodeIfNotExist(node);
|
cache.setNodeIfNotExist(node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void discoverClusterNodesAndSlots(Jedis jedis) {
|
public void renewSlotCache() {
|
||||||
String localNodes = jedis.clusterNodes();
|
for (JedisPool jp : cache.getNodes().values()) {
|
||||||
for (String nodeInfo : localNodes.split("\n")) {
|
Jedis jedis = null;
|
||||||
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(
|
try {
|
||||||
nodeInfo, new HostAndPort(jedis.getClient().getHost(),
|
jedis = jp.getResource();
|
||||||
jedis.getClient().getPort()));
|
cache.discoverClusterSlots(jedis);
|
||||||
|
break;
|
||||||
HostAndPort targetNode = clusterNodeInfo.getNode();
|
} finally {
|
||||||
setNodeIfNotExist(targetNode);
|
if (jedis != null) {
|
||||||
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
|
jedis.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
|
||||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
|
||||||
|
|
||||||
if (targetPool == null) {
|
|
||||||
setNodeIfNotExist(targetNode);
|
|
||||||
targetPool = nodes.get(getNodeKey(targetNode));
|
|
||||||
}
|
|
||||||
slots.put(slot, targetPool);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void assignSlotsToNode(List<Integer> targetSlots,
|
|
||||||
HostAndPort targetNode) {
|
|
||||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
|
||||||
|
|
||||||
if (targetPool == null) {
|
|
||||||
setNodeIfNotExist(targetNode);
|
|
||||||
targetPool = nodes.get(getNodeKey(targetNode));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (Integer slot : targetSlots) {
|
|
||||||
slots.put(slot, targetPool);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected JedisPool getRandomConnection() {
|
protected JedisPool getRandomConnection() {
|
||||||
Object[] nodeArray = nodes.values().toArray();
|
Object[] nodeArray = cache.getNodes().values().toArray();
|
||||||
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
|
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getNodeKey(HostAndPort hnp) {
|
|
||||||
return hnp.getHost() + ":" + hnp.getPort();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String getNodeKey(Client client) {
|
|
||||||
return client.getHost() + ":" + client.getPort();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setNodeIfNotExist(HostAndPort node) {
|
|
||||||
String nodeKey = getNodeKey(node);
|
|
||||||
if (nodes.containsKey(nodeKey))
|
|
||||||
return;
|
|
||||||
|
|
||||||
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
|
||||||
nodes.put(nodeKey, nodePool);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
127
src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Normal file
127
src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
package redis.clients.jedis;
|
||||||
|
|
||||||
|
import redis.clients.util.ClusterNodeInformation;
|
||||||
|
import redis.clients.util.ClusterNodeInformationParser;
|
||||||
|
import redis.clients.util.SafeEncoder;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class JedisClusterInfoCache {
|
||||||
|
public static final ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser();
|
||||||
|
|
||||||
|
private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
||||||
|
private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
|
||||||
|
|
||||||
|
public synchronized void discoverClusterNodesAndSlots(Jedis jedis) {
|
||||||
|
this.nodes.clear();
|
||||||
|
this.slots.clear();
|
||||||
|
|
||||||
|
String localNodes = jedis.clusterNodes();
|
||||||
|
for (String nodeInfo : localNodes.split("\n")) {
|
||||||
|
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(
|
||||||
|
nodeInfo, new HostAndPort(jedis.getClient().getHost(),
|
||||||
|
jedis.getClient().getPort()));
|
||||||
|
|
||||||
|
HostAndPort targetNode = clusterNodeInfo.getNode();
|
||||||
|
setNodeIfNotExist(targetNode);
|
||||||
|
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void discoverClusterSlots(Jedis jedis) {
|
||||||
|
this.slots.clear();
|
||||||
|
|
||||||
|
List<Object> slots = jedis.clusterSlots();
|
||||||
|
|
||||||
|
for (Object slotInfoObj : slots) {
|
||||||
|
List<Object> slotInfo = (List<Object>) slotInfoObj;
|
||||||
|
|
||||||
|
if (slotInfo.size() <= 2) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// assigned slots
|
||||||
|
List<Integer> slotNums = new ArrayList<Integer>();
|
||||||
|
for (int slot = ((Long) slotInfo.get(0)).intValue() ;
|
||||||
|
slot <= ((Long) slotInfo.get(1)).intValue() ;
|
||||||
|
slot++) {
|
||||||
|
slotNums.add(slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
// hostInfos
|
||||||
|
List<Object> hostInfos = (List<Object>) slotInfo.get(2);
|
||||||
|
if (hostInfos.size() <= 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// at this time, we just use master, discard slave information
|
||||||
|
HostAndPort targetNode = new HostAndPort(
|
||||||
|
SafeEncoder.encode((byte[]) hostInfos.get(0)),
|
||||||
|
((Long) hostInfos.get(1)).intValue());
|
||||||
|
|
||||||
|
setNodeIfNotExist(targetNode);
|
||||||
|
assignSlotsToNode(slotNums, targetNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void setNodeIfNotExist(HostAndPort node) {
|
||||||
|
String nodeKey = getNodeKey(node);
|
||||||
|
if (nodes.containsKey(nodeKey))
|
||||||
|
return;
|
||||||
|
|
||||||
|
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
||||||
|
nodes.put(nodeKey, nodePool);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void assignSlotToNode(int slot, HostAndPort targetNode) {
|
||||||
|
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||||
|
|
||||||
|
if (targetPool == null) {
|
||||||
|
setNodeIfNotExist(targetNode);
|
||||||
|
targetPool = nodes.get(getNodeKey(targetNode));
|
||||||
|
}
|
||||||
|
slots.put(slot, targetPool);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void assignSlotsToNode(List<Integer> targetSlots,
|
||||||
|
HostAndPort targetNode) {
|
||||||
|
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||||
|
|
||||||
|
if (targetPool == null) {
|
||||||
|
setNodeIfNotExist(targetNode);
|
||||||
|
targetPool = nodes.get(getNodeKey(targetNode));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Integer slot : targetSlots) {
|
||||||
|
slots.put(slot, targetPool);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized JedisPool getNode(String nodeKey) {
|
||||||
|
return nodes.get(nodeKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized JedisPool getSlotPool(int slot) {
|
||||||
|
return slots.get(slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized Map<String, JedisPool> getNodes() {
|
||||||
|
return new HashMap<String, JedisPool>(nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getNodeKey(HostAndPort hnp) {
|
||||||
|
return hnp.getHost() + ":" + hnp.getPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getNodeKey(Client client) {
|
||||||
|
return client.getHost() + ":" + client.getPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getNodeKey(Jedis jedis) {
|
||||||
|
return getNodeKey(jedis.getClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -46,14 +46,9 @@ public class JedisSlotBasedConnectionHandler extends
|
|||||||
throw new JedisConnectionException("no reachable node in cluster");
|
throw new JedisConnectionException("no reachable node in cluster");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
|
||||||
super.assignSlotToNode(slot, targetNode);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Jedis getConnectionFromSlot(int slot) {
|
public Jedis getConnectionFromSlot(int slot) {
|
||||||
JedisPool connectionPool = slots.get(slot);
|
JedisPool connectionPool = cache.getSlotPool(slot);
|
||||||
if (connectionPool != null) {
|
if (connectionPool != null) {
|
||||||
// It can't guaranteed to get valid connection because of node assignment
|
// It can't guaranteed to get valid connection because of node assignment
|
||||||
return connectionPool.getResource();
|
return connectionPool.getResource();
|
||||||
@@ -64,7 +59,7 @@ public class JedisSlotBasedConnectionHandler extends
|
|||||||
|
|
||||||
private List<JedisPool> getShuffledNodesPool() {
|
private List<JedisPool> getShuffledNodesPool() {
|
||||||
List<JedisPool> pools = new ArrayList<JedisPool>();
|
List<JedisPool> pools = new ArrayList<JedisPool>();
|
||||||
pools.addAll(nodes.values());
|
pools.addAll(cache.getNodes().values());
|
||||||
Collections.shuffle(pools);
|
Collections.shuffle(pools);
|
||||||
return pools;
|
return pools;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user