ASK Redirection don't update slot->node

This commit is contained in:
zhanghailei
2014-09-26 10:06:04 +08:00
parent 7836531ad7
commit 1578cae9fc
4 changed files with 144 additions and 12 deletions

View File

@@ -13,6 +13,7 @@ public abstract class JedisClusterCommand<T> {
private JedisClusterConnectionHandler connectionHandler;
private int commandTimeout;
private int redirections;
private ThreadLocal<Jedis> askConnection = new ThreadLocal<Jedis>();
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler,
int timeout, int maxRedirections) {
@@ -41,20 +42,23 @@ public abstract class JedisClusterCommand<T> {
Jedis connection = null;
try {
if (tryRandomNode) {
connection = connectionHandler.getConnection();
} else {
connection = connectionHandler
.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
}
if (asking) {
// TODO: Pipeline asking with the original command to make it
// faster....
connection = askConnection.get();
connection.asking();
// if asking success, reset asking flag
asking = false;
}else{
if (tryRandomNode) {
connection = connectionHandler.getConnection();
} else {
connection = connectionHandler
.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
}
}
return execute(connection);
@@ -72,8 +76,7 @@ public abstract class JedisClusterCommand<T> {
} catch (JedisRedirectionException jre) {
if (jre instanceof JedisAskDataException) {
asking = true;
this.connectionHandler.assignSlotToNode(jre.getSlot(),
jre.getTargetNode());
askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
} else if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache
// recommended by Redis cluster specification
@@ -102,4 +105,4 @@ public abstract class JedisClusterCommand<T> {
}
}
}
}

View File

@@ -26,6 +26,12 @@ public abstract class JedisClusterConnectionHandler {
abstract Jedis getConnectionFromSlot(int slot);
public Jedis getConnectionFromNode(HostAndPort node) {
cache.setNodeIfNotExist(node);
return cache.getNode(JedisClusterInfoCache.getNodeKey(node))
.getResource();
}
public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig) {
this.cache = new JedisClusterInfoCache(poolConfig);
initializeSlotsCache(nodes, poolConfig);

View File

@@ -5,6 +5,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -39,11 +40,13 @@ public class JedisClusterTest extends Assert {
private static Jedis node2;
private static Jedis node3;
private static Jedis node4;
private String localHost = "127.0.0.1";
private HostAndPort nodeInfo1 = HostAndPortUtil.getClusterServers().get(0);
private HostAndPort nodeInfo2 = HostAndPortUtil.getClusterServers().get(1);
private HostAndPort nodeInfo3 = HostAndPortUtil.getClusterServers().get(2);
private HostAndPort nodeInfo4 = HostAndPortUtil.getClusterServers().get(3);
protected Logger log = Logger.getLogger(getClass().getName());
@Before
public void setUp() throws InterruptedException {
@@ -66,9 +69,9 @@ public class JedisClusterTest extends Assert {
// ---- configure cluster
// add nodes to cluster
node1.clusterMeet("127.0.0.1", nodeInfo2.getPort());
node1.clusterMeet("127.0.0.1", nodeInfo3.getPort());
node1.clusterMeet(localHost, nodeInfo2.getPort());
node1.clusterMeet(localHost, nodeInfo3.getPort());
// split available slots across the three nodes
int slotsPerNode = JedisCluster.HASHSLOTS / 3;
int[] node1Slots = new int[slotsPerNode];
@@ -152,6 +155,115 @@ public class JedisClusterTest extends Assert {
assertEquals("bar", node3.get("foo"));
assertEquals("test", node2.get("test"));
}
/**
* slot->nodes
* 15363 node3 e
*/
@Test
public void testMigrate(){
log.info("test migrate slot");
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(nodeInfo1);
JedisCluster jc = new JedisCluster(jedisClusterNode);
String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
String node2Id = JedisClusterTestUtil.getNodeId(node2.clusterNodes());
node3.clusterSetSlotMigrating(15363, node2Id);
node2.clusterSetSlotImporting(15363, node3Id);
try{
node2.set("e", "e");
}catch(JedisMovedDataException jme){
assertEquals(15363, jme.getSlot());
assertEquals(new HostAndPort(localHost, nodeInfo3.getPort()), jme.getTargetNode());
}
try{
node3.set("e", "e");
}catch(JedisAskDataException jae){
assertEquals(15363, jae.getSlot());
assertEquals(new HostAndPort(localHost, nodeInfo2.getPort()), jae.getTargetNode());
}
jc.set("e", "e");
try{
node2.get("e");
}catch(JedisMovedDataException jme){
assertEquals(15363, jme.getSlot());
assertEquals(new HostAndPort(localHost, nodeInfo3.getPort()), jme.getTargetNode());
}
try{
node3.get("e");
}catch(JedisAskDataException jae){
assertEquals(15363, jae.getSlot());
assertEquals(new HostAndPort(localHost, nodeInfo2.getPort()), jae.getTargetNode());
}
assertEquals("e", jc.get("e"));
node2.clusterSetSlotNode(15363, node2Id);
node3.clusterSetSlotNode(15363, node2Id);
//assertEquals("e", jc.get("e"));
assertEquals("e", node2.get("e"));
//assertEquals("e", node3.get("e"));
}
@Test
public void testMigrateToNewNode() throws InterruptedException{
log.info("test migrate slot to new node");
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(nodeInfo1);
JedisCluster jc = new JedisCluster(jedisClusterNode);
node4.clusterMeet(localHost, nodeInfo1.getPort());
String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
String node4Id = JedisClusterTestUtil.getNodeId(node4.clusterNodes());
JedisClusterTestUtil.waitForClusterReady(node4);
node3.clusterSetSlotMigrating(15363, node4Id);
node4.clusterSetSlotImporting(15363, node3Id);
try{
node4.set("e", "e");
}catch(JedisMovedDataException jme){
assertEquals(15363, jme.getSlot());
assertEquals(new HostAndPort(localHost, nodeInfo3.getPort()), jme.getTargetNode());
}
try{
node3.set("e", "e");
}catch(JedisAskDataException jae){
assertEquals(15363, jae.getSlot());
assertEquals(new HostAndPort(localHost, nodeInfo4.getPort()), jae.getTargetNode());
}
jc.set("e", "e");
try{
node4.get("e");
}catch(JedisMovedDataException jme){
assertEquals(15363, jme.getSlot());
assertEquals(new HostAndPort(localHost, nodeInfo3.getPort()), jme.getTargetNode());
}
try{
node3.get("e");
}catch(JedisAskDataException jae){
assertEquals(15363, jae.getSlot());
assertEquals(new HostAndPort(localHost, nodeInfo4.getPort()), jae.getTargetNode());
}
assertEquals("e", jc.get("e"));
node4.clusterSetSlotNode(15363, node4Id);
node3.clusterSetSlotNode(15363, node4Id);
//assertEquals("e", jc.get("e"));
assertEquals("e", node4.get("e"));
//assertEquals("e", node3.get("e"));
}
@Test
public void testRecalculateSlotsWhenMoved() throws InterruptedException {

View File

@@ -1,5 +1,6 @@
package redis.clients.jedis.tests.utils;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisException;
@@ -32,6 +33,16 @@ public class JedisClusterTestUtil {
return "";
}
public static String getNodeId(String infoOutput,HostAndPort node){
for (String infoLine : infoOutput.split("\n")) {
if (infoLine.contains(node.toString())) {
return infoLine.split(" ")[0];
}
}
return "";
}
public static void assertNodeIsKnown(Jedis node, String targetNodeId, int timeoutMs) {
assertNodeRecognizedStatus(node, targetNodeId, true, timeoutMs);
}