diff --git a/Makefile b/Makefile
index 1ac2bb9..467541e 100644
--- a/Makefile
+++ b/Makefile
@@ -153,6 +153,42 @@ cluster-enabled yes
cluster-config-file /tmp/redis_cluster_node3.conf
endef
+define REDIS_CLUSTER_NODE4_CONF
+daemonize yes
+port 7382
+cluster-node-timeout 50
+pidfile /tmp/redis_cluster_node4.pid
+logfile /tmp/redis_cluster_node4.log
+save ""
+appendonly no
+cluster-enabled yes
+cluster-config-file /tmp/redis_cluster_node4.conf
+endef
+
+define REDIS_CLUSTER_NODE5_CONF
+daemonize yes
+port 7383
+cluster-node-timeout 5000
+pidfile /tmp/redis_cluster_node5.pid
+logfile /tmp/redis_cluster_node5.log
+save ""
+appendonly no
+cluster-enabled yes
+cluster-config-file /tmp/redis_cluster_node5.conf
+endef
+
+define REDIS_CLUSTER_NODE6_CONF
+daemonize yes
+port 7384
+cluster-node-timeout 5000
+pidfile /tmp/redis_cluster_node6.pid
+logfile /tmp/redis_cluster_node6.log
+save ""
+appendonly no
+cluster-enabled yes
+cluster-config-file /tmp/redis_cluster_node6.conf
+endef
+
export REDIS1_CONF
export REDIS2_CONF
export REDIS3_CONF
@@ -166,6 +202,9 @@ export REDIS_SENTINEL3
export REDIS_CLUSTER_NODE1_CONF
export REDIS_CLUSTER_NODE2_CONF
export REDIS_CLUSTER_NODE3_CONF
+export REDIS_CLUSTER_NODE4_CONF
+export REDIS_CLUSTER_NODE5_CONF
+export REDIS_CLUSTER_NODE6_CONF
start: cleanup
echo "$$REDIS1_CONF" | redis-server -
@@ -183,6 +222,9 @@ start: cleanup
echo "$$REDIS_CLUSTER_NODE1_CONF" | redis-server -
echo "$$REDIS_CLUSTER_NODE2_CONF" | redis-server -
echo "$$REDIS_CLUSTER_NODE3_CONF" | redis-server -
+ echo "$$REDIS_CLUSTER_NODE4_CONF" | redis-server -
+ echo "$$REDIS_CLUSTER_NODE5_CONF" | redis-server -
+ echo "$$REDIS_CLUSTER_NODE6_CONF" | redis-server -
cleanup:
- rm -vf /tmp/redis_cluster_node*.conf 2>/dev/null
@@ -202,12 +244,18 @@ stop:
kill `cat /tmp/redis_cluster_node1.pid` || true
kill `cat /tmp/redis_cluster_node2.pid` || true
kill `cat /tmp/redis_cluster_node3.pid` || true
+ kill `cat /tmp/redis_cluster_node4.pid` || true
+ kill `cat /tmp/redis_cluster_node5.pid` || true
+ kill `cat /tmp/redis_cluster_node6.pid` || true
rm -f /tmp/sentinel1.conf
rm -f /tmp/sentinel2.conf
rm -f /tmp/sentinel3.conf
rm -f /tmp/redis_cluster_node1.conf
rm -f /tmp/redis_cluster_node2.conf
rm -f /tmp/redis_cluster_node3.conf
+ rm -f /tmp/redis_cluster_node4.conf
+ rm -f /tmp/redis_cluster_node5.conf
+ rm -f /tmp/redis_cluster_node6.conf
test:
make start
diff --git a/pom.xml b/pom.xml
index 1be71dd..ddc40c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
localhost:6379,localhost:6380,localhost:6381,localhost:6382,localhost:6383,localhost:6384,localhost:6385
localhost:26379,localhost:26380,localhost:26381
- localhost:7379,localhost:7380,localhost:7381
+ localhost:7379,localhost:7380,localhost:7381,localhost:7382,localhost:7383,localhost:7384,localhost:7385
github
diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java
index ddc8646..69054ef 100644
--- a/src/main/java/redis/clients/jedis/Client.java
+++ b/src/main/java/redis/clients/jedis/Client.java
@@ -1,6 +1,6 @@
package redis.clients.jedis;
-import redis.clients.util.SafeEncoder;
+import static redis.clients.jedis.Protocol.toByteArray;
import java.util.ArrayList;
import java.util.HashMap;
@@ -8,7 +8,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import static redis.clients.jedis.Protocol.toByteArray;
+import redis.clients.util.SafeEncoder;
public class Client extends BinaryClient implements Commands {
public Client(final String host) {
@@ -173,10 +173,6 @@ public class Client extends BinaryClient implements Commands {
hincrBy(SafeEncoder.encode(key), SafeEncoder.encode(field), value);
}
- public void hincrByFloat(final String key, final String field, final double value) {
- hincrByFloat(SafeEncoder.encode(key), SafeEncoder.encode(field), value);
- }
-
public void hexists(final String key, final String field) {
hexists(SafeEncoder.encode(key), SafeEncoder.encode(field));
}
@@ -632,11 +628,10 @@ public class Client extends BinaryClient implements Commands {
public void getbit(String key, long offset) {
getbit(SafeEncoder.encode(key), offset);
}
-
+
public void bitpos(final String key, final boolean value, final BitPosParams params) {
bitpos(SafeEncoder.encode(key), value, params);
}
-
public void setrange(String key, long offset, String value) {
setrange(SafeEncoder.encode(key), offset, SafeEncoder.encode(value));
}
@@ -789,7 +784,7 @@ public class Client extends BinaryClient implements Commands {
public void pexpire(final String key, final int milliseconds) {
pexpire(key, (long) milliseconds);
}
-
+
public void pexpire(final String key, final long milliseconds) {
pexpire(SafeEncoder.encode(key), milliseconds);
}
@@ -840,6 +835,12 @@ public class Client extends BinaryClient implements Commands {
destinationDb, timeout);
}
+ public void hincrByFloat(final String key, final String field,
+ double increment) {
+ hincrByFloat(SafeEncoder.encode(key), SafeEncoder.encode(field),
+ increment);
+ }
+
@Deprecated
/**
* This method is deprecated due to bug (scan cursor should be unsigned long)
@@ -974,4 +975,40 @@ public class Client extends BinaryClient implements Commands {
public void pfmerge(final String destkey, final String... sourcekeys) {
pfmerge(SafeEncoder.encode(destkey), SafeEncoder.encodeMany(sourcekeys));
}
+public void clusterSetSlotStable(final int slot) {
+ cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot),
+ Protocol.CLUSTER_SETSLOT_STABLE);
+ }
+
+ public void clusterForget(final String nodeId) {
+ cluster(Protocol.CLUSTER_FORGET, nodeId);
+ }
+
+ public void clusterFlushSlots() {
+ cluster(Protocol.CLUSTER_FLUSHSLOT);
+ }
+
+ public void clusterKeySlot(final String key) {
+ cluster(Protocol.CLUSTER_KEYSLOT, key);
+ }
+
+ public void clusterCountKeysInSlot(final int slot) {
+ cluster(Protocol.CLUSTER_COUNTKEYINSLOT, String.valueOf(slot));
+ }
+
+ public void clusterSaveConfig() {
+ cluster(Protocol.CLUSTER_SAVECONFIG);
+ }
+
+ public void clusterReplicate(final String nodeId) {
+ cluster(Protocol.CLUSTER_REPLICATE, nodeId);
+ }
+
+ public void clusterSlaves(final String nodeId) {
+ cluster(Protocol.CLUSTER_SLAVES, nodeId);
+ }
+
+ public void clusterFailover() {
+ cluster(Protocol.CLUSTER_FAILOVER);
+ }
}
diff --git a/src/main/java/redis/clients/jedis/ClusterCommands.java b/src/main/java/redis/clients/jedis/ClusterCommands.java
index fff4533..b77069b 100644
--- a/src/main/java/redis/clients/jedis/ClusterCommands.java
+++ b/src/main/java/redis/clients/jedis/ClusterCommands.java
@@ -20,4 +20,22 @@ public interface ClusterCommands {
String clusterSetSlotMigrating(final int slot, final String nodeId);
String clusterSetSlotImporting(final int slot, final String nodeId);
+
+ String clusterSetSlotStable(final int slot);
+
+ String clusterForget(final String nodeId);
+
+ String clusterFlushSlots();
+
+ Long clusterKeySlot(final String key);
+
+ Long clusterCountKeysInSlot(final int slot);
+
+ String clusterSaveConfig();
+
+ String clusterReplicate(final String nodeId);
+
+ List clusterSlaves(final String nodeId);
+
+ String clusterFailover();
}
diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java
index 48968df..8e8a258 100644
--- a/src/main/java/redis/clients/jedis/Jedis.java
+++ b/src/main/java/redis/clients/jedis/Jedis.java
@@ -3136,6 +3136,7 @@ public class Jedis extends BinaryJedis implements JedisCommands,
return client.getIntegerReply();
}
+
public String psetex(final String key, final int milliseconds,
final String value) {
checkIsInMulti();
@@ -3430,6 +3431,60 @@ public class Jedis extends BinaryJedis implements JedisCommands,
client.clusterSetSlotImporting(slot, nodeId);
return client.getStatusCodeReply();
}
+
+ public String clusterSetSlotStable(final int slot) {
+ checkIsInMulti();
+ client.clusterSetSlotStable(slot);
+ return client.getStatusCodeReply();
+ }
+
+ public String clusterForget(final String nodeId) {
+ checkIsInMulti();
+ client.clusterForget(nodeId);
+ return client.getStatusCodeReply();
+ }
+
+ public String clusterFlushSlots() {
+ checkIsInMulti();
+ client.clusterFlushSlots();
+ return client.getStatusCodeReply();
+ }
+
+ public Long clusterKeySlot(final String key) {
+ checkIsInMulti();
+ client.clusterKeySlot(key);
+ return client.getIntegerReply();
+ }
+
+ public Long clusterCountKeysInSlot(final int slot) {
+ checkIsInMulti();
+ client.clusterCountKeysInSlot(slot);
+ return client.getIntegerReply();
+ }
+
+ public String clusterSaveConfig() {
+ checkIsInMulti();
+ client.clusterSaveConfig();
+ return client.getStatusCodeReply();
+ }
+
+ public String clusterReplicate(final String nodeId) {
+ checkIsInMulti();
+ client.clusterReplicate(nodeId);
+ return client.getStatusCodeReply();
+ }
+
+ public List clusterSlaves(final String nodeId) {
+ checkIsInMulti();
+ client.clusterSlaves(nodeId);
+ return client.getMultiBulkReply();
+ }
+
+ public String clusterFailover() {
+ checkIsInMulti();
+ client.clusterFailover();
+ return client.getStatusCodeReply();
+ }
public String asking() {
checkIsInMulti();
diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java
index 485c10b..3f77b9a 100644
--- a/src/main/java/redis/clients/jedis/Protocol.java
+++ b/src/main/java/redis/clients/jedis/Protocol.java
@@ -50,6 +50,15 @@ public final class Protocol {
public static final String CLUSTER_SETSLOT_NODE = "node";
public static final String CLUSTER_SETSLOT_MIGRATING = "migrating";
public static final String CLUSTER_SETSLOT_IMPORTING = "importing";
+ public static final String CLUSTER_SETSLOT_STABLE = "stable";
+ public static final String CLUSTER_FORGET = "forget";
+ public static final String CLUSTER_FLUSHSLOT = "flushslots";
+ public static final String CLUSTER_KEYSLOT = "keyslot";
+ public static final String CLUSTER_COUNTKEYINSLOT = "countkeysinslot";
+ public static final String CLUSTER_SAVECONFIG = "saveconfig";
+ public static final String CLUSTER_REPLICATE = "replicate";
+ public static final String CLUSTER_SLAVES = "slaves";
+ public static final String CLUSTER_FAILOVER = "failover";
public static final String PUBSUB_CHANNELS= "channels";
public static final String PUBSUB_NUMSUB = "numsub";
public static final String PUBSUB_NUM_PAT = "numpat";
diff --git a/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java b/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java
index cb7a58b..b2c9cf0 100644
--- a/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java
+++ b/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java
@@ -27,6 +27,10 @@ public class HostAndPortUtil {
clusterHostAndPortList.add(new HostAndPort("localhost", 7379));
clusterHostAndPortList.add(new HostAndPort("localhost", 7380));
clusterHostAndPortList.add(new HostAndPort("localhost", 7381));
+ clusterHostAndPortList.add(new HostAndPort("localhost", 7382));
+ clusterHostAndPortList.add(new HostAndPort("localhost", 7383));
+ clusterHostAndPortList.add(new HostAndPort("localhost", 7384));
+ clusterHostAndPortList.add(new HostAndPort("localhost", 7385));
String envRedisHosts = System.getProperty("redis-hosts");
String envSentinelHosts = System.getProperty("sentinel-hosts");
diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterReplicateTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterReplicateTest.java
new file mode 100644
index 0000000..39d9ff1
--- /dev/null
+++ b/src/test/java/redis/clients/jedis/tests/JedisClusterReplicateTest.java
@@ -0,0 +1,167 @@
+package redis.clients.jedis.tests;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisDataException;
+import redis.clients.jedis.exceptions.JedisException;
+import redis.clients.jedis.tests.utils.JedisClusterTestUtil;
+
+public class JedisClusterReplicateTest {
+ private static Jedis node5;
+ private static Jedis node6;
+
+ private HostAndPort nodeInfo5 = HostAndPortUtil.getClusterServers().get(4);
+ private HostAndPort nodeInfo6 = HostAndPortUtil.getClusterServers().get(5);
+
+ private static int TIMEOUT = 15000; // cluster-node-timeout * 3
+
+ @Before
+ public void setUp() throws InterruptedException {
+ node5 = new Jedis(nodeInfo5.getHost(), nodeInfo5.getPort(), TIMEOUT);
+ node5.connect();
+ node5.flushAll();
+
+ node6 = new Jedis(nodeInfo6.getHost(), nodeInfo6.getPort(), TIMEOUT);
+ node6.connect();
+ // cannot flushall - it will be slave
+
+ // ---- configure cluster
+
+ // add nodes to cluster
+ node5.clusterMeet("127.0.0.1", nodeInfo6.getPort());
+
+ JedisClusterTestUtil.assertNodeIsKnown(node5, JedisClusterTestUtil.getNodeId(node6.clusterNodes()), 1000);
+ JedisClusterTestUtil.assertNodeIsKnown(node6, JedisClusterTestUtil.getNodeId(node5.clusterNodes()), 1000);
+
+ // split available slots across the three nodes
+ int[] node5Slots = new int[JedisCluster.HASHSLOTS];
+ for (int i = 0 ; i < JedisCluster.HASHSLOTS; i++) {
+ node5Slots[i] = i;
+ }
+
+ node5.clusterAddSlots(node5Slots);
+
+ JedisClusterTestUtil.waitForClusterReady(node5);
+
+ // replicate full 1on1
+ node6.clusterReplicate(JedisClusterTestUtil.getNodeId(node5
+ .clusterNodes()));
+
+ Map replMap = new HashMap();
+ replMap.put(node5, node6);
+
+ waitForReplicateReady(replMap, TIMEOUT);
+ JedisClusterTestUtil.waitForClusterReady(node5, node6);
+ }
+
+ private void waitForReplicateReady(Map replMap, int timeoutMs) {
+ int interval = 100;
+
+ for (int timeout = 0; timeout <= timeoutMs; timeout += interval) {
+ for (Entry entry : replMap.entrySet()) {
+ Jedis master = entry.getKey();
+ Jedis slave = entry.getValue();
+
+ String masterNodeId = JedisClusterTestUtil.getNodeId(master
+ .clusterNodes());
+ String slaveNodeId = JedisClusterTestUtil.getNodeId(slave
+ .clusterNodes());
+
+ try {
+ List slaves = master.clusterSlaves(masterNodeId);
+
+ if (slaves.size() > 0 && slaves.get(0).contains(slaveNodeId)) {
+ return;
+ }
+ } catch (JedisDataException e) {
+ if (!e.getMessage().startsWith("ERR The specified node is not a master"))
+ throw e;
+
+ // retry...
+ }
+ }
+
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ throw new JedisException("there seems to replication error");
+ }
+
+ @After
+ public void tearDown() throws InterruptedException {
+ // clear all slots
+ int[] slotsToDelete = new int[JedisCluster.HASHSLOTS];
+ for (int i = 0; i < JedisCluster.HASHSLOTS; i++) {
+ slotsToDelete[i] = i;
+ }
+
+ node5.clusterDelSlots(slotsToDelete);
+ }
+
+ @Test
+ public void testClusterReplicate() {
+ // we're already replicate 1on1
+ List slaveInfos = node5.clusterSlaves(JedisClusterTestUtil
+ .getNodeId(node5.clusterNodes()));
+ assertEquals(1, slaveInfos.size());
+ assertTrue(slaveInfos.get(0).contains(
+ JedisClusterTestUtil.getNodeId(node6.clusterNodes())));
+ }
+
+ @Test
+ public void testClusterFailover() throws InterruptedException {
+ Set jedisClusterNode = new HashSet();
+ jedisClusterNode.add(new HostAndPort(nodeInfo5.getHost(), nodeInfo5.getPort()));
+ JedisCluster jc = new JedisCluster(jedisClusterNode);
+
+ jc.set("51", "foo");
+ // node5 is responsible of taking care of slot for key "51" (7186)
+
+ node6.clusterFailover();
+
+ try {
+ // wait for failover
+ Map replMap = new HashMap();
+ replMap.put(node6, node5);
+ waitForReplicateReady(replMap, TIMEOUT);
+ JedisClusterTestUtil.waitForClusterReady(node5, node6);
+
+ List slaveInfos = node6.clusterSlaves(JedisClusterTestUtil
+ .getNodeId(node6.clusterNodes()));
+ assertEquals(1, slaveInfos.size());
+ assertTrue(slaveInfos.get(0).contains(
+ JedisClusterTestUtil.getNodeId(node5.clusterNodes())));
+ } finally {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+
+ // rollback
+ node5.clusterFailover();
+
+ Map replMap = new HashMap();
+ replMap.put(node5, node6);
+ waitForReplicateReady(replMap, TIMEOUT);
+ JedisClusterTestUtil.waitForClusterReady(node5, node6);
+ }
+ }
+}
diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java
index c138e8d..28af2a4 100644
--- a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java
+++ b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java
@@ -1,6 +1,8 @@
package redis.clients.jedis.tests;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import org.junit.After;
@@ -15,17 +17,21 @@ import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException;
+import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
+import redis.clients.jedis.tests.utils.JedisClusterTestUtil;
import redis.clients.util.JedisClusterCRC16;
public class JedisClusterTest extends Assert {
- private Jedis node1;
+ private static Jedis node1;
private static Jedis node2;
private static Jedis node3;
+ private static Jedis node4;
private HostAndPort nodeInfo1 = HostAndPortUtil.getClusterServers().get(0);
private HostAndPort nodeInfo2 = HostAndPortUtil.getClusterServers().get(1);
private HostAndPort nodeInfo3 = HostAndPortUtil.getClusterServers().get(2);
+ private HostAndPort nodeInfo4 = HostAndPortUtil.getClusterServers().get(3);
@Before
public void setUp() throws InterruptedException {
@@ -40,6 +46,10 @@ public class JedisClusterTest extends Assert {
node3 = new Jedis(nodeInfo3.getHost(), nodeInfo3.getPort());
node3.connect();
node3.flushAll();
+
+ node4 = new Jedis(nodeInfo4.getHost(), nodeInfo4.getPort());
+ node4.connect();
+ node4.flushAll();
// ---- configure cluster
@@ -66,29 +76,53 @@ public class JedisClusterTest extends Assert {
node2.clusterAddSlots(node2Slots);
node3.clusterAddSlots(node3Slots);
- waitForClusterReady();
+ JedisClusterTestUtil.waitForClusterReady(node1, node2, node3);
}
@AfterClass
public static void cleanUp() {
int slotTest = JedisClusterCRC16.getSlot("test");
int slot51 = JedisClusterCRC16.getSlot("51");
- String node3Id = getNodeId(node3.clusterNodes());
+
+ String node1Id = JedisClusterTestUtil.getNodeId(node1.clusterNodes());
+ String node2Id = JedisClusterTestUtil.getNodeId(node2.clusterNodes());
+ String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
node2.clusterSetSlotNode(slotTest, node3Id);
node2.clusterSetSlotNode(slot51, node3Id);
node2.clusterDelSlots(slotTest, slot51);
+
+ // forget about all nodes
+ node1.clusterForget(node2Id);
+ node1.clusterForget(node3Id);
+ node2.clusterForget(node1Id);
+ node2.clusterForget(node3Id);
+ node3.clusterForget(node1Id);
+ node3.clusterForget(node2Id);
}
@After
- public void tearDown() {
+ public void tearDown() throws InterruptedException {
// clear all slots
int[] slotsToDelete = new int[JedisCluster.HASHSLOTS];
for (int i = 0; i < JedisCluster.HASHSLOTS; i++) {
slotsToDelete[i] = i;
}
+
node1.clusterDelSlots(slotsToDelete);
node2.clusterDelSlots(slotsToDelete);
node3.clusterDelSlots(slotsToDelete);
+
+ clearAnyInconsistentMigration(node1);
+ clearAnyInconsistentMigration(node2);
+ clearAnyInconsistentMigration(node3);
+ }
+
+ private void clearAnyInconsistentMigration(Jedis node) {
+ // FIXME: it's too slow... apply pipeline if possible
+ List slots = getInconsistentSlots(node.clusterNodes());
+ for (Integer slot : slots) {
+ node.clusterSetSlotStable(slot);
+ }
}
@Test(expected = JedisMovedDataException.class)
@@ -112,7 +146,7 @@ public class JedisClusterTest extends Assert {
@Test(expected = JedisAskDataException.class)
public void testThrowAskException() {
int keySlot = JedisClusterCRC16.getSlot("test");
- String node3Id = getNodeId(node3.clusterNodes());
+ String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
node2.clusterSetSlotMigrating(keySlot, node3Id);
node2.get("test");
}
@@ -122,7 +156,7 @@ public class JedisClusterTest extends Assert {
Set jedisClusterNode = new HashSet();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
- assertEquals(jc.getClusterNodes().size(), 3);
+ assertEquals(3, jc.getClusterNodes().size());
}
@Test
@@ -146,7 +180,7 @@ public class JedisClusterTest extends Assert {
node3.clusterDelSlots(slot51);
node3.clusterAddSlots(slot51);
- waitForClusterReady();
+ JedisClusterTestUtil.waitForClusterReady(node1, node2, node3);
jc.set("51", "foo");
assertEquals("foo", jc.get("51"));
}
@@ -157,8 +191,8 @@ public class JedisClusterTest extends Assert {
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
int slot51 = JedisClusterCRC16.getSlot("51");
- node3.clusterSetSlotImporting(slot51, getNodeId(node2.clusterNodes()));
- node2.clusterSetSlotMigrating(slot51, getNodeId(node3.clusterNodes()));
+ node3.clusterSetSlotImporting(slot51, JedisClusterTestUtil.getNodeId(node2.clusterNodes()));
+ node2.clusterSetSlotMigrating(slot51, JedisClusterTestUtil.getNodeId(node3.clusterNodes()));
jc.set("51", "foo");
assertEquals("foo", jc.get("51"));
}
@@ -178,7 +212,7 @@ public class JedisClusterTest extends Assert {
JedisCluster jc = new JedisCluster(jedisClusterNode);
int slot51 = JedisClusterCRC16.getSlot("51");
// This will cause an infinite redirection loop
- node2.clusterSetSlotMigrating(slot51, getNodeId(node3.clusterNodes()));
+ node2.clusterSetSlotMigrating(slot51, JedisClusterTestUtil.getNodeId(node3.clusterNodes()));
jc.set("51", "foo");
}
@@ -190,25 +224,181 @@ public class JedisClusterTest extends Assert {
assertEquals(JedisClusterCRC16.getSlot("foo{bar}{zap}"), JedisClusterCRC16.getSlot("bar"));
}
- private static String getNodeId(String infoOutput) {
- for (String infoLine : infoOutput.split("\n")) {
- if (infoLine.contains("myself")) {
- return infoLine.split(" ")[0];
- }
- }
- return "";
+ @Test
+ public void testClusterForgetNode() throws InterruptedException {
+ // at first, join node4 to cluster
+ node1.clusterMeet("127.0.0.1", nodeInfo4.getPort());
+
+ String node7Id = JedisClusterTestUtil.getNodeId(node4.clusterNodes());
+
+ JedisClusterTestUtil.assertNodeIsKnown(node3, node7Id, 1000);
+ JedisClusterTestUtil.assertNodeIsKnown(node2, node7Id, 1000);
+ JedisClusterTestUtil.assertNodeIsKnown(node1, node7Id, 1000);
+
+ assertNodeHandshakeEnded(node3, 1000);
+ assertNodeHandshakeEnded(node2, 1000);
+ assertNodeHandshakeEnded(node1, 1000);
+
+ assertEquals(4, node1.clusterNodes().split("\n").length);
+ assertEquals(4, node2.clusterNodes().split("\n").length);
+ assertEquals(4, node3.clusterNodes().split("\n").length);
+
+ // do cluster forget
+ node1.clusterForget(node7Id);
+ node2.clusterForget(node7Id);
+ node3.clusterForget(node7Id);
+
+ JedisClusterTestUtil.assertNodeIsUnknown(node1, node7Id, 1000);
+ JedisClusterTestUtil.assertNodeIsUnknown(node2, node7Id, 1000);
+ JedisClusterTestUtil.assertNodeIsUnknown(node3, node7Id, 1000);
+
+ assertEquals(3, node1.clusterNodes().split("\n").length);
+ assertEquals(3, node2.clusterNodes().split("\n").length);
+ assertEquals(3, node3.clusterNodes().split("\n").length);
}
-
- private void waitForClusterReady() throws InterruptedException {
- boolean clusterOk = false;
- while (!clusterOk) {
- if (node1.clusterInfo().split("\n")[0].contains("ok")
- && node2.clusterInfo().split("\n")[0].contains("ok")
- && node3.clusterInfo().split("\n")[0].contains("ok")) {
- clusterOk = true;
+
+ @Test
+ public void testClusterFlushSlots() {
+ String slotRange = getNodeServingSlotRange(node1.clusterNodes());
+ assertNotNull(slotRange);
+
+ try {
+ node1.clusterFlushSlots();
+ assertNull(getNodeServingSlotRange(node1.clusterNodes()));
+ } finally {
+ // rollback
+ String[] rangeInfo = slotRange.split("-");
+ int lower = Integer.parseInt(rangeInfo[0]);
+ int upper = Integer.parseInt(rangeInfo[1]);
+
+ int[] node1Slots = new int[upper - lower + 1];
+ for (int i = 0 ; lower <= upper ; ) {
+ node1Slots[i++] = lower++;
}
- Thread.sleep(50);
+ node1.clusterAddSlots(node1Slots);
}
}
+ @Test
+ public void testClusterKeySlot() {
+ // It assumes JedisClusterCRC16 is correctly implemented
+ assertEquals(node1.clusterKeySlot("foo{bar}zap}").intValue(), JedisClusterCRC16.getSlot("foo{bar}zap"));
+ assertEquals(node1.clusterKeySlot("{user1000}.following").intValue(), JedisClusterCRC16.getSlot("{user1000}.following"));
+ }
+
+ @Test
+ public void testClusterCountKeysInSlot() {
+ Set jedisClusterNode = new HashSet();
+ jedisClusterNode.add(new HostAndPort(nodeInfo1.getHost(), nodeInfo1.getPort()));
+ JedisCluster jc = new JedisCluster(jedisClusterNode);
+
+ for (int index = 0 ; index < 5 ; index++) {
+ jc.set("foo{bar}" + index, "hello");
+ }
+
+ int slot = JedisClusterCRC16.getSlot("foo{bar}");
+ assertEquals(5, node1.clusterCountKeysInSlot(slot).intValue());
+ }
+
+ @Test
+ public void testStableSlotWhenMigratingNodeOrImportingNodeIsNotSpecified() throws InterruptedException {
+ Set jedisClusterNode = new HashSet();
+ jedisClusterNode.add(new HostAndPort(nodeInfo1.getHost(), nodeInfo1.getPort()));
+ JedisCluster jc = new JedisCluster(jedisClusterNode);
+
+ int slot51 = JedisClusterCRC16.getSlot("51");
+ jc.set("51", "foo");
+ // node2 is responsible of taking care of slot51 (7186)
+
+ node3.clusterSetSlotImporting(slot51, JedisClusterTestUtil.getNodeId(node2.clusterNodes()));
+ assertEquals("foo", jc.get("51"));
+ node3.clusterSetSlotStable(slot51);
+ assertEquals("foo", jc.get("51"));
+
+ node2.clusterSetSlotMigrating(slot51, JedisClusterTestUtil.getNodeId(node3.clusterNodes()));
+ //assertEquals("foo", jc.get("51")); // it leads Max Redirections
+ node2.clusterSetSlotStable(slot51);
+ assertEquals("foo", jc.get("51"));
+ }
+
+ private static String getNodeServingSlotRange(String infoOutput) {
+ // f4f3dc4befda352a4e0beccf29f5e8828438705d 127.0.0.1:7380 master - 0 1394372400827 0 connected 5461-10922
+ for (String infoLine : infoOutput.split("\n")) {
+ if (infoLine.contains("myself")) {
+ try {
+ return infoLine.split(" ")[8];
+ } catch (ArrayIndexOutOfBoundsException e) {
+ return null;
+ }
+ }
+ }
+ return null;
+ }
+
+ private List getInconsistentSlots(String infoOuput) {
+ for (String infoLine : infoOuput.split("\n")) {
+ if (infoLine.contains("myself")) {
+ return getSlotsBeingMigrated(infoLine);
+ }
+ }
+
+ return null;
+ }
+
+ private List getSlotsBeingMigrated(String infoLine) {
+ List inconsistentSlots = new ArrayList();
+
+ String[] splitted = infoLine.split(" ");
+
+ if (splitted.length > 8) {
+ for (int index = 8 ; index < splitted.length ; index++) {
+ String info = splitted[index];
+ Integer slot = getSlotFromMigrationInfo(info);
+ if (slot != null) {
+ inconsistentSlots.add(slot);
+ }
+ }
+ }
+
+ return inconsistentSlots;
+ }
+
+ private Integer getSlotFromMigrationInfo(String info) {
+ if (info.startsWith("[")) {
+ if (info.contains("-<-")) {
+ return Integer.parseInt(info.split("-<-")[0].substring(1));
+ } else if (info.contains("->-")) {
+ return Integer.parseInt(info.split("->-")[0].substring(1));
+ }
+ }
+
+ return null;
+ }
+
+ private void assertNodeHandshakeEnded(Jedis node, int timeoutMs) {
+ int sleepInterval = 100;
+ for (int sleepTime = 0 ; sleepTime <= timeoutMs ; sleepTime += sleepInterval) {
+ boolean isHandshaking = isAnyNodeHandshaking(node);
+ if (!isHandshaking)
+ return;
+
+ try {
+ Thread.sleep(sleepInterval);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ throw new JedisException("Node handshaking is not ended");
+ }
+
+ private boolean isAnyNodeHandshaking(Jedis node) {
+ String infoOutput = node.clusterNodes();
+ for (String infoLine : infoOutput.split("\n")) {
+ if (infoLine.contains("handshake")) {
+ return true;
+ }
+ }
+ return false;
+ }
+
}
diff --git a/src/test/java/redis/clients/jedis/tests/utils/JedisClusterTestUtil.java b/src/test/java/redis/clients/jedis/tests/utils/JedisClusterTestUtil.java
new file mode 100644
index 0000000..c7f7928
--- /dev/null
+++ b/src/test/java/redis/clients/jedis/tests/utils/JedisClusterTestUtil.java
@@ -0,0 +1,70 @@
+package redis.clients.jedis.tests.utils;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.exceptions.JedisException;
+
+public class JedisClusterTestUtil {
+ public static void waitForClusterReady(Jedis...nodes) throws InterruptedException {
+ boolean clusterOk = false;
+ while (!clusterOk) {
+ boolean isOk = true;
+ for (Jedis node : nodes) {
+ if (!node.clusterInfo().split("\n")[0].contains("ok")) {
+ isOk = false;
+ break;
+ }
+ }
+
+ if (isOk) {
+ clusterOk = true;
+ }
+
+ Thread.sleep(50);
+ }
+ }
+
+ public static String getNodeId(String infoOutput) {
+ for (String infoLine : infoOutput.split("\n")) {
+ if (infoLine.contains("myself")) {
+ return infoLine.split(" ")[0];
+ }
+ }
+ return "";
+ }
+
+ public static void assertNodeIsKnown(Jedis node, String targetNodeId, int timeoutMs) {
+ assertNodeRecognizedStatus(node, targetNodeId, true, timeoutMs);
+ }
+
+ public static void assertNodeIsUnknown(Jedis node, String targetNodeId, int timeoutMs) {
+ assertNodeRecognizedStatus(node, targetNodeId, false, timeoutMs);
+ }
+
+ private static void assertNodeRecognizedStatus(Jedis node, String targetNodeId, boolean shouldRecognized, int timeoutMs) {
+ int sleepInterval = 100;
+ for (int sleepTime = 0 ; sleepTime <= timeoutMs ; sleepTime += sleepInterval) {
+ boolean known = isKnownNode(node, targetNodeId);
+ if (shouldRecognized == known)
+ return;
+
+ try {
+ Thread.sleep(sleepInterval);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ throw new JedisException("Node recognize check error");
+ }
+
+ private static boolean isKnownNode(Jedis node, String nodeId) {
+ String infoOutput = node.clusterNodes();
+ for (String infoLine : infoOutput.split("\n")) {
+ if (infoLine.contains(nodeId)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+}