diff --git a/src/main/java/redis/clients/jedis/JedisClusterCommand.java b/src/main/java/redis/clients/jedis/JedisClusterCommand.java index 604afca..69dc12c 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterCommand.java +++ b/src/main/java/redis/clients/jedis/JedisClusterCommand.java @@ -13,6 +13,7 @@ public abstract class JedisClusterCommand { private JedisClusterConnectionHandler connectionHandler; private int commandTimeout; private int redirections; + private ThreadLocal askConnection = new ThreadLocal(); public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int timeout, int maxRedirections) { @@ -41,20 +42,23 @@ public abstract class JedisClusterCommand { Jedis connection = null; try { - if (tryRandomNode) { - connection = connectionHandler.getConnection(); - } else { - connection = connectionHandler - .getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); - } + if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... + connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; + }else{ + if (tryRandomNode) { + connection = connectionHandler.getConnection(); + } else { + connection = connectionHandler + .getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); + } } return execute(connection); @@ -72,8 +76,7 @@ public abstract class JedisClusterCommand { } catch (JedisRedirectionException jre) { if (jre instanceof JedisAskDataException) { asking = true; - this.connectionHandler.assignSlotToNode(jre.getSlot(), - jre.getTargetNode()); + askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification @@ -102,4 +105,4 @@ public abstract class JedisClusterCommand { } } -} \ No newline at end of file +} diff --git a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java index eb25337..9b30c19 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java @@ -26,6 +26,12 @@ public abstract class JedisClusterConnectionHandler { abstract Jedis getConnectionFromSlot(int slot); + public Jedis getConnectionFromNode(HostAndPort node) { + cache.setNodeIfNotExist(node); + return cache.getNode(JedisClusterInfoCache.getNodeKey(node)) + .getResource(); + } + public JedisClusterConnectionHandler(Set nodes, final GenericObjectPoolConfig poolConfig) { this.cache = new JedisClusterInfoCache(poolConfig); initializeSlotsCache(nodes, poolConfig); diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java index 4bc4f97..950ce59 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java @@ -5,6 +5,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.logging.Logger; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -39,11 +40,13 @@ public class JedisClusterTest extends Assert { private static Jedis node2; private static Jedis node3; private static Jedis node4; + private String localHost = "127.0.0.1"; private HostAndPort nodeInfo1 = HostAndPortUtil.getClusterServers().get(0); private HostAndPort nodeInfo2 = HostAndPortUtil.getClusterServers().get(1); private HostAndPort nodeInfo3 = HostAndPortUtil.getClusterServers().get(2); private HostAndPort nodeInfo4 = HostAndPortUtil.getClusterServers().get(3); + protected Logger log = Logger.getLogger(getClass().getName()); @Before public void setUp() throws InterruptedException { @@ -66,9 +69,9 @@ public class JedisClusterTest extends Assert { // ---- configure cluster // add nodes to cluster - node1.clusterMeet("127.0.0.1", nodeInfo2.getPort()); - node1.clusterMeet("127.0.0.1", nodeInfo3.getPort()); - + node1.clusterMeet(localHost, nodeInfo2.getPort()); + node1.clusterMeet(localHost, nodeInfo3.getPort()); + // split available slots across the three nodes int slotsPerNode = JedisCluster.HASHSLOTS / 3; int[] node1Slots = new int[slotsPerNode]; @@ -152,6 +155,115 @@ public class JedisClusterTest extends Assert { assertEquals("bar", node3.get("foo")); assertEquals("test", node2.get("test")); } + + /** + * slot->nodes + * 15363 node3 e + */ + @Test + public void testMigrate(){ + log.info("test migrate slot"); + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(nodeInfo1); + JedisCluster jc = new JedisCluster(jedisClusterNode); + String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes()); + String node2Id = JedisClusterTestUtil.getNodeId(node2.clusterNodes()); + node3.clusterSetSlotMigrating(15363, node2Id); + node2.clusterSetSlotImporting(15363, node3Id); + try{ + node2.set("e", "e"); + }catch(JedisMovedDataException jme){ + assertEquals(15363, jme.getSlot()); + assertEquals(new HostAndPort(localHost, nodeInfo3.getPort()), jme.getTargetNode()); + } + + try{ + node3.set("e", "e"); + }catch(JedisAskDataException jae){ + assertEquals(15363, jae.getSlot()); + assertEquals(new HostAndPort(localHost, nodeInfo2.getPort()), jae.getTargetNode()); + } + + jc.set("e", "e"); + + try{ + node2.get("e"); + }catch(JedisMovedDataException jme){ + assertEquals(15363, jme.getSlot()); + assertEquals(new HostAndPort(localHost, nodeInfo3.getPort()), jme.getTargetNode()); + } + try{ + node3.get("e"); + }catch(JedisAskDataException jae){ + assertEquals(15363, jae.getSlot()); + assertEquals(new HostAndPort(localHost, nodeInfo2.getPort()), jae.getTargetNode()); + } + + assertEquals("e", jc.get("e")); + + node2.clusterSetSlotNode(15363, node2Id); + node3.clusterSetSlotNode(15363, node2Id); + //assertEquals("e", jc.get("e")); + assertEquals("e", node2.get("e")); + + //assertEquals("e", node3.get("e")); + + + + } + + @Test + public void testMigrateToNewNode() throws InterruptedException{ + log.info("test migrate slot to new node"); + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(nodeInfo1); + JedisCluster jc = new JedisCluster(jedisClusterNode); + node4.clusterMeet(localHost, nodeInfo1.getPort()); + + String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes()); + String node4Id = JedisClusterTestUtil.getNodeId(node4.clusterNodes()); + JedisClusterTestUtil.waitForClusterReady(node4); + node3.clusterSetSlotMigrating(15363, node4Id); + node4.clusterSetSlotImporting(15363, node3Id); + try{ + node4.set("e", "e"); + }catch(JedisMovedDataException jme){ + assertEquals(15363, jme.getSlot()); + assertEquals(new HostAndPort(localHost, nodeInfo3.getPort()), jme.getTargetNode()); + } + + try{ + node3.set("e", "e"); + }catch(JedisAskDataException jae){ + assertEquals(15363, jae.getSlot()); + assertEquals(new HostAndPort(localHost, nodeInfo4.getPort()), jae.getTargetNode()); + } + + jc.set("e", "e"); + + try{ + node4.get("e"); + }catch(JedisMovedDataException jme){ + assertEquals(15363, jme.getSlot()); + assertEquals(new HostAndPort(localHost, nodeInfo3.getPort()), jme.getTargetNode()); + } + try{ + node3.get("e"); + }catch(JedisAskDataException jae){ + assertEquals(15363, jae.getSlot()); + assertEquals(new HostAndPort(localHost, nodeInfo4.getPort()), jae.getTargetNode()); + } + + assertEquals("e", jc.get("e")); + + node4.clusterSetSlotNode(15363, node4Id); + node3.clusterSetSlotNode(15363, node4Id); + //assertEquals("e", jc.get("e")); + assertEquals("e", node4.get("e")); + + //assertEquals("e", node3.get("e")); + + } @Test public void testRecalculateSlotsWhenMoved() throws InterruptedException { diff --git a/src/test/java/redis/clients/jedis/tests/utils/JedisClusterTestUtil.java b/src/test/java/redis/clients/jedis/tests/utils/JedisClusterTestUtil.java index c7f7928..d5f9344 100644 --- a/src/test/java/redis/clients/jedis/tests/utils/JedisClusterTestUtil.java +++ b/src/test/java/redis/clients/jedis/tests/utils/JedisClusterTestUtil.java @@ -1,5 +1,6 @@ package redis.clients.jedis.tests.utils; +import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.exceptions.JedisException; @@ -32,6 +33,16 @@ public class JedisClusterTestUtil { return ""; } + public static String getNodeId(String infoOutput,HostAndPort node){ + + for (String infoLine : infoOutput.split("\n")) { + if (infoLine.contains(node.toString())) { + return infoLine.split(" ")[0]; + } + } + return ""; + } + public static void assertNodeIsKnown(Jedis node, String targetNodeId, int timeoutMs) { assertNodeRecognizedStatus(node, targetNodeId, true, timeoutMs); }