Add automatic discovery of cluster nodes
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.IllegalFormatException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@@ -1192,4 +1194,8 @@ public class JedisCluster implements JedisCommands, BasicCommands {
|
||||
}
|
||||
}.run();
|
||||
}
|
||||
|
||||
public Map<String, JedisPool> getClusterNodes() {
|
||||
return connectionHandler.getNodes();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,47 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public interface JedisClusterConnectionHandler {
|
||||
|
||||
public abstract class JedisClusterConnectionHandler {
|
||||
|
||||
Jedis getConnection();
|
||||
protected Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
||||
|
||||
abstract Jedis getConnection();
|
||||
|
||||
public JedisClusterConnectionHandler(Set<HostAndPort> nodes) {
|
||||
initializeSlotsCache(nodes);
|
||||
}
|
||||
|
||||
public Map<String, JedisPool> getNodes() {
|
||||
return nodes;
|
||||
}
|
||||
|
||||
private void initializeSlotsCache(Set<HostAndPort> nodes) {
|
||||
for (HostAndPort hostAndPort : nodes) {
|
||||
JedisPool jp = new JedisPool(hostAndPort.getHost(), hostAndPort.getPort());
|
||||
this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp);
|
||||
this.nodes.putAll(discoverClusterNodes(jp));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Map<? extends String, ? extends JedisPool> discoverClusterNodes(JedisPool jp) {
|
||||
Map<String, JedisPool> discoveredNodes = new HashMap<String, JedisPool>();
|
||||
String localNodes = jp.getResource().clusterNodes();
|
||||
for (String nodeInfo : localNodes.split("\n")) {
|
||||
HostAndPort node = getHostAndPortFromNodeLine(nodeInfo);
|
||||
JedisPool nodePool = new JedisPool(node.getHost(), node.getPort());
|
||||
discoveredNodes.put(node.getHost() + node.getPort(), nodePool);
|
||||
}
|
||||
return discoveredNodes;
|
||||
}
|
||||
|
||||
private HostAndPort getHostAndPortFromNodeLine(String nodeInfo) {
|
||||
String stringHostAndPort = nodeInfo.split(" ",3)[1];
|
||||
String[] arrayHostAndPort = stringHostAndPort.split(":");
|
||||
return new HostAndPort(arrayHostAndPort[0], Integer.valueOf(arrayHostAndPort[1]));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,28 +1,17 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import redis.clients.util.Pool;
|
||||
|
||||
public class JedisRandomConnectionHandler implements JedisClusterConnectionHandler {
|
||||
public class JedisRandomConnectionHandler extends JedisClusterConnectionHandler {
|
||||
|
||||
private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
|
||||
|
||||
public JedisRandomConnectionHandler(Set<HostAndPort> nodes) {
|
||||
initializeSlotsCache(nodes);
|
||||
super(nodes);
|
||||
}
|
||||
|
||||
private void initializeSlotsCache(Set<HostAndPort> nodes) {
|
||||
for (HostAndPort hostAndPort : nodes) {
|
||||
JedisPool jp = new JedisPool(hostAndPort.getHost(),
|
||||
hostAndPort.getPort());
|
||||
this.nodes.put(hostAndPort.getHost() + hostAndPort.getPort(), jp);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public Jedis getConnection() {
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package redis.clients.jedis.tests;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@@ -43,12 +46,24 @@ public class JedisClusterTest extends Assert {
|
||||
node1.clusterMeet("127.0.0.1", nodeInfo2.getPort());
|
||||
node1.clusterMeet("127.0.0.1", nodeInfo3.getPort());
|
||||
|
||||
// add all slots to node1
|
||||
Pipeline pipelined = node1.pipelined();
|
||||
// split available slots across the three nodes
|
||||
int slotsPerNode = JedisCluster.HASHSLOTS / 3;
|
||||
Pipeline pipeline1 = node1.pipelined();
|
||||
Pipeline pipeline2 = node2.pipelined();
|
||||
Pipeline pipeline3 = node3.pipelined();
|
||||
for (int i = 0; i < JedisCluster.HASHSLOTS; i++) {
|
||||
pipelined.clusterAddSlots(i);
|
||||
if (i < slotsPerNode) {
|
||||
pipeline1.clusterAddSlots(i);
|
||||
} else if (i > slotsPerNode * 2) {
|
||||
pipeline3.clusterAddSlots(i);
|
||||
} else {
|
||||
pipeline2.clusterAddSlots(i);
|
||||
}
|
||||
}
|
||||
pipelined.sync();
|
||||
pipeline1.sync();
|
||||
pipeline2.sync();
|
||||
pipeline3.sync();
|
||||
|
||||
|
||||
boolean clusterOk = false;
|
||||
while (!clusterOk) {
|
||||
@@ -72,19 +87,28 @@ public class JedisClusterTest extends Assert {
|
||||
}
|
||||
|
||||
@Test(expected=JedisMovedDataException.class)
|
||||
public void throwMovedExceptionTest() {
|
||||
public void testThrowMovedException() {
|
||||
node1.set("foo", "bar");
|
||||
node2.get("foo");
|
||||
}
|
||||
|
||||
@Test(expected=JedisAskDataException.class)
|
||||
public void throwAskExceptionTest() {
|
||||
public void testThrowAskException() {
|
||||
int keySlot = JedisClusterCRC16.getSlot("test");
|
||||
String node2Id = getNodeId(node2.clusterNodes());
|
||||
node1.clusterSetSlotMigrating(keySlot, node2Id);
|
||||
node1.get("test");
|
||||
String node3Id = getNodeId(node3.clusterNodes());
|
||||
node2.clusterSetSlotMigrating(keySlot, node3Id);
|
||||
node2.get("test");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDiscoverNodesAutomatically() {
|
||||
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
|
||||
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
|
||||
JedisCluster jc = new JedisCluster(jedisClusterNode);
|
||||
assertEquals(jc.getClusterNodes().size(), 3);
|
||||
}
|
||||
|
||||
|
||||
private String getNodeId(String infoOutput) {
|
||||
for (String infoLine : infoOutput.split("\n")) {
|
||||
if (infoLine.contains("myself")) {
|
||||
|
||||
Reference in New Issue
Block a user