Manual merge of #581

This commit is contained in:
Marcos Nils
2014-05-25 16:30:41 -03:00
parent d8c3629fde
commit 1782aaeeb1
10 changed files with 634 additions and 36 deletions

View File

@@ -27,6 +27,10 @@ public class HostAndPortUtil {
clusterHostAndPortList.add(new HostAndPort("localhost", 7379));
clusterHostAndPortList.add(new HostAndPort("localhost", 7380));
clusterHostAndPortList.add(new HostAndPort("localhost", 7381));
clusterHostAndPortList.add(new HostAndPort("localhost", 7382));
clusterHostAndPortList.add(new HostAndPort("localhost", 7383));
clusterHostAndPortList.add(new HostAndPort("localhost", 7384));
clusterHostAndPortList.add(new HostAndPort("localhost", 7385));
String envRedisHosts = System.getProperty("redis-hosts");
String envSentinelHosts = System.getProperty("sentinel-hosts");

View File

@@ -0,0 +1,167 @@
package redis.clients.jedis.tests;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.tests.utils.JedisClusterTestUtil;
public class JedisClusterReplicateTest {
private static Jedis node5;
private static Jedis node6;
private HostAndPort nodeInfo5 = HostAndPortUtil.getClusterServers().get(4);
private HostAndPort nodeInfo6 = HostAndPortUtil.getClusterServers().get(5);
private static int TIMEOUT = 15000; // cluster-node-timeout * 3
@Before
public void setUp() throws InterruptedException {
node5 = new Jedis(nodeInfo5.getHost(), nodeInfo5.getPort(), TIMEOUT);
node5.connect();
node5.flushAll();
node6 = new Jedis(nodeInfo6.getHost(), nodeInfo6.getPort(), TIMEOUT);
node6.connect();
// cannot flushall - it will be slave
// ---- configure cluster
// add nodes to cluster
node5.clusterMeet("127.0.0.1", nodeInfo6.getPort());
JedisClusterTestUtil.assertNodeIsKnown(node5, JedisClusterTestUtil.getNodeId(node6.clusterNodes()), 1000);
JedisClusterTestUtil.assertNodeIsKnown(node6, JedisClusterTestUtil.getNodeId(node5.clusterNodes()), 1000);
// split available slots across the three nodes
int[] node5Slots = new int[JedisCluster.HASHSLOTS];
for (int i = 0 ; i < JedisCluster.HASHSLOTS; i++) {
node5Slots[i] = i;
}
node5.clusterAddSlots(node5Slots);
JedisClusterTestUtil.waitForClusterReady(node5);
// replicate full 1on1
node6.clusterReplicate(JedisClusterTestUtil.getNodeId(node5
.clusterNodes()));
Map<Jedis, Jedis> replMap = new HashMap<Jedis, Jedis>();
replMap.put(node5, node6);
waitForReplicateReady(replMap, TIMEOUT);
JedisClusterTestUtil.waitForClusterReady(node5, node6);
}
private void waitForReplicateReady(Map<Jedis, Jedis> replMap, int timeoutMs) {
int interval = 100;
for (int timeout = 0; timeout <= timeoutMs; timeout += interval) {
for (Entry<Jedis, Jedis> entry : replMap.entrySet()) {
Jedis master = entry.getKey();
Jedis slave = entry.getValue();
String masterNodeId = JedisClusterTestUtil.getNodeId(master
.clusterNodes());
String slaveNodeId = JedisClusterTestUtil.getNodeId(slave
.clusterNodes());
try {
List<String> slaves = master.clusterSlaves(masterNodeId);
if (slaves.size() > 0 && slaves.get(0).contains(slaveNodeId)) {
return;
}
} catch (JedisDataException e) {
if (!e.getMessage().startsWith("ERR The specified node is not a master"))
throw e;
// retry...
}
}
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
}
}
throw new JedisException("there seems to replication error");
}
@After
public void tearDown() throws InterruptedException {
// clear all slots
int[] slotsToDelete = new int[JedisCluster.HASHSLOTS];
for (int i = 0; i < JedisCluster.HASHSLOTS; i++) {
slotsToDelete[i] = i;
}
node5.clusterDelSlots(slotsToDelete);
}
@Test
public void testClusterReplicate() {
// we're already replicate 1on1
List<String> slaveInfos = node5.clusterSlaves(JedisClusterTestUtil
.getNodeId(node5.clusterNodes()));
assertEquals(1, slaveInfos.size());
assertTrue(slaveInfos.get(0).contains(
JedisClusterTestUtil.getNodeId(node6.clusterNodes())));
}
@Test
public void testClusterFailover() throws InterruptedException {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort(nodeInfo5.getHost(), nodeInfo5.getPort()));
JedisCluster jc = new JedisCluster(jedisClusterNode);
jc.set("51", "foo");
// node5 is responsible of taking care of slot for key "51" (7186)
node6.clusterFailover();
try {
// wait for failover
Map<Jedis, Jedis> replMap = new HashMap<Jedis, Jedis>();
replMap.put(node6, node5);
waitForReplicateReady(replMap, TIMEOUT);
JedisClusterTestUtil.waitForClusterReady(node5, node6);
List<String> slaveInfos = node6.clusterSlaves(JedisClusterTestUtil
.getNodeId(node6.clusterNodes()));
assertEquals(1, slaveInfos.size());
assertTrue(slaveInfos.get(0).contains(
JedisClusterTestUtil.getNodeId(node5.clusterNodes())));
} finally {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
// rollback
node5.clusterFailover();
Map<Jedis, Jedis> replMap = new HashMap<Jedis, Jedis>();
replMap.put(node5, node6);
waitForReplicateReady(replMap, TIMEOUT);
JedisClusterTestUtil.waitForClusterReady(node5, node6);
}
}
}

