diff --git a/src/main/java/redis/clients/jedis/JedisClusterCommand.java b/src/main/java/redis/clients/jedis/JedisClusterCommand.java index 1d9aece..0f2b68f 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterCommand.java +++ b/src/main/java/redis/clients/jedis/JedisClusterCommand.java @@ -6,6 +6,7 @@ import redis.clients.jedis.exceptions.JedisMovedDataException; public abstract class JedisClusterCommand { private JedisClusterConnectionHandler connectionHandler; + private boolean asking = false; public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler) { this.connectionHandler = connectionHandler; @@ -17,10 +18,10 @@ public abstract class JedisClusterCommand { try { return execute(); } catch (JedisMovedDataException jme) { - //TODO: Do Retry here + this.connectionHandler.assignSlotToNode(jme.getSlot(), jme.getTargetNode()); + return execute(); } catch (JedisAskDataException jae) { - //TODO: Do ASK here + throw jae; } - return null; } } diff --git a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java index 24cf77f..e005ded 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java @@ -64,4 +64,9 @@ public abstract class JedisClusterConnectionHandler { String[] arrayHostAndPort = stringHostAndPort.split(":"); return new HostAndPort(arrayHostAndPort[0], Integer.valueOf(arrayHostAndPort[1])); } + + public void assignSlotToNode(int slot, HostAndPort targetNode) { + JedisPool targetPool = nodes.get(targetNode.getHost() + targetNode.getPort()); + slots.put(slot, targetPool); + } } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 7eecd25..bff80dd 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -80,7 +80,9 @@ public final class Protocol { //TODO: I'm not sure if this is the best way to do this. //Maybe Read only first 5 bytes instead? if (message.contains(MOVED_RESPONSE)) { - throw new JedisMovedDataException(message); + String[] movedInfo = message.split(" "); + String[] targetHostAndPort = movedInfo[2].split(":"); + throw new JedisMovedDataException(message, new HostAndPort(targetHostAndPort[0], Integer.valueOf(targetHostAndPort[1])), Integer.valueOf(movedInfo[1])); } else if (message.contains(ASK_RESPONSE)) { throw new JedisAskDataException(message); } diff --git a/src/main/java/redis/clients/jedis/exceptions/JedisMovedDataException.java b/src/main/java/redis/clients/jedis/exceptions/JedisMovedDataException.java index 78e0a4b..c2b5565 100644 --- a/src/main/java/redis/clients/jedis/exceptions/JedisMovedDataException.java +++ b/src/main/java/redis/clients/jedis/exceptions/JedisMovedDataException.java @@ -1,17 +1,37 @@ package redis.clients.jedis.exceptions; +import redis.clients.jedis.HostAndPort; + + public class JedisMovedDataException extends JedisDataException { private static final long serialVersionUID = 3878126572474819403L; + + private HostAndPort targetNode; + private int slot; - public JedisMovedDataException(String message) { - super(message); + public JedisMovedDataException(String message, HostAndPort targetNode, int slot) { + super(message); + this.targetNode = targetNode; + this.slot = slot; } - public JedisMovedDataException(Throwable cause) { + public JedisMovedDataException(Throwable cause, HostAndPort targetNode, int slot) { super(cause); + this.targetNode = targetNode; + this.slot = slot; } - public JedisMovedDataException(String message, Throwable cause) { + public JedisMovedDataException(String message, Throwable cause, HostAndPort targetNode, int slot) { super(message, cause); + this.targetNode = targetNode; + this.slot = slot; } + + public HostAndPort getTargetNode() { + return targetNode; + } + + public int getSlot() { + return slot; + } } diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java index bcd8458..256a807 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java @@ -65,31 +65,36 @@ public class JedisClusterTest extends Assert { pipeline3.sync(); - boolean clusterOk = false; - while (!clusterOk) { - if (node1.clusterInfo().split("\n")[0].contains("ok") && - node2.clusterInfo().split("\n")[0].contains("ok") && - node3.clusterInfo().split("\n")[0].contains("ok") ) { - clusterOk = true; - } - Thread.sleep(100); - } + waitForClusterReady(); } - @After + + @After public void tearDown() { - // clear all slots of node1 - Pipeline pipelined = node1.pipelined(); - for (int i = 0; i < JedisCluster.HASHSLOTS; i++) { - pipelined.clusterDelSlots(i); + // clear all slots + int[] slotsToDelete = new int[JedisCluster.HASHSLOTS]; + for (int i = 0; i < JedisCluster.HASHSLOTS; i++) { + slotsToDelete[i] = i; } - pipelined.sync(); + node1.clusterDelSlots(slotsToDelete); + node2.clusterDelSlots(slotsToDelete); + node3.clusterDelSlots(slotsToDelete); } @Test(expected=JedisMovedDataException.class) public void testThrowMovedException() { node1.set("foo", "bar"); } + + @Test + public void testMovedExceptionParameters() { + try { + node1.set("foo", "bar"); + } catch (JedisMovedDataException jme) { + assertEquals(12182, jme.getSlot()); + assertEquals(new HostAndPort("127.0.0.1", 7381), jme.getTargetNode()); + } + } @Test(expected=JedisAskDataException.class) public void testThrowAskException() { @@ -116,7 +121,31 @@ public class JedisClusterTest extends Assert { jc.set("test", "test"); assertEquals("bar",node3.get("foo")); assertEquals("test",node2.get("test")); - + } + + @Test + public void testRecalculateSlotsWhenMoved() throws InterruptedException { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + JedisCluster jc = new JedisCluster(jedisClusterNode); + node2.clusterDelSlots(JedisClusterCRC16.getSlot("51")); + //TODO: We shouldn't need to issue DELSLOTS in node3, but due to redis-cluster bug we need to. + node3.clusterDelSlots(JedisClusterCRC16.getSlot("51")); + node3.clusterAddSlots(JedisClusterCRC16.getSlot("51")); + waitForClusterReady(); + jc.set("51", "foo"); + assertEquals("foo", jc.get("51")); + } + + @Test + public void testAskResponse() throws InterruptedException { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + JedisCluster jc = new JedisCluster(jedisClusterNode); + node3.clusterSetSlotImporting(JedisClusterCRC16.getSlot("51"), getNodeId(node2.clusterNodes())); + node2.clusterSetSlotMigrating(JedisClusterCRC16.getSlot("51"), getNodeId(node3.clusterNodes())); + jc.set("51", "foo"); + assertEquals("foo", jc.get("51")); } private String getNodeId(String infoOutput) { @@ -127,4 +156,17 @@ public class JedisClusterTest extends Assert { } return ""; } + + private void waitForClusterReady() throws InterruptedException { + boolean clusterOk = false; + while (!clusterOk) { + if (node1.clusterInfo().split("\n")[0].contains("ok") && + node2.clusterInfo().split("\n")[0].contains("ok") && + node3.clusterInfo().split("\n")[0].contains("ok") ) { + clusterOk = true; + } + Thread.sleep(100); + } + } + } \ No newline at end of file