From 9f767a084859fad71a6ab972973d2f73f3ae1098 Mon Sep 17 00:00:00 2001 From: Marcos Nils Date: Fri, 27 Dec 2013 23:09:44 -0300 Subject: [PATCH] Add automatic discovery of cluster nodes --- .../redis/clients/jedis/JedisCluster.java | 6 +++ .../jedis/JedisClusterConnectionHandler.java | 43 ++++++++++++++++++- .../jedis/JedisRandomConnectionHandler.java | 15 +------ .../clients/jedis/tests/JedisClusterTest.java | 42 ++++++++++++++---- 4 files changed, 82 insertions(+), 24 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index 460e032..9753815 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -1,5 +1,7 @@ package redis.clients.jedis; +import java.util.HashSet; +import java.util.IllegalFormatException; import java.util.List; import java.util.Map; import java.util.Set; @@ -1192,4 +1194,8 @@ public class JedisCluster implements JedisCommands, BasicCommands { } }.run(); } + + public Map getClusterNodes() { + return connectionHandler.getNodes(); + } } diff --git a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java index d5aaf51..a28a23c 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java @@ -1,8 +1,47 @@ package redis.clients.jedis; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; -public interface JedisClusterConnectionHandler { + +public abstract class JedisClusterConnectionHandler { - Jedis getConnection(); + protected Map nodes = new HashMap(); + + abstract Jedis getConnection(); + + public JedisClusterConnectionHandler(Set nodes) { + initializeSlotsCache(nodes); + } + + public Map getNodes() { + return nodes; + } + + private void initializeSlotsCache(Set nodes) { + for (HostAndPort hostAndPort : nodes) { + JedisPool jp = new JedisPool(hostAndPort.getHost(), hostAndPort.getPort()); + this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp); + this.nodes.putAll(discoverClusterNodes(jp)); + } + } + + private Map discoverClusterNodes(JedisPool jp) { + Map discoveredNodes = new HashMap(); + String localNodes = jp.getResource().clusterNodes(); + for (String nodeInfo : localNodes.split("\n")) { + HostAndPort node = getHostAndPortFromNodeLine(nodeInfo); + JedisPool nodePool = new JedisPool(node.getHost(), node.getPort()); + discoveredNodes.put(node.getHost() + node.getPort(), nodePool); + } + return discoveredNodes; + } + + private HostAndPort getHostAndPortFromNodeLine(String nodeInfo) { + String stringHostAndPort = nodeInfo.split(" ",3)[1]; + String[] arrayHostAndPort = stringHostAndPort.split(":"); + return new HostAndPort(arrayHostAndPort[0], Integer.valueOf(arrayHostAndPort[1])); + } } diff --git a/src/main/java/redis/clients/jedis/JedisRandomConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisRandomConnectionHandler.java index f3da675..fc8226b 100644 --- a/src/main/java/redis/clients/jedis/JedisRandomConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisRandomConnectionHandler.java @@ -1,28 +1,17 @@ package redis.clients.jedis; -import java.util.HashMap; -import java.util.Map; import java.util.Random; import java.util.Set; import redis.clients.util.Pool; -public class JedisRandomConnectionHandler implements JedisClusterConnectionHandler { +public class JedisRandomConnectionHandler extends JedisClusterConnectionHandler { - private Map nodes = new HashMap(); public JedisRandomConnectionHandler(Set nodes) { - initializeSlotsCache(nodes); + super(nodes); } - private void initializeSlotsCache(Set nodes) { - for (HostAndPort hostAndPort : nodes) { - JedisPool jp = new JedisPool(hostAndPort.getHost(), - hostAndPort.getPort()); - this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp); - } - - } @SuppressWarnings("unchecked") public Jedis getConnection() { diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java index 29f275d..5162635 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java @@ -1,5 +1,8 @@ package redis.clients.jedis.tests; +import java.util.HashSet; +import java.util.Set; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -43,12 +46,24 @@ public class JedisClusterTest extends Assert { node1.clusterMeet("127.0.0.1", nodeInfo2.getPort()); node1.clusterMeet("127.0.0.1", nodeInfo3.getPort()); - // add all slots to node1 - Pipeline pipelined = node1.pipelined(); + // split available slots across the three nodes + int slotsPerNode = JedisCluster.HASHSLOTS / 3; + Pipeline pipeline1 = node1.pipelined(); + Pipeline pipeline2 = node2.pipelined(); + Pipeline pipeline3 = node3.pipelined(); for (int i = 0; i < JedisCluster.HASHSLOTS; i++) { - pipelined.clusterAddSlots(i); + if (i < slotsPerNode) { + pipeline1.clusterAddSlots(i); + } else if (i > slotsPerNode * 2) { + pipeline3.clusterAddSlots(i); + } else { + pipeline2.clusterAddSlots(i); + } } - pipelined.sync(); + pipeline1.sync(); + pipeline2.sync(); + pipeline3.sync(); + boolean clusterOk = false; while (!clusterOk) { @@ -72,19 +87,28 @@ public class JedisClusterTest extends Assert { } @Test(expected=JedisMovedDataException.class) - public void throwMovedExceptionTest() { + public void testThrowMovedException() { node1.set("foo", "bar"); node2.get("foo"); } @Test(expected=JedisAskDataException.class) - public void throwAskExceptionTest() { + public void testThrowAskException() { int keySlot = JedisClusterCRC16.getSlot("test"); - String node2Id = getNodeId(node2.clusterNodes()); - node1.clusterSetSlotMigrating(keySlot, node2Id); - node1.get("test"); + String node3Id = getNodeId(node3.clusterNodes()); + node2.clusterSetSlotMigrating(keySlot, node3Id); + node2.get("test"); } + @Test + public void testDiscoverNodesAutomatically() { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + JedisCluster jc = new JedisCluster(jedisClusterNode); + assertEquals(jc.getClusterNodes().size(), 3); + } + + private String getNodeId(String infoOutput) { for (String infoLine : infoOutput.split("\n")) { if (infoLine.contains("myself")) {