Make JedisCluster multihread by improving connection handling
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -23,9 +23,10 @@ public abstract class JedisClusterCommand<T> {
|
||||
this.redirections = maxRedirections;
|
||||
}
|
||||
|
||||
public abstract T execute();
|
||||
public abstract T execute(Jedis connection);
|
||||
|
||||
public T run(String key) {
|
||||
Jedis connection = null;
|
||||
try {
|
||||
|
||||
if (key == null) {
|
||||
@@ -35,16 +36,20 @@ public abstract class JedisClusterCommand<T> {
|
||||
throw new JedisClusterMaxRedirectionsException(
|
||||
"Too many Cluster redirections?");
|
||||
}
|
||||
connectionHandler.getConnectionFromSlot(JedisClusterCRC16
|
||||
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16
|
||||
.getSlot(key));
|
||||
if (asking) {
|
||||
// TODO: Pipeline asking with the original command to make it
|
||||
// faster....
|
||||
connectionHandler.getConnection().asking();
|
||||
connection.asking();
|
||||
}
|
||||
return execute();
|
||||
return execute(connection);
|
||||
} catch (JedisRedirectionException jre) {
|
||||
return handleRedirection(jre, key);
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
connectionHandler.returnConnection(connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,13 @@ public abstract class JedisClusterConnectionHandler {
|
||||
protected Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
|
||||
|
||||
abstract Jedis getConnection();
|
||||
|
||||
protected void returnConnection(Jedis connection) {
|
||||
nodes.get(
|
||||
connection.getClient().getHost()
|
||||
+ connection.getClient().getPort()).returnResource(
|
||||
connection);
|
||||
}
|
||||
|
||||
abstract Jedis getConnectionFromSlot(int slot);
|
||||
|
||||
|
||||
@@ -5,25 +5,12 @@ import java.util.Set;
|
||||
public class JedisSlotBasedConnectionHandler extends
|
||||
JedisClusterConnectionHandler {
|
||||
|
||||
private Jedis currentConnection;
|
||||
|
||||
public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes) {
|
||||
super(nodes);
|
||||
}
|
||||
|
||||
public Jedis getConnection() {
|
||||
return currentConnection != null ? currentConnection
|
||||
: getRandomConnection().getResource();
|
||||
}
|
||||
|
||||
private void returnCurrentConnection() {
|
||||
if (currentConnection != null) {
|
||||
nodes.get(
|
||||
currentConnection.getClient().getHost()
|
||||
+ currentConnection.getClient().getPort())
|
||||
.returnResource(currentConnection);
|
||||
}
|
||||
|
||||
return getRandomConnection().getResource();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -34,13 +21,11 @@ public class JedisSlotBasedConnectionHandler extends
|
||||
|
||||
@Override
|
||||
public Jedis getConnectionFromSlot(int slot) {
|
||||
returnCurrentConnection();
|
||||
JedisPool connectionPool = slots.get(slot);
|
||||
if (connectionPool == null) {
|
||||
connectionPool = getRandomConnection();
|
||||
}
|
||||
currentConnection = connectionPool.getResource();
|
||||
return connectionPool.getResource();
|
||||
return connectionPool.getResource();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user