Re-format source to respect Jedis convention
This commit is contained in:
@@ -63,35 +63,35 @@ public abstract class JedisClusterCommand<T> {
|
||||
// maybe all connection is down
|
||||
throw jce;
|
||||
}
|
||||
|
||||
|
||||
releaseConnection(connection, true);
|
||||
connection = null;
|
||||
|
||||
|
||||
// retry with random connection
|
||||
return runWithRetries(key, redirections--, true, asking);
|
||||
} catch (JedisRedirectionException jre) {
|
||||
if (jre instanceof JedisAskDataException) {
|
||||
asking = true;
|
||||
this.connectionHandler.assignSlotToNode(jre.getSlot(),
|
||||
jre.getTargetNode());
|
||||
} else if (jre instanceof JedisMovedDataException) {
|
||||
// it rebuilds cluster's slot cache
|
||||
// recommended by Redis cluster specification
|
||||
this.connectionHandler.renewSlotCache();
|
||||
} else {
|
||||
throw new JedisClusterException(jre);
|
||||
}
|
||||
this.connectionHandler.assignSlotToNode(jre.getSlot(),
|
||||
jre.getTargetNode());
|
||||
} else if (jre instanceof JedisMovedDataException) {
|
||||
// it rebuilds cluster's slot cache
|
||||
// recommended by Redis cluster specification
|
||||
this.connectionHandler.renewSlotCache();
|
||||
} else {
|
||||
throw new JedisClusterException(jre);
|
||||
}
|
||||
|
||||
releaseConnection(connection, false);
|
||||
connection = null;
|
||||
|
||||
|
||||
return runWithRetries(key, redirections - 1, false, asking);
|
||||
} finally {
|
||||
releaseConnection(connection, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void releaseConnection(Jedis connection, boolean broken) {
|
||||
if (connection != null) {
|
||||
if (broken) {
|
||||
|
||||
@@ -14,13 +14,13 @@ public abstract class JedisClusterConnectionHandler {
|
||||
abstract Jedis getConnection();
|
||||
|
||||
public void returnConnection(Jedis connection) {
|
||||
cache.getNode(getNodeKey(connection.getClient()))
|
||||
.returnResource(connection);
|
||||
cache.getNode(getNodeKey(connection.getClient())).returnResource(
|
||||
connection);
|
||||
}
|
||||
|
||||
public void returnBrokenConnection(Jedis connection) {
|
||||
cache.getNode(getNodeKey(connection.getClient()))
|
||||
.returnBrokenResource(connection);
|
||||
cache.getNode(getNodeKey(connection.getClient())).returnBrokenResource(
|
||||
connection);
|
||||
}
|
||||
|
||||
abstract Jedis getConnectionFromSlot(int slot);
|
||||
@@ -34,7 +34,7 @@ public abstract class JedisClusterConnectionHandler {
|
||||
}
|
||||
|
||||
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
||||
cache.assignSlotToNode(slot, targetNode);
|
||||
cache.assignSlotToNode(slot, targetNode);
|
||||
}
|
||||
|
||||
private void initializeSlotsCache(Set<HostAndPort> startNodes) {
|
||||
@@ -45,14 +45,14 @@ public abstract class JedisClusterConnectionHandler {
|
||||
Jedis jedis = null;
|
||||
try {
|
||||
jedis = jp.getResource();
|
||||
cache.discoverClusterNodesAndSlots(jedis);
|
||||
cache.discoverClusterNodesAndSlots(jedis);
|
||||
break;
|
||||
} catch (JedisConnectionException e) {
|
||||
// try next nodes
|
||||
} finally {
|
||||
if (jedis != null) {
|
||||
jedis.close();
|
||||
}
|
||||
if (jedis != null) {
|
||||
jedis.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,18 +62,18 @@ public abstract class JedisClusterConnectionHandler {
|
||||
}
|
||||
|
||||
public void renewSlotCache() {
|
||||
for (JedisPool jp : cache.getNodes().values()) {
|
||||
Jedis jedis = null;
|
||||
try {
|
||||
jedis = jp.getResource();
|
||||
cache.discoverClusterSlots(jedis);
|
||||
break;
|
||||
} finally {
|
||||
if (jedis != null) {
|
||||
jedis.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
for (JedisPool jp : cache.getNodes().values()) {
|
||||
Jedis jedis = null;
|
||||
try {
|
||||
jedis = jp.getResource();
|
||||
cache.discoverClusterSlots(jedis);
|
||||
break;
|
||||
} finally {
|
||||
if (jedis != null) {
|
||||
jedis.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected JedisPool getRandomConnection() {
|
||||
|
||||
@@ -22,162 +22,161 @@ public class JedisClusterInfoCache {
|
||||
private final Lock w = rwl.writeLock();
|
||||
|
||||
public void discoverClusterNodesAndSlots(Jedis jedis) {
|
||||
w.lock();
|
||||
w.lock();
|
||||
|
||||
try {
|
||||
this.nodes.clear();
|
||||
this.slots.clear();
|
||||
try {
|
||||
this.nodes.clear();
|
||||
this.slots.clear();
|
||||
|
||||
String localNodes = jedis.clusterNodes();
|
||||
for (String nodeInfo : localNodes.split("\n")) {
|
||||
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(
|
||||
nodeInfo, new HostAndPort(jedis.getClient().getHost(),
|
||||
jedis.getClient().getPort()));
|
||||
String localNodes = jedis.clusterNodes();
|
||||
for (String nodeInfo : localNodes.split("\n")) {
|
||||
ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(
|
||||
nodeInfo, new HostAndPort(jedis.getClient().getHost(),
|
||||
jedis.getClient().getPort()));
|
||||
|
||||
HostAndPort targetNode = clusterNodeInfo.getNode();
|
||||
setNodeIfNotExist(targetNode);
|
||||
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
|
||||
}
|
||||
} finally {
|
||||
w.unlock();
|
||||
}
|
||||
HostAndPort targetNode = clusterNodeInfo.getNode();
|
||||
setNodeIfNotExist(targetNode);
|
||||
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(),
|
||||
targetNode);
|
||||
}
|
||||
} finally {
|
||||
w.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void discoverClusterSlots(Jedis jedis) {
|
||||
w.lock();
|
||||
w.lock();
|
||||
|
||||
try {
|
||||
this.slots.clear();
|
||||
try {
|
||||
this.slots.clear();
|
||||
|
||||
List<Object> slots = jedis.clusterSlots();
|
||||
List<Object> slots = jedis.clusterSlots();
|
||||
|
||||
for (Object slotInfoObj : slots) {
|
||||
List<Object> slotInfo = (List<Object>) slotInfoObj;
|
||||
for (Object slotInfoObj : slots) {
|
||||
List<Object> slotInfo = (List<Object>) slotInfoObj;
|
||||
|
||||
if (slotInfo.size() <= 2) {
|
||||
continue;
|
||||
}
|
||||
if (slotInfo.size() <= 2) {
|
||||
continue;
|
||||
}
|
||||
|
||||
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
|
||||
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
|
||||
|
||||
// hostInfos
|
||||
List<Object> hostInfos = (List<Object>) slotInfo.get(2);
|
||||
if (hostInfos.size() <= 0) {
|
||||
continue;
|
||||
}
|
||||
// hostInfos
|
||||
List<Object> hostInfos = (List<Object>) slotInfo.get(2);
|
||||
if (hostInfos.size() <= 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// at this time, we just use master, discard slave information
|
||||
HostAndPort targetNode = generateHostAndPort(hostInfos);
|
||||
// at this time, we just use master, discard slave information
|
||||
HostAndPort targetNode = generateHostAndPort(hostInfos);
|
||||
|
||||
setNodeIfNotExist(targetNode);
|
||||
assignSlotsToNode(slotNums, targetNode);
|
||||
}
|
||||
} finally {
|
||||
w.unlock();
|
||||
}
|
||||
setNodeIfNotExist(targetNode);
|
||||
assignSlotsToNode(slotNums, targetNode);
|
||||
}
|
||||
} finally {
|
||||
w.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private HostAndPort generateHostAndPort(List<Object> hostInfos) {
|
||||
return new HostAndPort(
|
||||
SafeEncoder.encode((byte[]) hostInfos.get(0)),
|
||||
((Long) hostInfos.get(1)).intValue());
|
||||
return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),
|
||||
((Long) hostInfos.get(1)).intValue());
|
||||
}
|
||||
|
||||
public void setNodeIfNotExist(HostAndPort node) {
|
||||
w.lock();
|
||||
try {
|
||||
String nodeKey = getNodeKey(node);
|
||||
if (nodes.containsKey(nodeKey))
|
||||
return;
|
||||
w.lock();
|
||||
try {
|
||||
String nodeKey = getNodeKey(node);
|
||||
if (nodes.containsKey(nodeKey))
|
||||
return;
|
||||
|
||||
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
||||
nodes.put(nodeKey, nodePool);
|
||||
} finally {
|
||||
w.unlock();
|
||||
}
|
||||
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
||||
nodes.put(nodeKey, nodePool);
|
||||
} finally {
|
||||
w.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
||||
w.lock();
|
||||
try {
|
||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||
w.lock();
|
||||
try {
|
||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||
|
||||
if (targetPool == null) {
|
||||
setNodeIfNotExist(targetNode);
|
||||
targetPool = nodes.get(getNodeKey(targetNode));
|
||||
}
|
||||
slots.put(slot, targetPool);
|
||||
} finally {
|
||||
w.unlock();
|
||||
}
|
||||
if (targetPool == null) {
|
||||
setNodeIfNotExist(targetNode);
|
||||
targetPool = nodes.get(getNodeKey(targetNode));
|
||||
}
|
||||
slots.put(slot, targetPool);
|
||||
} finally {
|
||||
w.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void assignSlotsToNode(List<Integer> targetSlots,
|
||||
HostAndPort targetNode) {
|
||||
w.lock();
|
||||
try {
|
||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||
HostAndPort targetNode) {
|
||||
w.lock();
|
||||
try {
|
||||
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
|
||||
|
||||
if (targetPool == null) {
|
||||
setNodeIfNotExist(targetNode);
|
||||
targetPool = nodes.get(getNodeKey(targetNode));
|
||||
}
|
||||
if (targetPool == null) {
|
||||
setNodeIfNotExist(targetNode);
|
||||
targetPool = nodes.get(getNodeKey(targetNode));
|
||||
}
|
||||
|
||||
for (Integer slot : targetSlots) {
|
||||
slots.put(slot, targetPool);
|
||||
}
|
||||
} finally {
|
||||
w.unlock();
|
||||
}
|
||||
for (Integer slot : targetSlots) {
|
||||
slots.put(slot, targetPool);
|
||||
}
|
||||
} finally {
|
||||
w.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized JedisPool getNode(String nodeKey) {
|
||||
r.lock();
|
||||
try {
|
||||
return nodes.get(nodeKey);
|
||||
} finally {
|
||||
r.unlock();
|
||||
}
|
||||
r.lock();
|
||||
try {
|
||||
return nodes.get(nodeKey);
|
||||
} finally {
|
||||
r.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized JedisPool getSlotPool(int slot) {
|
||||
r.lock();
|
||||
try {
|
||||
return slots.get(slot);
|
||||
} finally {
|
||||
r.unlock();
|
||||
}
|
||||
r.lock();
|
||||
try {
|
||||
return slots.get(slot);
|
||||
} finally {
|
||||
r.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized Map<String, JedisPool> getNodes() {
|
||||
r.lock();
|
||||
try {
|
||||
return new HashMap<String, JedisPool>(nodes);
|
||||
} finally {
|
||||
r.unlock();
|
||||
}
|
||||
r.lock();
|
||||
try {
|
||||
return new HashMap<String, JedisPool>(nodes);
|
||||
} finally {
|
||||
r.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public static String getNodeKey(HostAndPort hnp) {
|
||||
return hnp.getHost() + ":" + hnp.getPort();
|
||||
return hnp.getHost() + ":" + hnp.getPort();
|
||||
}
|
||||
|
||||
public static String getNodeKey(Client client) {
|
||||
return client.getHost() + ":" + client.getPort();
|
||||
return client.getHost() + ":" + client.getPort();
|
||||
}
|
||||
|
||||
public static String getNodeKey(Jedis jedis) {
|
||||
return getNodeKey(jedis.getClient());
|
||||
return getNodeKey(jedis.getClient());
|
||||
}
|
||||
|
||||
private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
|
||||
List<Integer> slotNums = new ArrayList<Integer>();
|
||||
for (int slot = ((Long) slotInfo.get(0)).intValue();
|
||||
slot <= ((Long) slotInfo.get(1)).intValue();
|
||||
slot++) {
|
||||
slotNums.add(slot);
|
||||
}
|
||||
return slotNums;
|
||||
List<Integer> slotNums = new ArrayList<Integer>();
|
||||
for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo
|
||||
.get(1)).intValue(); slot++) {
|
||||
slotNums.add(slot);
|
||||
}
|
||||
return slotNums;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user