Add functionality to recalculate slots when receiving MOVED response from node.

Add test to check for ASK responses (implementation missing)
This commit is contained in:
Marcos Nils
2014-01-02 20:52:17 -03:00
parent b2d22e2060
commit 1b26815799
5 changed files with 94 additions and 24 deletions

View File

@@ -6,6 +6,7 @@ import redis.clients.jedis.exceptions.JedisMovedDataException;
public abstract class JedisClusterCommand<T> { public abstract class JedisClusterCommand<T> {
private JedisClusterConnectionHandler connectionHandler; private JedisClusterConnectionHandler connectionHandler;
private boolean asking = false;
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler) { public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler) {
this.connectionHandler = connectionHandler; this.connectionHandler = connectionHandler;
@@ -17,10 +18,10 @@ public abstract class JedisClusterCommand<T> {
try { try {
return execute(); return execute();
} catch (JedisMovedDataException jme) { } catch (JedisMovedDataException jme) {
//TODO: Do Retry here this.connectionHandler.assignSlotToNode(jme.getSlot(), jme.getTargetNode());
return execute();
} catch (JedisAskDataException jae) { } catch (JedisAskDataException jae) {
//TODO: Do ASK here throw jae;
} }
return null;
} }
} }

View File

@@ -64,4 +64,9 @@ public abstract class JedisClusterConnectionHandler {
String[] arrayHostAndPort = stringHostAndPort.split(":"); String[] arrayHostAndPort = stringHostAndPort.split(":");
return new HostAndPort(arrayHostAndPort[0], Integer.valueOf(arrayHostAndPort[1])); return new HostAndPort(arrayHostAndPort[0], Integer.valueOf(arrayHostAndPort[1]));
} }
public void assignSlotToNode(int slot, HostAndPort targetNode) {
JedisPool targetPool = nodes.get(targetNode.getHost() + targetNode.getPort());
slots.put(slot, targetPool);
}
} }

View File

