From c00807004592abf6868b4cc61f1ccaa690c0fe13 Mon Sep 17 00:00:00 2001 From: Marcos Nils Date: Tue, 10 Dec 2013 10:25:41 -0300 Subject: [PATCH] Add JedisClusterCommand and updated code to use it respectively --- .../redis/clients/jedis/JedisCluster.java | 140 ++++++++++++------ .../clients/jedis/JedisClusterCommand.java | 23 +++ .../jedis/JedisClusterConnectionHandler.java | 33 +++++ .../clients/jedis/tests/JedisClusterTest.java | 4 +- 4 files changed, 156 insertions(+), 44 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/JedisClusterCommand.java create mode 100644 src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index 4586b8f..a3b6f22 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -1,32 +1,25 @@ package redis.clients.jedis; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; import redis.clients.jedis.BinaryClient.LIST_POSITION; -import redis.clients.util.Pool; public class JedisCluster implements JedisCommands, BasicCommands { public static final short HASHSLOTS = 16384; private static final int DEFAULT_TIMEOUT = 1; - private Map nodes = new HashMap(); + + private JedisClusterConnectionHandler connectionHandler; public JedisCluster(Set nodes, int timeout) { - initializeSlotsCache(nodes); - } - - private void initializeSlotsCache(Set nodes) { - for (HostAndPort hostAndPort : nodes) { - JedisPool jp = new JedisPool(hostAndPort.getHost(), hostAndPort.getPort()); - this.nodes.put(hostAndPort.getHost()+hostAndPort.getPort(), jp); - } + connectionHandler = new JedisClusterConnectionHandler(nodes); } + + public JedisCluster(Set nodes) { this(nodes, DEFAULT_TIMEOUT); @@ -34,68 +27,133 @@ public class JedisCluster implements JedisCommands, BasicCommands { @Override - public String set(String key, String value) { - return getRandomConnection().set(key, value); + public String set(final String key, final String value) { + return new JedisClusterCommand(connectionHandler) { + @Override + public String execute() { + return connectionHandler.getRandomConnection().set(key, value); + } + }.run(); } @Override - public String get(String key) { - return getRandomConnection().get(key); + public String get(final String key) { + return new JedisClusterCommand(connectionHandler) { + @Override + public String execute() { + return connectionHandler.getRandomConnection().get(key); + } + }.run(); } @Override - public Boolean exists(String key) { - return getRandomConnection().exists(key); + public Boolean exists(final String key) { + return new JedisClusterCommand(connectionHandler) { + @Override + public Boolean execute() { + return connectionHandler.getRandomConnection().exists(key); + } + }.run(); } @Override - public Long persist(String key) { - return getRandomConnection().persist(key); + public Long persist(final String key) { + return new JedisClusterCommand(connectionHandler) { + @Override + public Long execute() { + return connectionHandler.getRandomConnection().persist(key); + } + }.run(); } @Override - public String type(String key) { - return getRandomConnection().type(key); + public String type(final String key) { + return new JedisClusterCommand(connectionHandler) { + @Override + public String execute() { + return connectionHandler.getRandomConnection().type(key); + } + }.run(); } @Override - public Long expire(String key, int seconds) { - return getRandomConnection().expire(key, seconds); + public Long expire(final String key, final int seconds) { + return new JedisClusterCommand(connectionHandler) { + @Override + public Long execute() { + return connectionHandler.getRandomConnection().expire(key, seconds); + } + }.run(); } @Override - public Long expireAt(String key, long unixTime) { - return getRandomConnection().expireAt(key, unixTime); + public Long expireAt(final String key, final long unixTime) { + return new JedisClusterCommand(connectionHandler) { + @Override + public Long execute() { + return connectionHandler.getRandomConnection().expireAt(key, unixTime); + } + }.run(); } @Override - public Long ttl(String key) { - return getRandomConnection().ttl(key); + public Long ttl(final String key) { + return new JedisClusterCommand(connectionHandler) { + @Override + public Long execute() { + return connectionHandler.getRandomConnection().ttl(key); + } + }.run(); } @Override - public Boolean setbit(String key, long offset, boolean value) { - return getRandomConnection().setbit(key, offset, value); + public Boolean setbit(final String key, final long offset, final boolean value) { + return new JedisClusterCommand(connectionHandler) { + @Override + public Boolean execute() { + return connectionHandler.getRandomConnection().setbit(key, offset, value); + } + }.run(); } @Override - public Boolean setbit(String key, long offset, String value) { - return getRandomConnection().setbit(key, offset, value); + public Boolean setbit(final String key, final long offset, final String value) { + return new JedisClusterCommand(connectionHandler) { + @Override + public Boolean execute() { + return connectionHandler.getRandomConnection().setbit(key, offset, value); + } + }.run(); } @Override - public Boolean getbit(String key, long offset) { - return getRandomConnection().getbit(key, offset); + public Boolean getbit(final String key, final long offset) { + return new JedisClusterCommand(connectionHandler) { + @Override + public Boolean execute() { + return connectionHandler.getRandomConnection().getbit(key, offset); + } + }.run(); } @Override - public Long setrange(String key, long offset, String value) { - return getRandomConnection().setrange(key, offset, value); + public Long setrange(final String key, final long offset, final String value) { + return new JedisClusterCommand(connectionHandler) { + @Override + public Long execute() { + return connectionHandler.getRandomConnection().setrange(key, offset, value); + } + }.run(); } @Override - public String getrange(String key, long startOffset, long endOffset) { - return getRandomConnection().getrange(key, startOffset, endOffset); + public String getrange(final String key, final long startOffset, final long endOffset) { + return new JedisClusterCommand(connectionHandler) { + @Override + public String execute() { + return connectionHandler.getRandomConnection().getrange(key, startOffset, endOffset); + } + }.run(); } @Override @@ -728,11 +786,7 @@ public class JedisCluster implements JedisCommands, BasicCommands { } - @SuppressWarnings("unchecked") - private Jedis getRandomConnection() { - Object[] nodeArray = nodes.values().toArray(); - return ((Pool) nodeArray[new Random().nextInt(nodeArray.length)]).getResource(); - } + diff --git a/src/main/java/redis/clients/jedis/JedisClusterCommand.java b/src/main/java/redis/clients/jedis/JedisClusterCommand.java new file mode 100644 index 0000000..55c9f29 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisClusterCommand.java @@ -0,0 +1,23 @@ +package redis.clients.jedis; + +import redis.clients.jedis.exceptions.JedisMovedDataException; + +public abstract class JedisClusterCommand { + + private JedisClusterConnectionHandler connectionHandler; + + public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler) { + this.connectionHandler = connectionHandler; + } + + public abstract T execute(); + + public T run() { + try { + return execute(); + } catch (JedisMovedDataException e) { + //TODO: Retry here + } + return null; + } +} diff --git a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java new file mode 100644 index 0000000..d942723 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java @@ -0,0 +1,33 @@ +package redis.clients.jedis; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import redis.clients.util.Pool; + +public class JedisClusterConnectionHandler { + + private Map nodes = new HashMap(); + + public JedisClusterConnectionHandler(Set nodes) { + initializeSlotsCache(nodes); + } + + private void initializeSlotsCache(Set nodes) { + for (HostAndPort hostAndPort : nodes) { + JedisPool jp = new JedisPool(hostAndPort.getHost(), + hostAndPort.getPort()); + this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp); + } + + } + + @SuppressWarnings("unchecked") + public Jedis getRandomConnection() { + Object[] nodeArray = nodes.values().toArray(); + return ((Pool) nodeArray[new Random().nextInt(nodeArray.length)]).getResource(); + } + +} diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java index b465be8..204b4f8 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java @@ -52,7 +52,9 @@ public class JedisClusterTest extends Assert { boolean clusterOk = false; while (!clusterOk) { - if (node1.clusterInfo().split("\n")[0].contains("ok")) { + if (node1.clusterInfo().split("\n")[0].contains("ok") && + node2.clusterInfo().split("\n")[0].contains("ok") && + node3.clusterInfo().split("\n")[0].contains("ok") ) { clusterOk = true; } Thread.sleep(100);