View File

@@ -1,6 +1,8 @@
package redis.clients.jedis.tests;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.After;
@@ -15,17 +17,21 @@ import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.tests.utils.JedisClusterTestUtil;
import redis.clients.util.JedisClusterCRC16;
public class JedisClusterTest extends Assert {
private Jedis node1;
private static Jedis node1;
private static Jedis node2;
private static Jedis node3;
private static Jedis node4;
private HostAndPort nodeInfo1 = HostAndPortUtil.getClusterServers().get(0);
private HostAndPort nodeInfo2 = HostAndPortUtil.getClusterServers().get(1);
private HostAndPort nodeInfo3 = HostAndPortUtil.getClusterServers().get(2);
private HostAndPort nodeInfo4 = HostAndPortUtil.getClusterServers().get(3);
@Before
public void setUp() throws InterruptedException {
@@ -40,6 +46,10 @@ public class JedisClusterTest extends Assert {
node3 = new Jedis(nodeInfo3.getHost(), nodeInfo3.getPort());
node3.connect();
node3.flushAll();
node4 = new Jedis(nodeInfo4.getHost(), nodeInfo4.getPort());
node4.connect();
node4.flushAll();
// ---- configure cluster
@@ -66,29 +76,53 @@ public class JedisClusterTest extends Assert {
node2.clusterAddSlots(node2Slots);
node3.clusterAddSlots(node3Slots);
waitForClusterReady();
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3);
}
@AfterClass
public static void cleanUp() {
int slotTest = JedisClusterCRC16.getSlot("test");
int slot51 = JedisClusterCRC16.getSlot("51");
String node3Id = getNodeId(node3.clusterNodes());
String node1Id = JedisClusterTestUtil.getNodeId(node1.clusterNodes());
String node2Id = JedisClusterTestUtil.getNodeId(node2.clusterNodes());
String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
node2.clusterSetSlotNode(slotTest, node3Id);
node2.clusterSetSlotNode(slot51, node3Id);
node2.clusterDelSlots(slotTest, slot51);
// forget about all nodes
node1.clusterForget(node2Id);
node1.clusterForget(node3Id);
node2.clusterForget(node1Id);
node2.clusterForget(node3Id);
node3.clusterForget(node1Id);
node3.clusterForget(node2Id);
}
@After
public void tearDown() {
public void tearDown() throws InterruptedException {
// clear all slots
int[] slotsToDelete = new int[JedisCluster.HASHSLOTS];
for (int i = 0; i < JedisCluster.HASHSLOTS; i++) {
slotsToDelete[i] = i;
}
node1.clusterDelSlots(slotsToDelete);
node2.clusterDelSlots(slotsToDelete);
node3.clusterDelSlots(slotsToDelete);
clearAnyInconsistentMigration(node1);
clearAnyInconsistentMigration(node2);
clearAnyInconsistentMigration(node3);
}
private void clearAnyInconsistentMigration(Jedis node) {
// FIXME: it's too slow... apply pipeline if possible
List<Integer> slots = getInconsistentSlots(node.clusterNodes());
for (Integer slot : slots) {
node.clusterSetSlotStable(slot);
}
}
@Test(expected = JedisMovedDataException.class)
@@ -112,7 +146,7 @@ public class JedisClusterTest extends Assert {
@Test(expected = JedisAskDataException.class)
public void testThrowAskException() {
int keySlot = JedisClusterCRC16.getSlot("test");
String node3Id = getNodeId(node3.clusterNodes());
String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
node2.clusterSetSlotMigrating(keySlot, node3Id);
node2.get("test");
}
@@ -122,7 +156,7 @@ public class JedisClusterTest extends Assert {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
assertEquals(jc.getClusterNodes().size(), 3);
assertEquals(3, jc.getClusterNodes().size());
}
@Test
@@ -146,7 +180,7 @@ public class JedisClusterTest extends Assert {
node3.clusterDelSlots(slot51);
node3.clusterAddSlots(slot51);
waitForClusterReady();
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3);
jc.set("51", "foo");
assertEquals("foo", jc.get("51"));
}
@@ -157,8 +191,8 @@ public class JedisClusterTest extends Assert {
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
int slot51 = JedisClusterCRC16.getSlot("51");
node3.clusterSetSlotImporting(slot51, getNodeId(node2.clusterNodes()));
node2.clusterSetSlotMigrating(slot51, getNodeId(node3.clusterNodes()));
node3.clusterSetSlotImporting(slot51, JedisClusterTestUtil.getNodeId(node2.clusterNodes()));
node2.clusterSetSlotMigrating(slot51, JedisClusterTestUtil.getNodeId(node3.clusterNodes()));
jc.set("51", "foo");
assertEquals("foo", jc.get("51"));
}
@@ -178,7 +212,7 @@ public class JedisClusterTest extends Assert {
JedisCluster jc = new JedisCluster(jedisClusterNode);
int slot51 = JedisClusterCRC16.getSlot("51");
// This will cause an infinite redirection loop
node2.clusterSetSlotMigrating(slot51, getNodeId(node3.clusterNodes()));
node2.clusterSetSlotMigrating(slot51, JedisClusterTestUtil.getNodeId(node3.clusterNodes()));
jc.set("51", "foo");
}
@@ -190,25 +224,181 @@ public class JedisClusterTest extends Assert {
assertEquals(JedisClusterCRC16.getSlot("foo{bar}{zap}"), JedisClusterCRC16.getSlot("bar"));
}
private static String getNodeId(String infoOutput) {
for (String infoLine : infoOutput.split("\n")) {
if (infoLine.contains("myself")) {
return infoLine.split(" ")[0];
}
}
return "";
@Test
public void testClusterForgetNode() throws InterruptedException {
// at first, join node4 to cluster
node1.clusterMeet("127.0.0.1", nodeInfo4.getPort());
String node7Id = JedisClusterTestUtil.getNodeId(node4.clusterNodes());
JedisClusterTestUtil.assertNodeIsKnown(node3, node7Id, 1000);
JedisClusterTestUtil.assertNodeIsKnown(node2, node7Id, 1000);
JedisClusterTestUtil.assertNodeIsKnown(node1, node7Id, 1000);
assertNodeHandshakeEnded(node3, 1000);
assertNodeHandshakeEnded(node2, 1000);
assertNodeHandshakeEnded(node1, 1000);
assertEquals(4, node1.clusterNodes().split("\n").length);
assertEquals(4, node2.clusterNodes().split("\n").length);
assertEquals(4, node3.clusterNodes().split("\n").length);
// do cluster forget
node1.clusterForget(node7Id);
node2.clusterForget(node7Id);
node3.clusterForget(node7Id);
JedisClusterTestUtil.assertNodeIsUnknown(node1, node7Id, 1000);
JedisClusterTestUtil.assertNodeIsUnknown(node2, node7Id, 1000);
JedisClusterTestUtil.assertNodeIsUnknown(node3, node7Id, 1000);
assertEquals(3, node1.clusterNodes().split("\n").length);
assertEquals(3, node2.clusterNodes().split("\n").length);
assertEquals(3, node3.clusterNodes().split("\n").length);
}
private void waitForClusterReady() throws InterruptedException {
boolean clusterOk = false;
while (!clusterOk) {
if (node1.clusterInfo().split("\n")[0].contains("ok")
&& node2.clusterInfo().split("\n")[0].contains("ok")
&& node3.clusterInfo().split("\n")[0].contains("ok")) {
clusterOk = true;
@Test
public void testClusterFlushSlots() {
String slotRange = getNodeServingSlotRange(node1.clusterNodes());
assertNotNull(slotRange);
try {
node1.clusterFlushSlots();
assertNull(getNodeServingSlotRange(node1.clusterNodes()));
} finally {
// rollback
String[] rangeInfo = slotRange.split("-");
int lower = Integer.parseInt(rangeInfo[0]);
int upper = Integer.parseInt(rangeInfo[1]);
int[] node1Slots = new int[upper - lower + 1];
for (int i = 0 ; lower <= upper ; ) {
node1Slots[i++] = lower++;
}
Thread.sleep(50);
node1.clusterAddSlots(node1Slots);
}
}
@Test
public void testClusterKeySlot() {
// It assumes JedisClusterCRC16 is correctly implemented
assertEquals(node1.clusterKeySlot("foo{bar}zap}").intValue(), JedisClusterCRC16.getSlot("foo{bar}zap"));
assertEquals(node1.clusterKeySlot("{user1000}.following").intValue(), JedisClusterCRC16.getSlot("{user1000}.following"));
}
@Test
public void testClusterCountKeysInSlot() {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort(nodeInfo1.getHost(), nodeInfo1.getPort()));
JedisCluster jc = new JedisCluster(jedisClusterNode);
for (int index = 0 ; index < 5 ; index++) {
jc.set("foo{bar}" + index, "hello");
}
int slot = JedisClusterCRC16.getSlot("foo{bar}");
assertEquals(5, node1.clusterCountKeysInSlot(slot).intValue());
}
@Test
public void testStableSlotWhenMigratingNodeOrImportingNodeIsNotSpecified() throws InterruptedException {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort(nodeInfo1.getHost(), nodeInfo1.getPort()));
JedisCluster jc = new JedisCluster(jedisClusterNode);
int slot51 = JedisClusterCRC16.getSlot("51");
jc.set("51", "foo");
// node2 is responsible of taking care of slot51 (7186)
node3.clusterSetSlotImporting(slot51, JedisClusterTestUtil.getNodeId(node2.clusterNodes()));
assertEquals("foo", jc.get("51"));
node3.clusterSetSlotStable(slot51);
assertEquals("foo", jc.get("51"));
node2.clusterSetSlotMigrating(slot51, JedisClusterTestUtil.getNodeId(node3.clusterNodes()));
//assertEquals("foo", jc.get("51")); // it leads Max Redirections
node2.clusterSetSlotStable(slot51);
assertEquals("foo", jc.get("51"));
}
private static String getNodeServingSlotRange(String infoOutput) {
// f4f3dc4befda352a4e0beccf29f5e8828438705d 127.0.0.1:7380 master - 0 1394372400827 0 connected 5461-10922
for (String infoLine : infoOutput.split("\n")) {
if (infoLine.contains("myself")) {
try {
return infoLine.split(" ")[8];
} catch (ArrayIndexOutOfBoundsException e) {
return null;
}
}
}
return null;
}
private List<Integer> getInconsistentSlots(String infoOuput) {
for (String infoLine : infoOuput.split("\n")) {
if (infoLine.contains("myself")) {
return getSlotsBeingMigrated(infoLine);
}
}
return null;
}
private List<Integer> getSlotsBeingMigrated(String infoLine) {
List<Integer> inconsistentSlots = new ArrayList<Integer>();
String[] splitted = infoLine.split(" ");
if (splitted.length > 8) {
for (int index = 8 ; index < splitted.length ; index++) {
String info = splitted[index];
Integer slot = getSlotFromMigrationInfo(info);
if (slot != null) {
inconsistentSlots.add(slot);
}
}
}
return inconsistentSlots;
}
private Integer getSlotFromMigrationInfo(String info) {
if (info.startsWith("[")) {
if (info.contains("-<-")) {
return Integer.parseInt(info.split("-<-")[0].substring(1));
} else if (info.contains("->-")) {
return Integer.parseInt(info.split("->-")[0].substring(1));
}
}
return null;
}
private void assertNodeHandshakeEnded(Jedis node, int timeoutMs) {
int sleepInterval = 100;
for (int sleepTime = 0 ; sleepTime <= timeoutMs ; sleepTime += sleepInterval) {
boolean isHandshaking = isAnyNodeHandshaking(node);
if (!isHandshaking)
return;
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException e) {
}
}
throw new JedisException("Node handshaking is not ended");
}
private boolean isAnyNodeHandshaking(Jedis node) {
String infoOutput = node.clusterNodes();
for (String infoLine : infoOutput.split("\n")) {
if (infoLine.contains("handshake")) {
return true;
}
}
return false;
}
}

View File

@@ -0,0 +1,70 @@
package redis.clients.jedis.tests.utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisException;
public class JedisClusterTestUtil {
public static void waitForClusterReady(Jedis...nodes) throws InterruptedException {
boolean clusterOk = false;
while (!clusterOk) {
boolean isOk = true;
for (Jedis node : nodes) {
if (!node.clusterInfo().split("\n")[0].contains("ok")) {
isOk = false;
break;
}
}
if (isOk) {
clusterOk = true;
}
Thread.sleep(50);
}
}
public static String getNodeId(String infoOutput) {
for (String infoLine : infoOutput.split("\n")) {
if (infoLine.contains("myself")) {
return infoLine.split(" ")[0];
}
}
return "";
}
public static void assertNodeIsKnown(Jedis node, String targetNodeId, int timeoutMs) {
assertNodeRecognizedStatus(node, targetNodeId, true, timeoutMs);
}
public static void assertNodeIsUnknown(Jedis node, String targetNodeId, int timeoutMs) {
assertNodeRecognizedStatus(node, targetNodeId, false, timeoutMs);
}
private static void assertNodeRecognizedStatus(Jedis node, String targetNodeId, boolean shouldRecognized, int timeoutMs) {
int sleepInterval = 100;
for (int sleepTime = 0 ; sleepTime <= timeoutMs ; sleepTime += sleepInterval) {
boolean known = isKnownNode(node, targetNodeId);
if (shouldRecognized == known)
return;
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException e) {
}
}
throw new JedisException("Node recognize check error");
}
private static boolean isKnownNode(Jedis node, String nodeId) {
String infoOutput = node.clusterNodes();
for (String infoLine : infoOutput.split("\n")) {
if (infoLine.contains(nodeId)) {
return true;
}
}
return false;
}
}