Merge pull request #737 from xetorthio/improve_test_stability

Improve test stability
This commit is contained in:
Marcos Nils
2014-09-14 20:50:26 -03:00
11 changed files with 72 additions and 294 deletions

View File

@@ -1,23 +1,26 @@
package redis.clients.jedis;
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.util.SafeEncoder;
import static redis.clients.jedis.Protocol.toByteArray;
import static redis.clients.jedis.Protocol.Command.*;
import static redis.clients.jedis.Protocol.Keyword.ENCODING;
import static redis.clients.jedis.Protocol.Keyword.IDLETIME;
import static redis.clients.jedis.Protocol.Keyword.LEN;
import static redis.clients.jedis.Protocol.Keyword.LIMIT;
import static redis.clients.jedis.Protocol.Keyword.NO;
import static redis.clients.jedis.Protocol.Keyword.ONE;
import static redis.clients.jedis.Protocol.Keyword.REFCOUNT;
import static redis.clients.jedis.Protocol.Keyword.RESET;
import static redis.clients.jedis.Protocol.Keyword.STORE;
import static redis.clients.jedis.Protocol.Keyword.WITHSCORES;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import static redis.clients.jedis.Protocol.Command.*;
import static redis.clients.jedis.Protocol.Command.EXISTS;
import static redis.clients.jedis.Protocol.Command.PSUBSCRIBE;
import static redis.clients.jedis.Protocol.Command.PUNSUBSCRIBE;
import static redis.clients.jedis.Protocol.Command.SUBSCRIBE;
import static redis.clients.jedis.Protocol.Command.UNSUBSCRIBE;
import static redis.clients.jedis.Protocol.Keyword.*;
import static redis.clients.jedis.Protocol.toByteArray;
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.util.SafeEncoder;
public class BinaryClient extends Connection {
public enum LIST_POSITION {

View File

@@ -8,6 +8,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import redis.clients.jedis.JedisCluster.Reset;
import redis.clients.util.SafeEncoder;
public class Client extends BinaryClient implements Commands {
@@ -911,6 +912,10 @@ public class Client extends BinaryClient implements Commands {
cluster(Protocol.CLUSTER_MEET, ip, String.valueOf(port));
}
public void clusterReset(Reset resetType) {
cluster(Protocol.CLUSTER_RESET, resetType.toString());
}
public void clusterAddSlots(final int... slots) {
cluster(Protocol.CLUSTER_ADDSLOTS, slots);
}

View File

@@ -2,6 +2,8 @@ package redis.clients.jedis;
import java.util.List;
import redis.clients.jedis.JedisCluster.Reset;
public interface ClusterCommands {
String clusterNodes();
@@ -40,4 +42,6 @@ public interface ClusterCommands {
String clusterFailover();
List<Object> clusterSlots();
String clusterReset(Reset resetType);
}

View File

@@ -1,14 +1,22 @@
package redis.clients.jedis;
import java.net.URI;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import redis.clients.jedis.BinaryClient.LIST_POSITION;
import redis.clients.jedis.JedisCluster.Reset;
import redis.clients.util.Pool;
import redis.clients.util.SafeEncoder;
import redis.clients.util.Slowlog;
import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
public class Jedis extends BinaryJedis implements JedisCommands,
MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands,
BasicCommands, ClusterCommands {
@@ -3298,6 +3306,12 @@ public class Jedis extends BinaryJedis implements JedisCommands,
client.clusterMeet(ip, port);
return client.getStatusCodeReply();
}
public String clusterReset(final Reset resetType) {
checkIsInMulti();
client.clusterReset(resetType);
return client.getStatusCodeReply();
}
public String clusterAddSlots(final int... slots) {
checkIsInMulti();

View File

@@ -14,6 +14,8 @@ public class JedisCluster implements JedisCommands, BasicCommands, Closeable {
public static final short HASHSLOTS = 16384;
private static final int DEFAULT_TIMEOUT = 1;
private static final int DEFAULT_MAX_REDIRECTIONS = 5;
public static enum Reset {SOFT, HARD}
private int timeout;
private int maxRedirections;

View File

@@ -42,6 +42,7 @@ public final class Protocol {
public static final String CLUSTER_NODES = "nodes";
public static final String CLUSTER_MEET = "meet";
public static final String CLUSTER_RESET = "reset";
public static final String CLUSTER_ADDSLOTS = "addslots";
public static final String CLUSTER_DELSLOTS = "delslots";
public static final String CLUSTER_INFO = "info";

View File

@@ -31,7 +31,6 @@ public class HostAndPortUtil {
clusterHostAndPortList.add(new HostAndPort("localhost", 7382));
clusterHostAndPortList.add(new HostAndPort("localhost", 7383));
clusterHostAndPortList.add(new HostAndPort("localhost", 7384));
clusterHostAndPortList.add(new HostAndPort("localhost", 7385));
String envRedisHosts = System.getProperty("redis-hosts");
String envSentinelHosts = System.getProperty("sentinel-hosts");

View File

@@ -1,167 +0,0 @@
package redis.clients.jedis.tests;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.tests.utils.JedisClusterTestUtil;
public class JedisClusterReplicateTest {
private static Jedis node5;
private static Jedis node6;
private HostAndPort nodeInfo5 = HostAndPortUtil.getClusterServers().get(4);
private HostAndPort nodeInfo6 = HostAndPortUtil.getClusterServers().get(5);
private static int TIMEOUT = 15000; // cluster-node-timeout * 3
@Before
public void setUp() throws InterruptedException {
node5 = new Jedis(nodeInfo5.getHost(), nodeInfo5.getPort(), TIMEOUT);
node5.connect();
node5.flushAll();
node6 = new Jedis(nodeInfo6.getHost(), nodeInfo6.getPort(), TIMEOUT);
node6.connect();
// cannot flushall - it will be slave
// ---- configure cluster
// add nodes to cluster
node5.clusterMeet("127.0.0.1", nodeInfo6.getPort());
JedisClusterTestUtil.assertNodeIsKnown(node5, JedisClusterTestUtil.getNodeId(node6.clusterNodes()), 1000);
JedisClusterTestUtil.assertNodeIsKnown(node6, JedisClusterTestUtil.getNodeId(node5.clusterNodes()), 1000);
// split available slots across the three nodes
int[] node5Slots = new int[JedisCluster.HASHSLOTS];
for (int i = 0 ; i < JedisCluster.HASHSLOTS; i++) {
node5Slots[i] = i;
}
node5.clusterAddSlots(node5Slots);
JedisClusterTestUtil.waitForClusterReady(node5);
// replicate full 1on1
node6.clusterReplicate(JedisClusterTestUtil.getNodeId(node5
.clusterNodes()));
Map<Jedis, Jedis> replMap = new HashMap<Jedis, Jedis>();
replMap.put(node5, node6);
waitForReplicateReady(replMap, TIMEOUT);
JedisClusterTestUtil.waitForClusterReady(node5, node6);
}
private void waitForReplicateReady(Map<Jedis, Jedis> replMap, int timeoutMs) {
int interval = 100;
for (int timeout = 0; timeout <= timeoutMs; timeout += interval) {
for (Entry<Jedis, Jedis> entry : replMap.entrySet()) {
Jedis master = entry.getKey();
Jedis slave = entry.getValue();
String masterNodeId = JedisClusterTestUtil.getNodeId(master
.clusterNodes());
String slaveNodeId = JedisClusterTestUtil.getNodeId(slave
.clusterNodes());
try {
List<String> slaves = master.clusterSlaves(masterNodeId);
if (slaves.size() > 0 && slaves.get(0).contains(slaveNodeId)) {
return;
}
} catch (JedisDataException e) {
if (!e.getMessage().startsWith("ERR The specified node is not a master"))
throw e;
// retry...
}
}
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
}
}
throw new JedisException("there seems to replication error");
}
@After
public void tearDown() throws InterruptedException {
// clear all slots
int[] slotsToDelete = new int[JedisCluster.HASHSLOTS];
for (int i = 0; i < JedisCluster.HASHSLOTS; i++) {
slotsToDelete[i] = i;
}
node5.clusterDelSlots(slotsToDelete);
}
@Test
public void testClusterReplicate() {
// we're already replicate 1on1
List<String> slaveInfos = node5.clusterSlaves(JedisClusterTestUtil
.getNodeId(node5.clusterNodes()));
assertEquals(1, slaveInfos.size());
assertTrue(slaveInfos.get(0).contains(
JedisClusterTestUtil.getNodeId(node6.clusterNodes())));
}
@Test
public void testClusterFailover() throws InterruptedException {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort(nodeInfo5.getHost(), nodeInfo5.getPort()));
JedisCluster jc = new JedisCluster(jedisClusterNode);
jc.set("51", "foo");
// node5 is responsible of taking care of slot for key "51" (7186)
node6.clusterFailover();
try {
// wait for failover
Map<Jedis, Jedis> replMap = new HashMap<Jedis, Jedis>();
replMap.put(node6, node5);
waitForReplicateReady(replMap, TIMEOUT);
JedisClusterTestUtil.waitForClusterReady(node5, node6);
List<String> slaveInfos = node6.clusterSlaves(JedisClusterTestUtil
.getNodeId(node6.clusterNodes()));
assertEquals(1, slaveInfos.size());
assertTrue(slaveInfos.get(0).contains(
JedisClusterTestUtil.getNodeId(node5.clusterNodes())));
} finally {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
// rollback
node5.clusterFailover();
Map<Jedis, Jedis> replMap = new HashMap<Jedis, Jedis>();
replMap.put(node5, node6);
waitForReplicateReady(replMap, TIMEOUT);
JedisClusterTestUtil.waitForClusterReady(node5, node6);
}
}
}

View File

@@ -1,9 +1,7 @@
package redis.clients.jedis.tests;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
@@ -16,6 +14,7 @@ import org.junit.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCluster.Reset;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterException;
@@ -85,48 +84,19 @@ public class JedisClusterTest extends Assert {
@AfterClass
public static void cleanUp() {
int slotTest = JedisClusterCRC16.getSlot("test");
int slot51 = JedisClusterCRC16.getSlot("51");
String node1Id = JedisClusterTestUtil.getNodeId(node1.clusterNodes());
String node2Id = JedisClusterTestUtil.getNodeId(node2.clusterNodes());
String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
node2.clusterSetSlotNode(slotTest, node3Id);
node2.clusterSetSlotNode(slot51, node3Id);
node2.clusterDelSlots(slotTest, slot51);
// forget about all nodes
node1.clusterForget(node2Id);
node1.clusterForget(node3Id);
node2.clusterForget(node1Id);
node2.clusterForget(node3Id);
node3.clusterForget(node1Id);
node3.clusterForget(node2Id);
node1.flushDB();
node2.flushDB();
node3.flushDB();
node4.flushDB();
node1.clusterReset(Reset.SOFT);
node2.clusterReset(Reset.SOFT);
node3.clusterReset(Reset.SOFT);
node4.clusterReset(Reset.SOFT);
}
@After
public void tearDown() throws InterruptedException {
// clear all slots
int[] slotsToDelete = new int[JedisCluster.HASHSLOTS];
for (int i = 0; i < JedisCluster.HASHSLOTS; i++) {
slotsToDelete[i] = i;
}
node1.clusterDelSlots(slotsToDelete);
node2.clusterDelSlots(slotsToDelete);
node3.clusterDelSlots(slotsToDelete);
clearAnyInconsistentMigration(node1);
clearAnyInconsistentMigration(node2);
clearAnyInconsistentMigration(node3);
}
private void clearAnyInconsistentMigration(Jedis node) {
// FIXME: it's too slow... apply pipeline if possible
List<Integer> slots = getInconsistentSlots(node.clusterNodes());
for (Integer slot : slots) {
node.clusterSetSlotStable(slot);
}
cleanUp();
}
@Test(expected = JedisMovedDataException.class)
@@ -377,46 +347,6 @@ public class JedisClusterTest extends Assert {
return null;
}
private List<Integer> getInconsistentSlots(String infoOuput) {
for (String infoLine : infoOuput.split("\n")) {
if (infoLine.contains("myself")) {
return getSlotsBeingMigrated(infoLine);
}
}
return null;
}
private List<Integer> getSlotsBeingMigrated(String infoLine) {
List<Integer> inconsistentSlots = new ArrayList<Integer>();
String[] splitted = infoLine.split(" ");
if (splitted.length > 8) {
for (int index = 8 ; index < splitted.length ; index++) {
String info = splitted[index];
Integer slot = getSlotFromMigrationInfo(info);
if (slot != null) {
inconsistentSlots.add(slot);
}
}
}
return inconsistentSlots;
}
private Integer getSlotFromMigrationInfo(String info) {
if (info.startsWith("[")) {
if (info.contains("-<-")) {
return Integer.parseInt(info.split("-<-")[0].substring(1));
} else if (info.contains("->-")) {
return Integer.parseInt(info.split("->-")[0].substring(1));
}
}
return null;
}
private void assertNodeHandshakeEnded(Jedis node, int timeoutMs) {
int sleepInterval = 100;
for (int sleepTime = 0 ; sleepTime <= timeoutMs ; sleepTime += sleepInterval) {

View File

@@ -9,9 +9,10 @@ import org.junit.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.JedisCluster.Reset;
import redis.clients.jedis.tests.HostAndPortUtil;
import redis.clients.jedis.tests.JedisTestBase;
import redis.clients.jedis.tests.utils.JedisClusterTestUtil;
public class ClusterCommandsTest extends JedisTestBase {
private static Jedis node1;
@@ -40,37 +41,24 @@ public class ClusterCommandsTest extends JedisTestBase {
@AfterClass
public static void removeSlots() throws InterruptedException {
String[] nodes = node1.clusterNodes().split("\n");
String node1Id = nodes[0].split(" ")[0];
node1.clusterDelSlots(1, 2, 3, 4, 5, 500);
node1.clusterSetSlotNode(5000, node1Id);
node1.clusterDelSlots(5000, 10000);
node1.clusterDelSlots(3000, 3001, 3002);
node2.clusterDelSlots(4000, 4001, 4002);
node1.clusterAddSlots(6000);
node1.clusterDelSlots(6000);
waitForGossip();
node2.clusterDelSlots(6000);
node1.clusterDelSlots(6000);
node1.clusterReset(Reset.SOFT);
node2.clusterReset(Reset.SOFT);
}
private static void waitForGossip() {
boolean notReady = true;
while (notReady) {
if (node1.clusterNodes().contains("6000")) {
notReady = false;
}
}
@Test
public void testClusterSoftReset() {
node1.clusterMeet("127.0.0.1", nodeInfo2.getPort());
assertTrue(node1.clusterNodes().split("\n").length > 1);
node1.clusterReset(Reset.SOFT);
assertEquals(1, node1.clusterNodes().split("\n").length);
}
private static int getClusterAttribute(String clusterInfo,
String attributeName) {
for (String infoElement : clusterInfo.split("\n")) {
if (infoElement.contains(attributeName)) {
return Integer.valueOf(infoElement.split(":")[1].trim());
}
}
return 0;
@Test
public void testClusterHardReset() {
String nodeId = JedisClusterTestUtil.getNodeId(node1.clusterNodes());
node1.clusterReset(Reset.HARD);
String newNodeId = JedisClusterTestUtil.getNodeId(node1.clusterNodes());
assertNotEquals(nodeId, newNodeId);
}
@Test

View File

@@ -4,7 +4,6 @@ import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;