Merge branch 'renew-slots-info-when-moved-has-occurred' of https://github.com/HeartSaVioR/jedis into HeartSaVioR-renew-slots-info-when-moved-has-occurred

This commit is contained in:
Jungtaek Lim
2014-09-11 10:09:16 +09:00
4 changed files with 227 additions and 91 deletions

View File

@@ -63,33 +63,35 @@ public abstract class JedisClusterCommand<T> {
// maybe all connection is down // maybe all connection is down
throw jce; throw jce;
} }
releaseConnection(connection, true); releaseConnection(connection, true);
connection = null; connection = null;
// retry with random connection // retry with random connection
return runWithRetries(key, redirections--, true, asking); return runWithRetries(key, redirections--, true, asking);
} catch (JedisRedirectionException jre) { } catch (JedisRedirectionException jre) {
if (jre instanceof JedisAskDataException) { if (jre instanceof JedisAskDataException) {
asking = true; asking = true;
this.connectionHandler.assignSlotToNode(jre.getSlot(),
jre.getTargetNode());
} else if (jre instanceof JedisMovedDataException) { } else if (jre instanceof JedisMovedDataException) {
// TODO : In antirez's redis-rb-cluster implementation, // it rebuilds cluster's slot cache
// it rebuilds cluster's slot and node cache // recommended by Redis cluster specification
this.connectionHandler.renewSlotCache();
} else {
throw new JedisClusterException(jre);
} }
this.connectionHandler.assignSlotToNode(jre.getSlot(),
jre.getTargetNode());
releaseConnection(connection, false); releaseConnection(connection, false);
connection = null; connection = null;
return runWithRetries(key, redirections - 1, false, asking); return runWithRetries(key, redirections - 1, false, asking);
} finally { } finally {
releaseConnection(connection, false); releaseConnection(connection, false);
} }
} }
private void releaseConnection(Jedis connection, boolean broken) { private void releaseConnection(Jedis connection, boolean broken) {
if (connection != null) { if (connection != null) {
if (broken) { if (broken) {

View File

@@ -1,26 +1,25 @@
package redis.clients.jedis; package redis.clients.jedis;
import java.util.*;
import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.ClusterNodeInformation;
import redis.clients.util.ClusterNodeInformationParser; import java.util.Map;
import java.util.Random;
import java.util.Set;
import static redis.clients.jedis.JedisClusterInfoCache.getNodeKey;
public abstract class JedisClusterConnectionHandler { public abstract class JedisClusterConnectionHandler {
public static ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser(); protected JedisClusterInfoCache cache = new JedisClusterInfoCache();
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
abstract Jedis getConnection(); abstract Jedis getConnection();
protected void returnConnection(Jedis connection) { public void returnConnection(Jedis connection) {
nodes.get(getNodeKey(connection.getClient())) cache.getNode(getNodeKey(connection.getClient())).returnResource(
.returnResource(connection); connection);
} }
public void returnBrokenConnection(Jedis connection) { public void returnBrokenConnection(Jedis connection) {
nodes.get(getNodeKey(connection.getClient())).returnBrokenResource( cache.getNode(getNodeKey(connection.getClient())).returnBrokenResource(
connection); connection);
} }
@@ -31,7 +30,11 @@ public abstract class JedisClusterConnectionHandler {
} }
public Map<String, JedisPool> getNodes() { public Map<String, JedisPool> getNodes() {
return nodes; return cache.getNodes();
}
public void assignSlotToNode(int slot, HostAndPort targetNode) {
cache.assignSlotToNode(slot, targetNode);
} }
private void initializeSlotsCache(Set<HostAndPort> startNodes) { private void initializeSlotsCache(Set<HostAndPort> startNodes) {
@@ -39,89 +42,43 @@ public abstract class JedisClusterConnectionHandler {
JedisPool jp = new JedisPool(hostAndPort.getHost(), JedisPool jp = new JedisPool(hostAndPort.getHost(),
hostAndPort.getPort()); hostAndPort.getPort());
this.nodes.clear();
this.slots.clear();
Jedis jedis = null; Jedis jedis = null;
try { try {
jedis = jp.getResource(); jedis = jp.getResource();
discoverClusterNodesAndSlots(jedis); cache.discoverClusterNodesAndSlots(jedis);
break; break;
} catch (JedisConnectionException e) { } catch (JedisConnectionException e) {
if (jedis != null) {
jp.returnBrokenResource(jedis);
jedis = null;
}
// try next nodes // try next nodes
} finally { } finally {
if (jedis != null) { if (jedis != null) {
jp.returnResource(jedis); jedis.close();
} }
} }
} }
for (HostAndPort node : startNodes) { for (HostAndPort node : startNodes) {
setNodeIfNotExist(node); cache.setNodeIfNotExist(node);
} }
} }
private void discoverClusterNodesAndSlots(Jedis jedis) { public void renewSlotCache() {
String localNodes = jedis.clusterNodes(); for (JedisPool jp : cache.getNodes().values()) {
for (String nodeInfo : localNodes.split("\n")) { Jedis jedis = null;
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse( try {
nodeInfo, new HostAndPort(jedis.getClient().getHost(), jedis = jp.getResource();
jedis.getClient().getPort())); cache.discoverClusterSlots(jedis);
break;
HostAndPort targetNode = clusterNodeInfo.getNode(); } finally {
setNodeIfNotExist(targetNode); if (jedis != null) {
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode); jedis.close();
} }
} }
public void assignSlotToNode(int slot, HostAndPort targetNode) {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}
slots.put(slot, targetPool);
}
public void assignSlotsToNode(List<Integer> targetSlots,
HostAndPort targetNode) {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}
for (Integer slot : targetSlots) {
slots.put(slot, targetPool);
} }
} }
protected JedisPool getRandomConnection() { protected JedisPool getRandomConnection() {
Object[] nodeArray = nodes.values().toArray(); Object[] nodeArray = cache.getNodes().values().toArray();
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]); return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
} }
protected String getNodeKey(HostAndPort hnp) {
return hnp.getHost() + ":" + hnp.getPort();
}
protected String getNodeKey(Client client) {
return client.getHost() + ":" + client.getPort();
}
private void setNodeIfNotExist(HostAndPort node) {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey))
return;
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
nodes.put(nodeKey, nodePool);
}
} }