@@ -80,7 +80,9 @@ public final class Protocol {
//TODO: I'm not sure if this is the best way to do this. //TODO: I'm not sure if this is the best way to do this.
//Maybe Read only first 5 bytes instead? //Maybe Read only first 5 bytes instead?
if (message.contains(MOVED_RESPONSE)) { if (message.contains(MOVED_RESPONSE)) {
throw new JedisMovedDataException(message); String[] movedInfo = message.split(" ");
String[] targetHostAndPort = movedInfo[2].split(":");
throw new JedisMovedDataException(message, new HostAndPort(targetHostAndPort[0], Integer.valueOf(targetHostAndPort[1])), Integer.valueOf(movedInfo[1]));
} else if (message.contains(ASK_RESPONSE)) { } else if (message.contains(ASK_RESPONSE)) {
throw new JedisAskDataException(message); throw new JedisAskDataException(message);
} }

View File

@@ -1,17 +1,37 @@
package redis.clients.jedis.exceptions; package redis.clients.jedis.exceptions;
import redis.clients.jedis.HostAndPort;
public class JedisMovedDataException extends JedisDataException { public class JedisMovedDataException extends JedisDataException {
private static final long serialVersionUID = 3878126572474819403L; private static final long serialVersionUID = 3878126572474819403L;
public JedisMovedDataException(String message) { private HostAndPort targetNode;
private int slot;
public JedisMovedDataException(String message, HostAndPort targetNode, int slot) {
super(message); super(message);
this.targetNode = targetNode;
this.slot = slot;
} }
public JedisMovedDataException(Throwable cause) { public JedisMovedDataException(Throwable cause, HostAndPort targetNode, int slot) {
super(cause); super(cause);
this.targetNode = targetNode;
this.slot = slot;
} }
public JedisMovedDataException(String message, Throwable cause) { public JedisMovedDataException(String message, Throwable cause, HostAndPort targetNode, int slot) {
super(message, cause); super(message, cause);
this.targetNode = targetNode;
this.slot = slot;
}
public HostAndPort getTargetNode() {
return targetNode;
}
public int getSlot() {
return slot;
} }
} }

View File

@@ -65,25 +65,20 @@ public class JedisClusterTest extends Assert {
pipeline3.sync(); pipeline3.sync();
boolean clusterOk = false; waitForClusterReady();
while (!clusterOk) {
if (node1.clusterInfo().split("\n")[0].contains("ok") &&
node2.clusterInfo().split("\n")[0].contains("ok") &&
node3.clusterInfo().split("\n")[0].contains("ok") ) {
clusterOk = true;
}
Thread.sleep(100);
}
} }
@After @After
public void tearDown() { public void tearDown() {
// clear all slots of node1 // clear all slots
Pipeline pipelined = node1.pipelined(); int[] slotsToDelete = new int[JedisCluster.HASHSLOTS];
for (int i = 0; i < JedisCluster.HASHSLOTS; i++) { for (int i = 0; i < JedisCluster.HASHSLOTS; i++) {
pipelined.clusterDelSlots(i); slotsToDelete[i] = i;
} }
pipelined.sync(); node1.clusterDelSlots(slotsToDelete);
node2.clusterDelSlots(slotsToDelete);
node3.clusterDelSlots(slotsToDelete);
} }
@Test(expected=JedisMovedDataException.class) @Test(expected=JedisMovedDataException.class)
@@ -91,6 +86,16 @@ public class JedisClusterTest extends Assert {
node1.set("foo", "bar"); node1.set("foo", "bar");
} }
@Test
public void testMovedExceptionParameters() {
try {
node1.set("foo", "bar");
} catch (JedisMovedDataException jme) {
assertEquals(12182, jme.getSlot());
assertEquals(new HostAndPort("127.0.0.1", 7381), jme.getTargetNode());
}
}
@Test(expected=JedisAskDataException.class) @Test(expected=JedisAskDataException.class)
public void testThrowAskException() { public void testThrowAskException() {
int keySlot = JedisClusterCRC16.getSlot("test"); int keySlot = JedisClusterCRC16.getSlot("test");
@@ -116,7 +121,31 @@ public class JedisClusterTest extends Assert {
jc.set("test", "test"); jc.set("test", "test");
assertEquals("bar",node3.get("foo")); assertEquals("bar",node3.get("foo"));
assertEquals("test",node2.get("test")); assertEquals("test",node2.get("test"));
}
@Test
public void testRecalculateSlotsWhenMoved() throws InterruptedException {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
node2.clusterDelSlots(JedisClusterCRC16.getSlot("51"));
//TODO: We shouldn't need to issue DELSLOTS in node3, but due to redis-cluster bug we need to.
node3.clusterDelSlots(JedisClusterCRC16.getSlot("51"));
node3.clusterAddSlots(JedisClusterCRC16.getSlot("51"));
waitForClusterReady();
jc.set("51", "foo");
assertEquals("foo", jc.get("51"));
}
@Test
public void testAskResponse() throws InterruptedException {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNode);
node3.clusterSetSlotImporting(JedisClusterCRC16.getSlot("51"), getNodeId(node2.clusterNodes()));
node2.clusterSetSlotMigrating(JedisClusterCRC16.getSlot("51"), getNodeId(node3.clusterNodes()));
jc.set("51", "foo");
assertEquals("foo", jc.get("51"));
} }
private String getNodeId(String infoOutput) { private String getNodeId(String infoOutput) {
@@ -127,4 +156,17 @@ public class JedisClusterTest extends Assert {
} }
return ""; return "";
} }
private void waitForClusterReady() throws InterruptedException {
boolean clusterOk = false;
while (!clusterOk) {
if (node1.clusterInfo().split("\n")[0].contains("ok") &&
node2.clusterInfo().split("\n")[0].contains("ok") &&
node3.clusterInfo().split("\n")[0].contains("ok") ) {
clusterOk = true;
}
Thread.sleep(100);
}
}
} }