Merge pull request #554 from xetorthio/jediscluster_multithread
Make JedisCluster multihread by improving connection handling
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -23,9 +23,10 @@ public abstract class JedisClusterCommand<T> {
|
|||||||
this.redirections = maxRedirections;
|
this.redirections = maxRedirections;
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract T execute();
|
public abstract T execute(Jedis connection);
|
||||||
|
|
||||||
public T run(String key) {
|
public T run(String key) {
|
||||||
|
Jedis connection = null;
|
||||||
try {
|
try {
|
||||||
|
|
||||||
if (key == null) {
|
if (key == null) {
|
||||||
@@ -35,16 +36,20 @@ public abstract class JedisClusterCommand<T> {
|
|||||||
throw new JedisClusterMaxRedirectionsException(
|
throw new JedisClusterMaxRedirectionsException(
|
||||||
"Too many Cluster redirections?");
|
"Too many Cluster redirections?");
|
||||||
}
|
}
|
||||||
connectionHandler.getConnectionFromSlot(JedisClusterCRC16
|
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16
|
||||||
.getSlot(key));
|
.getSlot(key));
|
||||||
if (asking) {
|
if (asking) {
|
||||||
// TODO: Pipeline asking with the original command to make it
|
// TODO: Pipeline asking with the original command to make it
|
||||||
// faster....
|
// faster....
|
||||||
connectionHandler.getConnection().asking();
|
connection.asking();
|
||||||
}
|
}
|
||||||
return execute();
|
return execute(connection);
|
||||||
} catch (JedisRedirectionException jre) {
|
} catch (JedisRedirectionException jre) {
|
||||||
return handleRedirection(jre, key);
|
return handleRedirection(jre, key);
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connectionHandler.returnConnection(connection);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -12,6 +12,13 @@ public abstract class JedisClusterConnectionHandler {
|
|||||||
|
|
||||||
abstract Jedis getConnection();
|
abstract Jedis getConnection();
|
||||||
|
|
||||||
|
protected void returnConnection(Jedis connection) {
|
||||||
|
nodes.get(
|
||||||
|
connection.getClient().getHost()
|
||||||
|
+ connection.getClient().getPort()).returnResource(
|
||||||
|
connection);
|
||||||
|
}
|
||||||
|
|
||||||
abstract Jedis getConnectionFromSlot(int slot);
|
abstract Jedis getConnectionFromSlot(int slot);
|
||||||
|
|
||||||
public JedisClusterConnectionHandler(Set<HostAndPort> nodes) {
|
public JedisClusterConnectionHandler(Set<HostAndPort> nodes) {
|
||||||
|
|||||||
@@ -5,42 +5,26 @@ import java.util.Set;
|
|||||||
public class JedisSlotBasedConnectionHandler extends
|
public class JedisSlotBasedConnectionHandler extends
|
||||||
JedisClusterConnectionHandler {
|
JedisClusterConnectionHandler {
|
||||||
|
|
||||||
private Jedis currentConnection;
|
|
||||||
|
|
||||||
public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes) {
|
public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes) {
|
||||||
super(nodes);
|
super(nodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Jedis getConnection() {
|
public Jedis getConnection() {
|
||||||
return currentConnection != null ? currentConnection
|
return getRandomConnection().getResource();
|
||||||
: getRandomConnection().getResource();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void returnCurrentConnection() {
|
|
||||||
if (currentConnection != null) {
|
|
||||||
nodes.get(
|
|
||||||
currentConnection.getClient().getHost()
|
|
||||||
+ currentConnection.getClient().getPort())
|
|
||||||
.returnResource(currentConnection);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
public void assignSlotToNode(int slot, HostAndPort targetNode) {
|
||||||
super.assignSlotToNode(slot, targetNode);
|
super.assignSlotToNode(slot, targetNode);
|
||||||
getConnectionFromSlot(slot);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Jedis getConnectionFromSlot(int slot) {
|
public Jedis getConnectionFromSlot(int slot) {
|
||||||
returnCurrentConnection();
|
|
||||||
JedisPool connectionPool = slots.get(slot);
|
JedisPool connectionPool = slots.get(slot);
|
||||||
if (connectionPool == null) {
|
if (connectionPool == null) {
|
||||||
connectionPool = getRandomConnection();
|
connectionPool = getRandomConnection();
|
||||||
}
|
}
|
||||||
currentConnection = connectionPool.getResource();
|
return connectionPool.getResource();
|
||||||
return currentConnection;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package redis.clients.util;
|
|||||||
|
|
||||||
public class JedisClusterCRC16 {
|
public class JedisClusterCRC16 {
|
||||||
public final static int polynomial = 0x1021; // Represents x^16+x^12+x^5+1
|
public final static int polynomial = 0x1021; // Represents x^16+x^12+x^5+1
|
||||||
static int crc;
|
|
||||||
|
|
||||||
|
|
||||||
public static int getSlot(String key) {
|
public static int getSlot(String key) {
|
||||||
@@ -17,7 +16,7 @@ public class JedisClusterCRC16 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static int getCRC16(String key) {
|
private static int getCRC16(String key) {
|
||||||
crc = 0x0000;
|
int crc = 0x0000;
|
||||||
for (byte b : key.getBytes()) {
|
for (byte b : key.getBytes()) {
|
||||||
for (int i = 0; i < 8; i++) {
|
for (int i = 0; i < 8; i++) {
|
||||||
boolean bit = ((b >> (7 - i) & 1) == 1);
|
boolean bit = ((b >> (7 - i) & 1) == 1);
|
||||||
|
|||||||
Reference in New Issue
Block a user