diff --git a/Makefile b/Makefile index 1ac2bb9..467541e 100644 --- a/Makefile +++ b/Makefile @@ -153,6 +153,42 @@ cluster-enabled yes cluster-config-file /tmp/redis_cluster_node3.conf endef +define REDIS_CLUSTER_NODE4_CONF +daemonize yes +port 7382 +cluster-node-timeout 50 +pidfile /tmp/redis_cluster_node4.pid +logfile /tmp/redis_cluster_node4.log +save "" +appendonly no +cluster-enabled yes +cluster-config-file /tmp/redis_cluster_node4.conf +endef + +define REDIS_CLUSTER_NODE5_CONF +daemonize yes +port 7383 +cluster-node-timeout 5000 +pidfile /tmp/redis_cluster_node5.pid +logfile /tmp/redis_cluster_node5.log +save "" +appendonly no +cluster-enabled yes +cluster-config-file /tmp/redis_cluster_node5.conf +endef + +define REDIS_CLUSTER_NODE6_CONF +daemonize yes +port 7384 +cluster-node-timeout 5000 +pidfile /tmp/redis_cluster_node6.pid +logfile /tmp/redis_cluster_node6.log +save "" +appendonly no +cluster-enabled yes +cluster-config-file /tmp/redis_cluster_node6.conf +endef + export REDIS1_CONF export REDIS2_CONF export REDIS3_CONF @@ -166,6 +202,9 @@ export REDIS_SENTINEL3 export REDIS_CLUSTER_NODE1_CONF export REDIS_CLUSTER_NODE2_CONF export REDIS_CLUSTER_NODE3_CONF +export REDIS_CLUSTER_NODE4_CONF +export REDIS_CLUSTER_NODE5_CONF +export REDIS_CLUSTER_NODE6_CONF start: cleanup echo "$$REDIS1_CONF" | redis-server - @@ -183,6 +222,9 @@ start: cleanup echo "$$REDIS_CLUSTER_NODE1_CONF" | redis-server - echo "$$REDIS_CLUSTER_NODE2_CONF" | redis-server - echo "$$REDIS_CLUSTER_NODE3_CONF" | redis-server - + echo "$$REDIS_CLUSTER_NODE4_CONF" | redis-server - + echo "$$REDIS_CLUSTER_NODE5_CONF" | redis-server - + echo "$$REDIS_CLUSTER_NODE6_CONF" | redis-server - cleanup: - rm -vf /tmp/redis_cluster_node*.conf 2>/dev/null @@ -202,12 +244,18 @@ stop: kill `cat /tmp/redis_cluster_node1.pid` || true kill `cat /tmp/redis_cluster_node2.pid` || true kill `cat /tmp/redis_cluster_node3.pid` || true + kill `cat /tmp/redis_cluster_node4.pid` || true + kill `cat /tmp/redis_cluster_node5.pid` || true + kill `cat /tmp/redis_cluster_node6.pid` || true rm -f /tmp/sentinel1.conf rm -f /tmp/sentinel2.conf rm -f /tmp/sentinel3.conf rm -f /tmp/redis_cluster_node1.conf rm -f /tmp/redis_cluster_node2.conf rm -f /tmp/redis_cluster_node3.conf + rm -f /tmp/redis_cluster_node4.conf + rm -f /tmp/redis_cluster_node5.conf + rm -f /tmp/redis_cluster_node6.conf test: make start diff --git a/pom.xml b/pom.xml index 1be71dd..ddc40c3 100644 --- a/pom.xml +++ b/pom.xml @@ -47,7 +47,7 @@ localhost:6379,localhost:6380,localhost:6381,localhost:6382,localhost:6383,localhost:6384,localhost:6385 localhost:26379,localhost:26380,localhost:26381 - localhost:7379,localhost:7380,localhost:7381 + localhost:7379,localhost:7380,localhost:7381,localhost:7382,localhost:7383,localhost:7384,localhost:7385 github diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index ddc8646..69054ef 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -1,6 +1,6 @@ package redis.clients.jedis; -import redis.clients.util.SafeEncoder; +import static redis.clients.jedis.Protocol.toByteArray; import java.util.ArrayList; import java.util.HashMap; @@ -8,7 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import static redis.clients.jedis.Protocol.toByteArray; +import redis.clients.util.SafeEncoder; public class Client extends BinaryClient implements Commands { public Client(final String host) { @@ -173,10 +173,6 @@ public class Client extends BinaryClient implements Commands { hincrBy(SafeEncoder.encode(key), SafeEncoder.encode(field), value); } - public void hincrByFloat(final String key, final String field, final double value) { - hincrByFloat(SafeEncoder.encode(key), SafeEncoder.encode(field), value); - } - public void hexists(final String key, final String field) { hexists(SafeEncoder.encode(key), SafeEncoder.encode(field)); } @@ -632,11 +628,10 @@ public class Client extends BinaryClient implements Commands { public void getbit(String key, long offset) { getbit(SafeEncoder.encode(key), offset); } - + public void bitpos(final String key, final boolean value, final BitPosParams params) { bitpos(SafeEncoder.encode(key), value, params); } - public void setrange(String key, long offset, String value) { setrange(SafeEncoder.encode(key), offset, SafeEncoder.encode(value)); } @@ -789,7 +784,7 @@ public class Client extends BinaryClient implements Commands { public void pexpire(final String key, final int milliseconds) { pexpire(key, (long) milliseconds); } - + public void pexpire(final String key, final long milliseconds) { pexpire(SafeEncoder.encode(key), milliseconds); } @@ -840,6 +835,12 @@ public class Client extends BinaryClient implements Commands { destinationDb, timeout); } + public void hincrByFloat(final String key, final String field, + double increment) { + hincrByFloat(SafeEncoder.encode(key), SafeEncoder.encode(field), + increment); + } + @Deprecated /** * This method is deprecated due to bug (scan cursor should be unsigned long) @@ -974,4 +975,40 @@ public class Client extends BinaryClient implements Commands { public void pfmerge(final String destkey, final String... sourcekeys) { pfmerge(SafeEncoder.encode(destkey), SafeEncoder.encodeMany(sourcekeys)); } +public void clusterSetSlotStable(final int slot) { + cluster(Protocol.CLUSTER_SETSLOT, String.valueOf(slot), + Protocol.CLUSTER_SETSLOT_STABLE); + } + + public void clusterForget(final String nodeId) { + cluster(Protocol.CLUSTER_FORGET, nodeId); + } + + public void clusterFlushSlots() { + cluster(Protocol.CLUSTER_FLUSHSLOT); + } + + public void clusterKeySlot(final String key) { + cluster(Protocol.CLUSTER_KEYSLOT, key); + } + + public void clusterCountKeysInSlot(final int slot) { + cluster(Protocol.CLUSTER_COUNTKEYINSLOT, String.valueOf(slot)); + } + + public void clusterSaveConfig() { + cluster(Protocol.CLUSTER_SAVECONFIG); + } + + public void clusterReplicate(final String nodeId) { + cluster(Protocol.CLUSTER_REPLICATE, nodeId); + } + + public void clusterSlaves(final String nodeId) { + cluster(Protocol.CLUSTER_SLAVES, nodeId); + } + + public void clusterFailover() { + cluster(Protocol.CLUSTER_FAILOVER); + } } diff --git a/src/main/java/redis/clients/jedis/ClusterCommands.java b/src/main/java/redis/clients/jedis/ClusterCommands.java index fff4533..b77069b 100644 --- a/src/main/java/redis/clients/jedis/ClusterCommands.java +++ b/src/main/java/redis/clients/jedis/ClusterCommands.java @@ -20,4 +20,22 @@ public interface ClusterCommands { String clusterSetSlotMigrating(final int slot, final String nodeId); String clusterSetSlotImporting(final int slot, final String nodeId); + + String clusterSetSlotStable(final int slot); + + String clusterForget(final String nodeId); + + String clusterFlushSlots(); + + Long clusterKeySlot(final String key); + + Long clusterCountKeysInSlot(final int slot); + + String clusterSaveConfig(); + + String clusterReplicate(final String nodeId); + + List clusterSlaves(final String nodeId); + + String clusterFailover(); } diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 48968df..8e8a258 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -3136,6 +3136,7 @@ public class Jedis extends BinaryJedis implements JedisCommands, return client.getIntegerReply(); } + public String psetex(final String key, final int milliseconds, final String value) { checkIsInMulti(); @@ -3430,6 +3431,60 @@ public class Jedis extends BinaryJedis implements JedisCommands, client.clusterSetSlotImporting(slot, nodeId); return client.getStatusCodeReply(); } + + public String clusterSetSlotStable(final int slot) { + checkIsInMulti(); + client.clusterSetSlotStable(slot); + return client.getStatusCodeReply(); + } + + public String clusterForget(final String nodeId) { + checkIsInMulti(); + client.clusterForget(nodeId); + return client.getStatusCodeReply(); + } + + public String clusterFlushSlots() { + checkIsInMulti(); + client.clusterFlushSlots(); + return client.getStatusCodeReply(); + } + + public Long clusterKeySlot(final String key) { + checkIsInMulti(); + client.clusterKeySlot(key); + return client.getIntegerReply(); + } + + public Long clusterCountKeysInSlot(final int slot) { + checkIsInMulti(); + client.clusterCountKeysInSlot(slot); + return client.getIntegerReply(); + } + + public String clusterSaveConfig() { + checkIsInMulti(); + client.clusterSaveConfig(); + return client.getStatusCodeReply(); + } + + public String clusterReplicate(final String nodeId) { + checkIsInMulti(); + client.clusterReplicate(nodeId); + return client.getStatusCodeReply(); + } + + public List clusterSlaves(final String nodeId) { + checkIsInMulti(); + client.clusterSlaves(nodeId); + return client.getMultiBulkReply(); + } + + public String clusterFailover() { + checkIsInMulti(); + client.clusterFailover(); + return client.getStatusCodeReply(); + } public String asking() { checkIsInMulti(); diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 485c10b..3f77b9a 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -50,6 +50,15 @@ public final class Protocol { public static final String CLUSTER_SETSLOT_NODE = "node"; public static final String CLUSTER_SETSLOT_MIGRATING = "migrating"; public static final String CLUSTER_SETSLOT_IMPORTING = "importing"; + public static final String CLUSTER_SETSLOT_STABLE = "stable"; + public static final String CLUSTER_FORGET = "forget"; + public static final String CLUSTER_FLUSHSLOT = "flushslots"; + public static final String CLUSTER_KEYSLOT = "keyslot"; + public static final String CLUSTER_COUNTKEYINSLOT = "countkeysinslot"; + public static final String CLUSTER_SAVECONFIG = "saveconfig"; + public static final String CLUSTER_REPLICATE = "replicate"; + public static final String CLUSTER_SLAVES = "slaves"; + public static final String CLUSTER_FAILOVER = "failover"; public static final String PUBSUB_CHANNELS= "channels"; public static final String PUBSUB_NUMSUB = "numsub"; public static final String PUBSUB_NUM_PAT = "numpat"; diff --git a/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java b/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java index cb7a58b..b2c9cf0 100644 --- a/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java +++ b/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java @@ -27,6 +27,10 @@ public class HostAndPortUtil { clusterHostAndPortList.add(new HostAndPort("localhost", 7379)); clusterHostAndPortList.add(new HostAndPort("localhost", 7380)); clusterHostAndPortList.add(new HostAndPort("localhost", 7381)); + clusterHostAndPortList.add(new HostAndPort("localhost", 7382)); + clusterHostAndPortList.add(new HostAndPort("localhost", 7383)); + clusterHostAndPortList.add(new HostAndPort("localhost", 7384)); + clusterHostAndPortList.add(new HostAndPort("localhost", 7385)); String envRedisHosts = System.getProperty("redis-hosts"); String envSentinelHosts = System.getProperty("sentinel-hosts"); diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterReplicateTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterReplicateTest.java new file mode 100644 index 0000000..39d9ff1 --- /dev/null +++ b/src/test/java/redis/clients/jedis/tests/JedisClusterReplicateTest.java @@ -0,0 +1,167 @@ +package redis.clients.jedis.tests; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.tests.utils.JedisClusterTestUtil; + +public class JedisClusterReplicateTest { + private static Jedis node5; + private static Jedis node6; + + private HostAndPort nodeInfo5 = HostAndPortUtil.getClusterServers().get(4); + private HostAndPort nodeInfo6 = HostAndPortUtil.getClusterServers().get(5); + + private static int TIMEOUT = 15000; // cluster-node-timeout * 3 + + @Before + public void setUp() throws InterruptedException { + node5 = new Jedis(nodeInfo5.getHost(), nodeInfo5.getPort(), TIMEOUT); + node5.connect(); + node5.flushAll(); + + node6 = new Jedis(nodeInfo6.getHost(), nodeInfo6.getPort(), TIMEOUT); + node6.connect(); + // cannot flushall - it will be slave + + // ---- configure cluster + + // add nodes to cluster + node5.clusterMeet("127.0.0.1", nodeInfo6.getPort()); + + JedisClusterTestUtil.assertNodeIsKnown(node5, JedisClusterTestUtil.getNodeId(node6.clusterNodes()), 1000); + JedisClusterTestUtil.assertNodeIsKnown(node6, JedisClusterTestUtil.getNodeId(node5.clusterNodes()), 1000); + + // split available slots across the three nodes + int[] node5Slots = new int[JedisCluster.HASHSLOTS]; + for (int i = 0 ; i < JedisCluster.HASHSLOTS; i++) { + node5Slots[i] = i; + } + + node5.clusterAddSlots(node5Slots); + + JedisClusterTestUtil.waitForClusterReady(node5); + + // replicate full 1on1 + node6.clusterReplicate(JedisClusterTestUtil.getNodeId(node5 + .clusterNodes())); + + Map replMap = new HashMap(); + replMap.put(node5, node6); + + waitForReplicateReady(replMap, TIMEOUT); + JedisClusterTestUtil.waitForClusterReady(node5, node6); + } + + private void waitForReplicateReady(Map replMap, int timeoutMs) { + int interval = 100; + + for (int timeout = 0; timeout <= timeoutMs; timeout += interval) { + for (Entry entry : replMap.entrySet()) { + Jedis master = entry.getKey(); + Jedis slave = entry.getValue(); + + String masterNodeId = JedisClusterTestUtil.getNodeId(master + .clusterNodes()); + String slaveNodeId = JedisClusterTestUtil.getNodeId(slave + .clusterNodes()); + + try { + List slaves = master.clusterSlaves(masterNodeId); + + if (slaves.size() > 0 && slaves.get(0).contains(slaveNodeId)) { + return; + } + } catch (JedisDataException e) { + if (!e.getMessage().startsWith("ERR The specified node is not a master")) + throw e; + + // retry... + } + } + + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + } + } + + throw new JedisException("there seems to replication error"); + } + + @After + public void tearDown() throws InterruptedException { + // clear all slots + int[] slotsToDelete = new int[JedisCluster.HASHSLOTS]; + for (int i = 0; i < JedisCluster.HASHSLOTS; i++) { + slotsToDelete[i] = i; + } + + node5.clusterDelSlots(slotsToDelete); + } + + @Test + public void testClusterReplicate() { + // we're already replicate 1on1 + List slaveInfos = node5.clusterSlaves(JedisClusterTestUtil + .getNodeId(node5.clusterNodes())); + assertEquals(1, slaveInfos.size()); + assertTrue(slaveInfos.get(0).contains( + JedisClusterTestUtil.getNodeId(node6.clusterNodes()))); + } + + @Test + public void testClusterFailover() throws InterruptedException { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort(nodeInfo5.getHost(), nodeInfo5.getPort())); + JedisCluster jc = new JedisCluster(jedisClusterNode); + + jc.set("51", "foo"); + // node5 is responsible of taking care of slot for key "51" (7186) + + node6.clusterFailover(); + + try { + // wait for failover + Map replMap = new HashMap(); + replMap.put(node6, node5); + waitForReplicateReady(replMap, TIMEOUT); + JedisClusterTestUtil.waitForClusterReady(node5, node6); + + List slaveInfos = node6.clusterSlaves(JedisClusterTestUtil + .getNodeId(node6.clusterNodes())); + assertEquals(1, slaveInfos.size()); + assertTrue(slaveInfos.get(0).contains( + JedisClusterTestUtil.getNodeId(node5.clusterNodes()))); + } finally { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + + // rollback + node5.clusterFailover(); + + Map replMap = new HashMap(); + replMap.put(node5, node6); + waitForReplicateReady(replMap, TIMEOUT); + JedisClusterTestUtil.waitForClusterReady(node5, node6); + } + } +} diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java index c138e8d..28af2a4 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java @@ -1,6 +1,8 @@ package redis.clients.jedis.tests; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.junit.After; @@ -15,17 +17,21 @@ import redis.clients.jedis.JedisCluster; import redis.clients.jedis.exceptions.JedisAskDataException; import redis.clients.jedis.exceptions.JedisClusterException; import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException; +import redis.clients.jedis.exceptions.JedisException; import redis.clients.jedis.exceptions.JedisMovedDataException; +import redis.clients.jedis.tests.utils.JedisClusterTestUtil; import redis.clients.util.JedisClusterCRC16; public class JedisClusterTest extends Assert { - private Jedis node1; + private static Jedis node1; private static Jedis node2; private static Jedis node3; + private static Jedis node4; private HostAndPort nodeInfo1 = HostAndPortUtil.getClusterServers().get(0); private HostAndPort nodeInfo2 = HostAndPortUtil.getClusterServers().get(1); private HostAndPort nodeInfo3 = HostAndPortUtil.getClusterServers().get(2); + private HostAndPort nodeInfo4 = HostAndPortUtil.getClusterServers().get(3); @Before public void setUp() throws InterruptedException { @@ -40,6 +46,10 @@ public class JedisClusterTest extends Assert { node3 = new Jedis(nodeInfo3.getHost(), nodeInfo3.getPort()); node3.connect(); node3.flushAll(); + + node4 = new Jedis(nodeInfo4.getHost(), nodeInfo4.getPort()); + node4.connect(); + node4.flushAll(); // ---- configure cluster @@ -66,29 +76,53 @@ public class JedisClusterTest extends Assert { node2.clusterAddSlots(node2Slots); node3.clusterAddSlots(node3Slots); - waitForClusterReady(); + JedisClusterTestUtil.waitForClusterReady(node1, node2, node3); } @AfterClass public static void cleanUp() { int slotTest = JedisClusterCRC16.getSlot("test"); int slot51 = JedisClusterCRC16.getSlot("51"); - String node3Id = getNodeId(node3.clusterNodes()); + + String node1Id = JedisClusterTestUtil.getNodeId(node1.clusterNodes()); + String node2Id = JedisClusterTestUtil.getNodeId(node2.clusterNodes()); + String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes()); node2.clusterSetSlotNode(slotTest, node3Id); node2.clusterSetSlotNode(slot51, node3Id); node2.clusterDelSlots(slotTest, slot51); + + // forget about all nodes + node1.clusterForget(node2Id); + node1.clusterForget(node3Id); + node2.clusterForget(node1Id); + node2.clusterForget(node3Id); + node3.clusterForget(node1Id); + node3.clusterForget(node2Id); } @After - public void tearDown() { + public void tearDown() throws InterruptedException { // clear all slots int[] slotsToDelete = new int[JedisCluster.HASHSLOTS]; for (int i = 0; i < JedisCluster.HASHSLOTS; i++) { slotsToDelete[i] = i; } + node1.clusterDelSlots(slotsToDelete); node2.clusterDelSlots(slotsToDelete); node3.clusterDelSlots(slotsToDelete); + + clearAnyInconsistentMigration(node1); + clearAnyInconsistentMigration(node2); + clearAnyInconsistentMigration(node3); + } + + private void clearAnyInconsistentMigration(Jedis node) { + // FIXME: it's too slow... apply pipeline if possible + List slots = getInconsistentSlots(node.clusterNodes()); + for (Integer slot : slots) { + node.clusterSetSlotStable(slot); + } } @Test(expected = JedisMovedDataException.class) @@ -112,7 +146,7 @@ public class JedisClusterTest extends Assert { @Test(expected = JedisAskDataException.class) public void testThrowAskException() { int keySlot = JedisClusterCRC16.getSlot("test"); - String node3Id = getNodeId(node3.clusterNodes()); + String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes()); node2.clusterSetSlotMigrating(keySlot, node3Id); node2.get("test"); } @@ -122,7 +156,7 @@ public class JedisClusterTest extends Assert { Set jedisClusterNode = new HashSet(); jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); JedisCluster jc = new JedisCluster(jedisClusterNode); - assertEquals(jc.getClusterNodes().size(), 3); + assertEquals(3, jc.getClusterNodes().size()); } @Test @@ -146,7 +180,7 @@ public class JedisClusterTest extends Assert { node3.clusterDelSlots(slot51); node3.clusterAddSlots(slot51); - waitForClusterReady(); + JedisClusterTestUtil.waitForClusterReady(node1, node2, node3); jc.set("51", "foo"); assertEquals("foo", jc.get("51")); } @@ -157,8 +191,8 @@ public class JedisClusterTest extends Assert { jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); JedisCluster jc = new JedisCluster(jedisClusterNode); int slot51 = JedisClusterCRC16.getSlot("51"); - node3.clusterSetSlotImporting(slot51, getNodeId(node2.clusterNodes())); - node2.clusterSetSlotMigrating(slot51, getNodeId(node3.clusterNodes())); + node3.clusterSetSlotImporting(slot51, JedisClusterTestUtil.getNodeId(node2.clusterNodes())); + node2.clusterSetSlotMigrating(slot51, JedisClusterTestUtil.getNodeId(node3.clusterNodes())); jc.set("51", "foo"); assertEquals("foo", jc.get("51")); } @@ -178,7 +212,7 @@ public class JedisClusterTest extends Assert { JedisCluster jc = new JedisCluster(jedisClusterNode); int slot51 = JedisClusterCRC16.getSlot("51"); // This will cause an infinite redirection loop - node2.clusterSetSlotMigrating(slot51, getNodeId(node3.clusterNodes())); + node2.clusterSetSlotMigrating(slot51, JedisClusterTestUtil.getNodeId(node3.clusterNodes())); jc.set("51", "foo"); } @@ -190,25 +224,181 @@ public class JedisClusterTest extends Assert { assertEquals(JedisClusterCRC16.getSlot("foo{bar}{zap}"), JedisClusterCRC16.getSlot("bar")); } - private static String getNodeId(String infoOutput) { - for (String infoLine : infoOutput.split("\n")) { - if (infoLine.contains("myself")) { - return infoLine.split(" ")[0]; - } - } - return ""; + @Test + public void testClusterForgetNode() throws InterruptedException { + // at first, join node4 to cluster + node1.clusterMeet("127.0.0.1", nodeInfo4.getPort()); + + String node7Id = JedisClusterTestUtil.getNodeId(node4.clusterNodes()); + + JedisClusterTestUtil.assertNodeIsKnown(node3, node7Id, 1000); + JedisClusterTestUtil.assertNodeIsKnown(node2, node7Id, 1000); + JedisClusterTestUtil.assertNodeIsKnown(node1, node7Id, 1000); + + assertNodeHandshakeEnded(node3, 1000); + assertNodeHandshakeEnded(node2, 1000); + assertNodeHandshakeEnded(node1, 1000); + + assertEquals(4, node1.clusterNodes().split("\n").length); + assertEquals(4, node2.clusterNodes().split("\n").length); + assertEquals(4, node3.clusterNodes().split("\n").length); + + // do cluster forget + node1.clusterForget(node7Id); + node2.clusterForget(node7Id); + node3.clusterForget(node7Id); + + JedisClusterTestUtil.assertNodeIsUnknown(node1, node7Id, 1000); + JedisClusterTestUtil.assertNodeIsUnknown(node2, node7Id, 1000); + JedisClusterTestUtil.assertNodeIsUnknown(node3, node7Id, 1000); + + assertEquals(3, node1.clusterNodes().split("\n").length); + assertEquals(3, node2.clusterNodes().split("\n").length); + assertEquals(3, node3.clusterNodes().split("\n").length); } - - private void waitForClusterReady() throws InterruptedException { - boolean clusterOk = false; - while (!clusterOk) { - if (node1.clusterInfo().split("\n")[0].contains("ok") - && node2.clusterInfo().split("\n")[0].contains("ok") - && node3.clusterInfo().split("\n")[0].contains("ok")) { - clusterOk = true; + + @Test + public void testClusterFlushSlots() { + String slotRange = getNodeServingSlotRange(node1.clusterNodes()); + assertNotNull(slotRange); + + try { + node1.clusterFlushSlots(); + assertNull(getNodeServingSlotRange(node1.clusterNodes())); + } finally { + // rollback + String[] rangeInfo = slotRange.split("-"); + int lower = Integer.parseInt(rangeInfo[0]); + int upper = Integer.parseInt(rangeInfo[1]); + + int[] node1Slots = new int[upper - lower + 1]; + for (int i = 0 ; lower <= upper ; ) { + node1Slots[i++] = lower++; } - Thread.sleep(50); + node1.clusterAddSlots(node1Slots); } } + @Test + public void testClusterKeySlot() { + // It assumes JedisClusterCRC16 is correctly implemented + assertEquals(node1.clusterKeySlot("foo{bar}zap}").intValue(), JedisClusterCRC16.getSlot("foo{bar}zap")); + assertEquals(node1.clusterKeySlot("{user1000}.following").intValue(), JedisClusterCRC16.getSlot("{user1000}.following")); + } + + @Test + public void testClusterCountKeysInSlot() { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort(nodeInfo1.getHost(), nodeInfo1.getPort())); + JedisCluster jc = new JedisCluster(jedisClusterNode); + + for (int index = 0 ; index < 5 ; index++) { + jc.set("foo{bar}" + index, "hello"); + } + + int slot = JedisClusterCRC16.getSlot("foo{bar}"); + assertEquals(5, node1.clusterCountKeysInSlot(slot).intValue()); + } + + @Test + public void testStableSlotWhenMigratingNodeOrImportingNodeIsNotSpecified() throws InterruptedException { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort(nodeInfo1.getHost(), nodeInfo1.getPort())); + JedisCluster jc = new JedisCluster(jedisClusterNode); + + int slot51 = JedisClusterCRC16.getSlot("51"); + jc.set("51", "foo"); + // node2 is responsible of taking care of slot51 (7186) + + node3.clusterSetSlotImporting(slot51, JedisClusterTestUtil.getNodeId(node2.clusterNodes())); + assertEquals("foo", jc.get("51")); + node3.clusterSetSlotStable(slot51); + assertEquals("foo", jc.get("51")); + + node2.clusterSetSlotMigrating(slot51, JedisClusterTestUtil.getNodeId(node3.clusterNodes())); + //assertEquals("foo", jc.get("51")); // it leads Max Redirections + node2.clusterSetSlotStable(slot51); + assertEquals("foo", jc.get("51")); + } + + private static String getNodeServingSlotRange(String infoOutput) { + // f4f3dc4befda352a4e0beccf29f5e8828438705d 127.0.0.1:7380 master - 0 1394372400827 0 connected 5461-10922 + for (String infoLine : infoOutput.split("\n")) { + if (infoLine.contains("myself")) { + try { + return infoLine.split(" ")[8]; + } catch (ArrayIndexOutOfBoundsException e) { + return null; + } + } + } + return null; + } + + private List getInconsistentSlots(String infoOuput) { + for (String infoLine : infoOuput.split("\n")) { + if (infoLine.contains("myself")) { + return getSlotsBeingMigrated(infoLine); + } + } + + return null; + } + + private List getSlotsBeingMigrated(String infoLine) { + List inconsistentSlots = new ArrayList(); + + String[] splitted = infoLine.split(" "); + + if (splitted.length > 8) { + for (int index = 8 ; index < splitted.length ; index++) { + String info = splitted[index]; + Integer slot = getSlotFromMigrationInfo(info); + if (slot != null) { + inconsistentSlots.add(slot); + } + } + } + + return inconsistentSlots; + } + + private Integer getSlotFromMigrationInfo(String info) { + if (info.startsWith("[")) { + if (info.contains("-<-")) { + return Integer.parseInt(info.split("-<-")[0].substring(1)); + } else if (info.contains("->-")) { + return Integer.parseInt(info.split("->-")[0].substring(1)); + } + } + + return null; + } + + private void assertNodeHandshakeEnded(Jedis node, int timeoutMs) { + int sleepInterval = 100; + for (int sleepTime = 0 ; sleepTime <= timeoutMs ; sleepTime += sleepInterval) { + boolean isHandshaking = isAnyNodeHandshaking(node); + if (!isHandshaking) + return; + + try { + Thread.sleep(sleepInterval); + } catch (InterruptedException e) { + } + } + + throw new JedisException("Node handshaking is not ended"); + } + + private boolean isAnyNodeHandshaking(Jedis node) { + String infoOutput = node.clusterNodes(); + for (String infoLine : infoOutput.split("\n")) { + if (infoLine.contains("handshake")) { + return true; + } + } + return false; + } + } diff --git a/src/test/java/redis/clients/jedis/tests/utils/JedisClusterTestUtil.java b/src/test/java/redis/clients/jedis/tests/utils/JedisClusterTestUtil.java new file mode 100644 index 0000000..c7f7928 --- /dev/null +++ b/src/test/java/redis/clients/jedis/tests/utils/JedisClusterTestUtil.java @@ -0,0 +1,70 @@ +package redis.clients.jedis.tests.utils; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.exceptions.JedisException; + +public class JedisClusterTestUtil { + public static void waitForClusterReady(Jedis...nodes) throws InterruptedException { + boolean clusterOk = false; + while (!clusterOk) { + boolean isOk = true; + for (Jedis node : nodes) { + if (!node.clusterInfo().split("\n")[0].contains("ok")) { + isOk = false; + break; + } + } + + if (isOk) { + clusterOk = true; + } + + Thread.sleep(50); + } + } + + public static String getNodeId(String infoOutput) { + for (String infoLine : infoOutput.split("\n")) { + if (infoLine.contains("myself")) { + return infoLine.split(" ")[0]; + } + } + return ""; + } + + public static void assertNodeIsKnown(Jedis node, String targetNodeId, int timeoutMs) { + assertNodeRecognizedStatus(node, targetNodeId, true, timeoutMs); + } + + public static void assertNodeIsUnknown(Jedis node, String targetNodeId, int timeoutMs) { + assertNodeRecognizedStatus(node, targetNodeId, false, timeoutMs); + } + + private static void assertNodeRecognizedStatus(Jedis node, String targetNodeId, boolean shouldRecognized, int timeoutMs) { + int sleepInterval = 100; + for (int sleepTime = 0 ; sleepTime <= timeoutMs ; sleepTime += sleepInterval) { + boolean known = isKnownNode(node, targetNodeId); + if (shouldRecognized == known) + return; + + try { + Thread.sleep(sleepInterval); + } catch (InterruptedException e) { + } + } + + throw new JedisException("Node recognize check error"); + } + + private static boolean isKnownNode(Jedis node, String nodeId) { + String infoOutput = node.clusterNodes(); + for (String infoLine : infoOutput.split("\n")) { + if (infoLine.contains(nodeId)) { + return true; + } + } + return false; + } + + +}