View File

@@ -0,0 +1,182 @@
package redis.clients.jedis;
import redis.clients.util.ClusterNodeInformation;
import redis.clients.util.ClusterNodeInformationParser;
import redis.clients.util.SafeEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class JedisClusterInfoCache {
public static final ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser();
private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public void discoverClusterNodesAndSlots(Jedis jedis) {
w.lock();
try {
this.nodes.clear();
this.slots.clear();
String localNodes = jedis.clusterNodes();
for (String nodeInfo : localNodes.split("\n")) {
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(
nodeInfo, new HostAndPort(jedis.getClient().getHost(),
jedis.getClient().getPort()));
HostAndPort targetNode = clusterNodeInfo.getNode();
setNodeIfNotExist(targetNode);
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(),
targetNode);
}
} finally {
w.unlock();
}
}
public void discoverClusterSlots(Jedis jedis) {
w.lock();
try {
this.slots.clear();
List<Object> slots = jedis.clusterSlots();
for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;
if (slotInfo.size() <= 2) {
continue;
}
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
// hostInfos
List<Object> hostInfos = (List<Object>) slotInfo.get(2);
if (hostInfos.size() <= 0) {
continue;
}
// at this time, we just use master, discard slave information
HostAndPort targetNode = generateHostAndPort(hostInfos);
setNodeIfNotExist(targetNode);
assignSlotsToNode(slotNums, targetNode);
}
} finally {
w.unlock();
}
}
private HostAndPort generateHostAndPort(List<Object> hostInfos) {
return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),
((Long) hostInfos.get(1)).intValue());
}
public void setNodeIfNotExist(HostAndPort node) {
w.lock();
try {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey))
return;
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
nodes.put(nodeKey, nodePool);
} finally {
w.unlock();
}
}
public void assignSlotToNode(int slot, HostAndPort targetNode) {
w.lock();
try {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}
slots.put(slot, targetPool);
} finally {
w.unlock();
}
}
public synchronized void assignSlotsToNode(List<Integer> targetSlots,
HostAndPort targetNode) {
w.lock();
try {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}
for (Integer slot : targetSlots) {
slots.put(slot, targetPool);
}
} finally {
w.unlock();
}
}
public synchronized JedisPool getNode(String nodeKey) {
r.lock();
try {
return nodes.get(nodeKey);
} finally {
r.unlock();
}
}
public synchronized JedisPool getSlotPool(int slot) {
r.lock();
try {
return slots.get(slot);
} finally {
r.unlock();
}
}
public synchronized Map<String, JedisPool> getNodes() {
r.lock();
try {
return new HashMap<String, JedisPool>(nodes);
} finally {
r.unlock();
}
}
public static String getNodeKey(HostAndPort hnp) {
return hnp.getHost() + ":" + hnp.getPort();
}
public static String getNodeKey(Client client) {
return client.getHost() + ":" + client.getPort();
}
public static String getNodeKey(Jedis jedis) {
return getNodeKey(jedis.getClient());
}
private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
List<Integer> slotNums = new ArrayList<Integer>();
for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo
.get(1)).intValue(); slot++) {
slotNums.add(slot);
}
return slotNums;
}
}

View File

@@ -46,14 +46,9 @@ public class JedisSlotBasedConnectionHandler extends
throw new JedisConnectionException("no reachable node in cluster"); throw new JedisConnectionException("no reachable node in cluster");
} }
@Override
public void assignSlotToNode(int slot, HostAndPort targetNode) {
super.assignSlotToNode(slot, targetNode);
}
@Override @Override
public Jedis getConnectionFromSlot(int slot) { public Jedis getConnectionFromSlot(int slot) {
JedisPool connectionPool = slots.get(slot); JedisPool connectionPool = cache.getSlotPool(slot);
if (connectionPool != null) { if (connectionPool != null) {
// It can't guaranteed to get valid connection because of node assignment // It can't guaranteed to get valid connection because of node assignment
return connectionPool.getResource(); return connectionPool.getResource();
@@ -64,7 +59,7 @@ public class JedisSlotBasedConnectionHandler extends
private List<JedisPool> getShuffledNodesPool() { private List<JedisPool> getShuffledNodesPool() {
List<JedisPool> pools = new ArrayList<JedisPool>(); List<JedisPool> pools = new ArrayList<JedisPool>();
pools.addAll(nodes.values()); pools.addAll(cache.getNodes().values());
Collections.shuffle(pools); Collections.shuffle(pools);
return pools; return pools;
} }