Merging upstream

This commit is contained in:
Mayank Dang
2014-03-26 13:27:47 +05:30
14 changed files with 513 additions and 319 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -3,19 +3,18 @@ package redis.clients.jedis;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
public abstract class JedisClusterCommand<T> {
private boolean asking = false;
private JedisClusterConnectionHandler connectionHandler;
private int commandTimeout;
private int redirections;
// private boolean asking = false;
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler,
int timeout, int maxRedirections) {
this.connectionHandler = connectionHandler;
@@ -23,38 +22,83 @@ public abstract class JedisClusterCommand<T> {
this.redirections = maxRedirections;
}
public abstract T execute();
public abstract T execute(Jedis connection);
public T run(String key) {
try {
if (key == null) {
throw new JedisClusterException(
"No way to dispatch this command to Redis Cluster.");
}
if (key == null) {
throw new JedisClusterException(
"No way to dispatch this command to Redis Cluster.");
} else if (redirections == 0) {
throw new JedisClusterMaxRedirectionsException(
"Too many Cluster redirections?");
return runWithRetries(key, this.redirections, false, false);
}
private T runWithRetries(String key, int redirections,
boolean tryRandomNode, boolean asking) {
if (redirections <= 0) {
throw new JedisClusterMaxRedirectionsException(
"Too many Cluster redirections?");
}
Jedis connection = null;
try {
if (tryRandomNode) {
connection = connectionHandler.getConnection();
} else {
connection = connectionHandler
.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
}
connectionHandler.getConnectionFromSlot(JedisClusterCRC16
.getSlot(key));
if (asking) {
// TODO: Pipeline asking with the original command to make it
// faster....
connectionHandler.getConnection().asking();
connection.asking();
// if asking success, reset asking flag
asking = false;
}
return execute();
return execute(connection);
} catch (JedisConnectionException jce) {
if (tryRandomNode) {
// maybe all connection is down
throw jce;
}
releaseConnection(connection, true);
connection = null;
// retry with random connection
return runWithRetries(key, redirections--, true, asking);
} catch (JedisRedirectionException jre) {
return handleRedirection(jre, key);
if (jre instanceof JedisAskDataException) {
asking = true;
} else if (jre instanceof JedisMovedDataException) {
// TODO : In antirez's redis-rb-cluster implementation,
// it rebuilds cluster's slot and node cache
}
this.connectionHandler.assignSlotToNode(jre.getSlot(),
jre.getTargetNode());
releaseConnection(connection, false);
connection = null;
return runWithRetries(key, redirections - 1, false, asking);
} finally {
releaseConnection(connection, false);
}
}
private void releaseConnection(Jedis connection, boolean broken) {
if (connection != null) {
if (broken) {
connectionHandler.returnBrokenConnection(connection);
} else {
connectionHandler.returnConnection(connection);
}
}
}
private T handleRedirection(JedisRedirectionException jre, String key) {
if (jre instanceof JedisAskDataException) {
asking = true;
}
redirections--;
this.connectionHandler.assignSlotToNode(jre.getSlot(),
jre.getTargetNode());
return run(key);
}
}

View File

@@ -7,6 +7,8 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import redis.clients.jedis.exceptions.JedisConnectionException;
public abstract class JedisClusterConnectionHandler {
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
@@ -15,6 +17,16 @@ public abstract class JedisClusterConnectionHandler {
abstract Jedis getConnection();
protected void returnConnection(Jedis connection) {
nodes.get(getNodeKey(connection.getClient()))
.returnResource(connection);
}
public void returnBrokenConnection(Jedis connection) {
nodes.get(getNodeKey(connection.getClient())).returnBrokenResource(
connection);
}
abstract Jedis getConnectionFromSlot(int slot);
public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig) {
@@ -26,29 +38,57 @@ public abstract class JedisClusterConnectionHandler {
return nodes;
}
private void initializeSlotsCache(Set<HostAndPort> nodes) {
for (HostAndPort hostAndPort : nodes) {
private void initializeSlotsCache(Set<HostAndPort> startNodes) {
for (HostAndPort hostAndPort : startNodes) {
JedisPool jp = new JedisPool(poolConfig, hostAndPort.getHost(),
hostAndPort.getPort());
this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp);
Jedis jedis = jp.getResource();
this.nodes.clear();
this.slots.clear();
Jedis jedis = null;
try {
jedis = jp.getResource();
discoverClusterNodesAndSlots(jedis);
break;
} catch (JedisConnectionException e) {
if (jedis != null) {
jp.returnBrokenResource(jedis);
jedis = null;
}
// try next nodes
} finally {
jp.returnResource(jedis);
if (jedis != null) {
jp.returnResource(jedis);
}
}
}
}
for (HostAndPort node : startNodes) {
setNodeIfNotExist(node);
}
}
private void discoverClusterNodesAndSlots(Jedis jedis) {
String localNodes = jedis.clusterNodes();
for (String nodeInfo : localNodes.split("\n")) {
HostAndPort node = getHostAndPortFromNodeLine(nodeInfo, jedis);
JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort());
this.nodes.put(node.getHost() + node.getPort(), nodePool);
setNodeIfNotExist(node);
JedisPool nodePool = nodes.get(getNodeKey(node));
populateNodeSlots(nodeInfo, nodePool);
}
}
private void setNodeIfNotExist(HostAndPort node) {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey))
return;
JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort());
nodes.put(nodeKey, nodePool);
}
private void populateNodeSlots(String nodeInfo, JedisPool nodePool) {
String[] nodeInfoArray = nodeInfo.split(" ");
@@ -71,7 +111,8 @@ public abstract class JedisClusterConnectionHandler {
}
}
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo, Jedis currentConnection) {
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo,
Jedis currentConnection) {
String stringHostAndPort = nodeInfo.split(" ", 3)[1];
if (":0".equals(stringHostAndPort)) {
return new HostAndPort(currentConnection.getClient().getHost(),
@@ -83,9 +124,16 @@ public abstract class JedisClusterConnectionHandler {
}
public void assignSlotToNode(int slot, HostAndPort targetNode) {
JedisPool targetPool = nodes.get(targetNode.getHost()
+ targetNode.getPort());
slots.put(slot, targetPool);
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
if (targetPool != null) {
slots.put(slot, targetPool);
} else {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
slots.put(slot, targetPool);
}
}
protected JedisPool getRandomConnection() {
@@ -93,4 +141,11 @@ public abstract class JedisClusterConnectionHandler {
return (JedisPool) (nodeArray[new Random().nextInt(nodeArray.length)]);
}
protected String getNodeKey(HostAndPort hnp) {
return hnp.getHost() + ":" + hnp.getPort();
}
protected String getNodeKey(Client client) {
return client.getHost() + ":" + client.getPort();
}
}

View File

@@ -80,11 +80,15 @@ public class JedisPool extends Pool<Jedis> {
}
public void returnBrokenResource(final Jedis resource) {
returnBrokenResourceObject(resource);
if (resource != null) {
returnBrokenResourceObject(resource);
}
}
public void returnResource(final Jedis resource) {
resource.resetState();
returnResourceObject(resource);
if (resource != null) {
resource.resetState();
returnResourceObject(resource);
}
}
}

View File

@@ -1,49 +1,74 @@
package redis.clients.jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
public class JedisSlotBasedConnectionHandler extends
JedisClusterConnectionHandler {
private Jedis currentConnection;
public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes,
final GenericObjectPoolConfig poolConfig) {
super(nodes, poolConfig);
}
public Jedis getConnection() {
return currentConnection != null ? currentConnection
: getRandomConnection().getResource();
}
// In antirez's redis-rb-cluster implementation,
// getRandomConnection always return valid connection (able to ping-pong)
// or exception if all connections are invalid
List<JedisPool> pools = getShuffledNodesPool();
for (JedisPool pool : pools) {
Jedis jedis = null;
try {
jedis = pool.getResource();
if (jedis == null) {
continue;
}
String result = jedis.ping();
if (result.equalsIgnoreCase("pong"))
return jedis;
private void returnCurrentConnection() {
if (currentConnection != null) {
nodes.get(
currentConnection.getClient().getHost()
+ currentConnection.getClient().getPort())
.returnResource(currentConnection);
pool.returnBrokenResource(jedis);
} catch (JedisConnectionException ex) {
if (jedis != null) {
pool.returnBrokenResource(jedis);
}
}
}
throw new JedisConnectionException("no reachable node in cluster");
}
@Override
public void assignSlotToNode(int slot, HostAndPort targetNode) {
super.assignSlotToNode(slot, targetNode);
getConnectionFromSlot(slot);
}
@Override
public Jedis getConnectionFromSlot(int slot) {
returnCurrentConnection();
JedisPool connectionPool = slots.get(slot);
if (connectionPool == null) {
connectionPool = getRandomConnection();
if (connectionPool != null) {
// It can't guaranteed to get valid connection because of node assignment
return connectionPool.getResource();
} else {
return getConnection();
}
currentConnection = connectionPool.getResource();
return currentConnection;
}
private List<JedisPool> getShuffledNodesPool() {
List<JedisPool> pools = new ArrayList<JedisPool>();
pools.addAll(nodes.values());
Collections.shuffle(pools);
return pools;
}
}

View File

@@ -31,6 +31,12 @@ public class Pipeline extends MultiKeyPipelineBase {
return values;
}
public void setResponseDependency(Response<?> dependency) {
for (Response<?> response : responses) {
response.setDependency(dependency);
}
}
public void addResponse(Response<?> response) {
responses.add(response);
}
@@ -106,6 +112,7 @@ public class Pipeline extends MultiKeyPipelineBase {
public Response<List<Object>> exec() {
client.exec();
Response<List<Object>> response = super.getResponse(currentMulti);
currentMulti.setResponseDependency(response);
currentMulti = null;
return response;
}

View File

@@ -5,6 +5,7 @@ import java.util.ArrayList;
import java.util.List;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
@@ -16,6 +17,7 @@ public final class Protocol {
private static final String ASK_RESPONSE = "ASK";
private static final String MOVED_RESPONSE = "MOVED";
private static final String CLUSTERDOWN_RESPONSE = "CLUSTERDOWN";
public static final int DEFAULT_PORT = 6379;
public static final int DEFAULT_SENTINEL_PORT = 26379;
public static final int DEFAULT_TIMEOUT = 2000;
@@ -96,6 +98,8 @@ public final class Protocol {
throw new JedisAskDataException(message, new HostAndPort(
askInfo[1], Integer.valueOf(askInfo[2])),
Integer.valueOf(askInfo[0]));
} else if (message.startsWith(CLUSTERDOWN_RESPONSE)) {
throw new JedisClusterException(message);
}
throw new JedisDataException(message);
}

View File

@@ -8,6 +8,8 @@ public class Response<T> {
private boolean set = false;
private Builder<T> builder;
private Object data;
private Response<?> dependency = null;
private boolean requestDependencyBuild = false;
public Response(Builder<T> b) {
this.builder = b;
@@ -19,23 +21,39 @@ public class Response<T> {
}
public T get() {
// if response has dependency response and dependency is not built,
// build it first and no more!!
if (!requestDependencyBuild && dependency != null && dependency.set
&& !dependency.built) {
requestDependencyBuild = true;
dependency.build();
}
if (!set) {
throw new JedisDataException(
"Please close pipeline or multi block before calling this method.");
}
if (!built) {
if (data != null) {
if (data instanceof JedisDataException) {
throw new JedisDataException((JedisDataException) data);
}
response = builder.build(data);
}
this.data = null;
built = true;
build();
}
return response;
}
public void setDependency(Response<?> dependency) {
this.dependency = dependency;
this.requestDependencyBuild = false;
}
private void build() {
if (data != null) {
if (data instanceof JedisDataException) {
throw new JedisDataException((JedisDataException) data);
}
response = builder.build(data);
}
data = null;
built = true;
}
public String toString() {
return "Response " + builder.toString();
}

View File

@@ -2,7 +2,6 @@ package redis.clients.util;
public class JedisClusterCRC16 {
public final static int polynomial = 0x1021; // Represents x^16+x^12+x^5+1
static int crc;
public static int getSlot(String key) {
@@ -17,7 +16,7 @@ public class JedisClusterCRC16 {
}
private static int getCRC16(String key) {
crc = 0x0000;
int crc = 0x0000;
for (byte b : key.getBytes()) {
for (int i = 0; i < 8; i++) {
boolean bit = ((b >> (7 - i) & 1) == 1);

View File

@@ -45,6 +45,9 @@ public abstract class Pool<T> {
}
public void returnResourceObject(final T resource) {
if (resource == null) {
return;
}
try {
internalPool.returnObject(resource);
} catch (Exception e) {
@@ -54,11 +57,15 @@ public abstract class Pool<T> {
}
public void returnBrokenResource(final T resource) {
returnBrokenResourceObject(resource);
if (resource != null) {
returnBrokenResourceObject(resource);
}
}
public void returnResource(final T resource) {
returnResourceObject(resource);
if (resource != null) {
returnResourceObject(resource);
}
}
public void destroy() {
@@ -81,4 +88,4 @@ public abstract class Pool<T> {
throw new JedisException("Could not destroy the pool", e);
}
}
}
}