Add JedisClusterCommand and updated code to use it respectively
This commit is contained in:
@@ -1,101 +1,159 @@
|
|||||||
package redis.clients.jedis;
|
package redis.clients.jedis;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import redis.clients.jedis.BinaryClient.LIST_POSITION;
|
import redis.clients.jedis.BinaryClient.LIST_POSITION;
|
||||||
import redis.clients.util.Pool;
|
|
||||||
|
|
||||||
public class JedisCluster implements JedisCommands, BasicCommands {
|
public class JedisCluster implements JedisCommands, BasicCommands {
|
||||||
|
|
||||||
public static final short HASHSLOTS = 16384;
|
public static final short HASHSLOTS = 16384;
|
||||||
private static final int DEFAULT_TIMEOUT = 1;
|
private static final int DEFAULT_TIMEOUT = 1;
|
||||||
|
|
||||||
private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
|
||||||
|
private JedisClusterConnectionHandler connectionHandler;
|
||||||
|
|
||||||
public JedisCluster(Set<HostAndPort> nodes, int timeout) {
|
public JedisCluster(Set<HostAndPort> nodes, int timeout) {
|
||||||
initializeSlotsCache(nodes);
|
connectionHandler = new JedisClusterConnectionHandler(nodes);
|
||||||
}
|
|
||||||
|
|
||||||
private void initializeSlotsCache(Set<HostAndPort> nodes) {
|
|
||||||
for (HostAndPort hostAndPort : nodes) {
|
|
||||||
JedisPool jp = new JedisPool(hostAndPort.getHost(), hostAndPort.getPort());
|
|
||||||
this.nodes.put(hostAndPort.getHost()+hostAndPort.getPort(), jp);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public JedisCluster(Set<HostAndPort> nodes) {
|
public JedisCluster(Set<HostAndPort> nodes) {
|
||||||
this(nodes, DEFAULT_TIMEOUT);
|
this(nodes, DEFAULT_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String set(String key, String value) {
|
public String set(final String key, final String value) {
|
||||||
return getRandomConnection().set(key, value);
|
return new JedisClusterCommand<String>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public String execute() {
|
||||||
|
return connectionHandler.getRandomConnection().set(key, value);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String get(String key) {
|
public String get(final String key) {
|
||||||
return getRandomConnection().get(key);
|
return new JedisClusterCommand<String>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public String execute() {
|
||||||
|
return connectionHandler.getRandomConnection().get(key);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Boolean exists(String key) {
|
public Boolean exists(final String key) {
|
||||||
return getRandomConnection().exists(key);
|
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public Boolean execute() {
|
||||||
|
return connectionHandler.getRandomConnection().exists(key);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long persist(String key) {
|
public Long persist(final String key) {
|
||||||
return getRandomConnection().persist(key);
|
return new JedisClusterCommand<Long>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public Long execute() {
|
||||||
|
return connectionHandler.getRandomConnection().persist(key);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String type(String key) {
|
public String type(final String key) {
|
||||||
return getRandomConnection().type(key);
|
return new JedisClusterCommand<String>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public String execute() {
|
||||||
|
return connectionHandler.getRandomConnection().type(key);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long expire(String key, int seconds) {
|
public Long expire(final String key, final int seconds) {
|
||||||
return getRandomConnection().expire(key, seconds);
|
return new JedisClusterCommand<Long>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public Long execute() {
|
||||||
|
return connectionHandler.getRandomConnection().expire(key, seconds);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long expireAt(String key, long unixTime) {
|
public Long expireAt(final String key, final long unixTime) {
|
||||||
return getRandomConnection().expireAt(key, unixTime);
|
return new JedisClusterCommand<Long>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public Long execute() {
|
||||||
|
return connectionHandler.getRandomConnection().expireAt(key, unixTime);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long ttl(String key) {
|
public Long ttl(final String key) {
|
||||||
return getRandomConnection().ttl(key);
|
return new JedisClusterCommand<Long>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public Long execute() {
|
||||||
|
return connectionHandler.getRandomConnection().ttl(key);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Boolean setbit(String key, long offset, boolean value) {
|
public Boolean setbit(final String key, final long offset, final boolean value) {
|
||||||
return getRandomConnection().setbit(key, offset, value);
|
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public Boolean execute() {
|
||||||
|
return connectionHandler.getRandomConnection().setbit(key, offset, value);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Boolean setbit(String key, long offset, String value) {
|
public Boolean setbit(final String key, final long offset, final String value) {
|
||||||
return getRandomConnection().setbit(key, offset, value);
|
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public Boolean execute() {
|
||||||
|
return connectionHandler.getRandomConnection().setbit(key, offset, value);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Boolean getbit(String key, long offset) {
|
public Boolean getbit(final String key, final long offset) {
|
||||||
return getRandomConnection().getbit(key, offset);
|
return new JedisClusterCommand<Boolean>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public Boolean execute() {
|
||||||
|
return connectionHandler.getRandomConnection().getbit(key, offset);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long setrange(String key, long offset, String value) {
|
public Long setrange(final String key, final long offset, final String value) {
|
||||||
return getRandomConnection().setrange(key, offset, value);
|
return new JedisClusterCommand<Long>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public Long execute() {
|
||||||
|
return connectionHandler.getRandomConnection().setrange(key, offset, value);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getrange(String key, long startOffset, long endOffset) {
|
public String getrange(final String key, final long startOffset, final long endOffset) {
|
||||||
return getRandomConnection().getrange(key, startOffset, endOffset);
|
return new JedisClusterCommand<String>(connectionHandler) {
|
||||||
|
@Override
|
||||||
|
public String execute() {
|
||||||
|
return connectionHandler.getRandomConnection().getrange(key, startOffset, endOffset);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -728,11 +786,7 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private Jedis getRandomConnection() {
|
|
||||||
Object[] nodeArray = nodes.values().toArray();
|
|
||||||
return ((Pool<Jedis>) nodeArray[new Random().nextInt(nodeArray.length)]).getResource();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
23
src/main/java/redis/clients/jedis/JedisClusterCommand.java
Normal file
23
src/main/java/redis/clients/jedis/JedisClusterCommand.java
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
package redis.clients.jedis;
|
||||||
|
|
||||||
|
import redis.clients.jedis.exceptions.JedisMovedDataException;
|
||||||
|
|
||||||
|
public abstract class JedisClusterCommand<T> {
|
||||||
|
|
||||||
|
private JedisClusterConnectionHandler connectionHandler;
|
||||||
|
|
||||||
|
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler) {
|
||||||
|
this.connectionHandler = connectionHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract T execute();
|
||||||
|
|
||||||
|
public T run() {
|
||||||
|
try {
|
||||||
|
return execute();
|
||||||
|
} catch (JedisMovedDataException e) {
|
||||||
|
//TODO: Retry here
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
package redis.clients.jedis;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import redis.clients.util.Pool;
|
||||||
|
|
||||||
|
public class JedisClusterConnectionHandler {
|
||||||
|
|
||||||
|
private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
||||||
|
|
||||||
|
public JedisClusterConnectionHandler(Set<HostAndPort> nodes) {
|
||||||
|
initializeSlotsCache(nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeSlotsCache(Set<HostAndPort> nodes) {
|
||||||
|
for (HostAndPort hostAndPort : nodes) {
|
||||||
|
JedisPool jp = new JedisPool(hostAndPort.getHost(),
|
||||||
|
hostAndPort.getPort());
|
||||||
|
this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public Jedis getRandomConnection() {
|
||||||
|
Object[] nodeArray = nodes.values().toArray();
|
||||||
|
return ((Pool<Jedis>) nodeArray[new Random().nextInt(nodeArray.length)]).getResource();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -52,7 +52,9 @@ public class JedisClusterTest extends Assert {
|
|||||||
|
|
||||||
boolean clusterOk = false;
|
boolean clusterOk = false;
|
||||||
while (!clusterOk) {
|
while (!clusterOk) {
|
||||||
if (node1.clusterInfo().split("\n")[0].contains("ok")) {
|
if (node1.clusterInfo().split("\n")[0].contains("ok") &&
|
||||||
|
node2.clusterInfo().split("\n")[0].contains("ok") &&
|
||||||
|
node3.clusterInfo().split("\n")[0].contains("ok") ) {
|
||||||
clusterOk = true;
|
clusterOk = true;
|
||||||
}
|
}
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|||||||
Reference in New Issue
Block a user