diff --git a/Makefile b/Makefile index 6a784d8..51d8a37 100644 --- a/Makefile +++ b/Makefile @@ -116,6 +116,18 @@ pidfile /tmp/sentinel3.pid logfile /tmp/sentinel3.log endef +define REDIS_SENTINEL4 +port 26382 +daemonize yes +sentinel monitor mymaster 127.0.0.1 6381 1 +sentinel auth-pass mymaster foobared +sentinel down-after-milliseconds mymaster 2000 +sentinel parallel-syncs mymaster 1 +sentinel failover-timeout mymaster 120000 +pidfile /tmp/sentinel4.pid +logfile /tmp/sentinel4.log +endef + # CLUSTER REDIS NODES define REDIS_CLUSTER_NODE1_CONF daemonize yes @@ -199,6 +211,7 @@ export REDIS7_CONF export REDIS_SENTINEL1 export REDIS_SENTINEL2 export REDIS_SENTINEL3 +export REDIS_SENTINEL4 export REDIS_CLUSTER_NODE1_CONF export REDIS_CLUSTER_NODE2_CONF export REDIS_CLUSTER_NODE3_CONF @@ -219,6 +232,8 @@ start: cleanup echo "$$REDIS_SENTINEL2" > /tmp/sentinel2.conf && redis-server /tmp/sentinel2.conf --sentinel @sleep 0.5 echo "$$REDIS_SENTINEL3" > /tmp/sentinel3.conf && redis-server /tmp/sentinel3.conf --sentinel + @sleep 0.5 + echo "$$REDIS_SENTINEL4" > /tmp/sentinel4.conf && redis-server /tmp/sentinel4.conf --sentinel echo "$$REDIS_CLUSTER_NODE1_CONF" | redis-server - echo "$$REDIS_CLUSTER_NODE2_CONF" | redis-server - echo "$$REDIS_CLUSTER_NODE3_CONF" | redis-server - @@ -241,6 +256,7 @@ stop: kill `cat /tmp/sentinel1.pid` kill `cat /tmp/sentinel2.pid` kill `cat /tmp/sentinel3.pid` + kill `cat /tmp/sentinel4.pid` kill `cat /tmp/redis_cluster_node1.pid` || true kill `cat /tmp/redis_cluster_node2.pid` || true kill `cat /tmp/redis_cluster_node3.pid` || true diff --git a/src/main/java/redis/clients/jedis/JedisFactory.java b/src/main/java/redis/clients/jedis/JedisFactory.java index 3597d69..c04b43d 100644 --- a/src/main/java/redis/clients/jedis/JedisFactory.java +++ b/src/main/java/redis/clients/jedis/JedisFactory.java @@ -1,5 +1,7 @@ package redis.clients.jedis; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.impl.DefaultPooledObject; @@ -8,8 +10,7 @@ import org.apache.commons.pool2.impl.DefaultPooledObject; * PoolableObjectFactory custom impl. */ class JedisFactory implements PooledObjectFactory { - private final String host; - private final int port; + private final AtomicReference hostAndPort = new AtomicReference(); private final int timeout; private final String password; private final int database; @@ -23,14 +24,17 @@ class JedisFactory implements PooledObjectFactory { public JedisFactory(final String host, final int port, final int timeout, final String password, final int database, final String clientName) { super(); - this.host = host; - this.port = port; + this.hostAndPort.set(new HostAndPort(host, port)); this.timeout = timeout; this.password = password; this.database = database; this.clientName = clientName; } + public void setHostAndPort(final HostAndPort hostAndPort) { + this.hostAndPort.set(hostAndPort); + } + @Override public void activateObject(PooledObject pooledJedis) throws Exception { @@ -60,7 +64,8 @@ class JedisFactory implements PooledObjectFactory { @Override public PooledObject makeObject() throws Exception { - final Jedis jedis = new Jedis(this.host, this.port, this.timeout); + final HostAndPort hostAndPort = this.hostAndPort.get(); + final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), this.timeout); jedis.connect(); if (null != this.password) { @@ -86,7 +91,13 @@ class JedisFactory implements PooledObjectFactory { public boolean validateObject(PooledObject pooledJedis) { final BinaryJedis jedis = pooledJedis.getObject(); try { - return jedis.isConnected() && jedis.ping().equals("PONG"); + HostAndPort hostAndPort = this.hostAndPort.get(); + + String connectionHost = jedis.getClient().getHost(); + int connectionPort = jedis.getClient().getPort(); + + return hostAndPort.getHost().equals(connectionHost) && hostAndPort.getPort() == connectionPort && + jedis.isConnected() && jedis.ping().equals("PONG"); } catch (final Exception e) { return false; } diff --git a/src/main/java/redis/clients/jedis/JedisSentinelPool.java b/src/main/java/redis/clients/jedis/JedisSentinelPool.java index 0ff0dff..84afe80 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelPool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelPool.java @@ -65,6 +65,7 @@ public class JedisSentinelPool extends Pool { public JedisSentinelPool(String masterName, Set sentinels, final GenericObjectPoolConfig poolConfig, int timeout, final String password, final int database) { + this.poolConfig = poolConfig; this.timeout = timeout; this.password = password; @@ -74,6 +75,7 @@ public class JedisSentinelPool extends Pool { initPool(master); } + private volatile JedisFactory factory; private volatile HostAndPort currentHostMaster; public void destroy() { @@ -91,10 +93,18 @@ public class JedisSentinelPool extends Pool { private void initPool(HostAndPort master) { if (!master.equals(currentHostMaster)) { currentHostMaster = master; + if (factory == null) { + factory = new JedisFactory(master.getHost(), master.getPort(), + timeout, password, database); + initPool(poolConfig, factory); + } else { + factory.setHostAndPort(currentHostMaster); + // although we clear the pool, we still have to check the returned object + // in getResource, this call only clears idle instances, not borrowed instances + internalPool.clear(); + } + log.info("Created JedisPool to master at " + master); - initPool(poolConfig, - new JedisFactory(master.getHost(), master.getPort(), - timeout, password, database)); } } @@ -115,19 +125,23 @@ public class JedisSentinelPool extends Pool { log.fine("Connecting to Sentinel " + hap); + Jedis jedis = null; try { - Jedis jedis = new Jedis(hap.getHost(), hap.getPort()); + jedis = new Jedis(hap.getHost(), hap.getPort()); if (master == null) { master = toHostAndPort(jedis .sentinelGetMasterAddrByName(masterName)); log.fine("Found Redis master at " + master); - jedis.disconnect(); break outer; } } catch (JedisConnectionException e) { log.warning("Cannot connect to sentinel running @ " + hap + ". Trying next one."); + } finally { + if (jedis != null) { + jedis.close(); + } } } @@ -164,9 +178,22 @@ public class JedisSentinelPool extends Pool { @Override public Jedis getResource() { - Jedis jedis = super.getResource(); - jedis.setDataSource(this); - return jedis; + while (true) { + Jedis jedis = super.getResource(); + jedis.setDataSource(this); + + // get a reference because it can change concurrently + final HostAndPort master = currentHostMaster; + final HostAndPort connection = new HostAndPort(jedis.getClient().getHost(), + jedis.getClient().getPort()); + + if (master.equals(connection)) { + // connected to the correct master + return jedis; + } else { + returnBrokenResource(jedis); + } + } } public void returnBrokenResource(final Jedis resource) { @@ -304,4 +331,4 @@ public class JedisSentinelPool extends Pool { } } } -} \ No newline at end of file +} diff --git a/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java b/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java index b2c9cf0..e27b0c7 100644 --- a/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java +++ b/src/test/java/redis/clients/jedis/tests/HostAndPortUtil.java @@ -23,6 +23,7 @@ public class HostAndPortUtil { sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT)); sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 1)); sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 2)); + sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 3)); clusterHostAndPortList.add(new HostAndPort("localhost", 7379)); clusterHostAndPortList.add(new HostAndPort("localhost", 7380)); diff --git a/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java b/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java index 205d90a..5a43b3f 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java @@ -21,18 +21,24 @@ public class JedisSentinelPoolTest extends JedisTestBase { .get(2); protected static HostAndPort slave1 = HostAndPortUtil.getRedisServers() .get(3); + protected static HostAndPort sentinel1 = HostAndPortUtil .getSentinelServers().get(1); + protected static HostAndPort sentinel2 = HostAndPortUtil + .getSentinelServers().get(3); protected static Jedis sentinelJedis1; + protected static Jedis sentinelJedis2; protected Set sentinels = new HashSet(); @Before public void setUp() throws Exception { sentinels.add(sentinel1.toString()); + sentinels.add(sentinel2.toString()); sentinelJedis1 = new Jedis(sentinel1.getHost(), sentinel1.getPort()); + sentinelJedis2 = new Jedis(sentinel2.getHost(), sentinel2.getPort()); } @Test @@ -41,6 +47,8 @@ public class JedisSentinelPoolTest extends JedisTestBase { new GenericObjectPoolConfig(), 1000, "foobared", 2); forceFailover(pool); + // after failover sentinel needs a bit of time to stabilize before a new failover + Thread.sleep(100); forceFailover(pool); // you can test failover as much as possible @@ -134,29 +142,25 @@ public class JedisSentinelPoolTest extends JedisTestBase { HostAndPort oldMaster = pool.getCurrentHostMaster(); // jedis connection should be master - Jedis jedis = pool.getResource(); - assertEquals("PONG", jedis.ping()); + Jedis beforeFailoverJedis = pool.getResource(); + assertEquals("PONG", beforeFailoverJedis.ping()); - // It can throw JedisDataException while there's no slave to promote - // There's nothing we can do, so we just pass Exception to make test - // fail fast - sentinelJedis1.sentinelFailover(MASTER_NAME); - waitForFailover(pool, oldMaster); - // JedisSentinelPool recognize master but may not changed internal pool - // yet - Thread.sleep(100); + + Jedis afterFailoverJedis = pool.getResource(); + assertEquals("PONG", afterFailoverJedis.ping()); + assertEquals("foobared", afterFailoverJedis.configGet("requirepass").get(1)); + assertEquals(2, afterFailoverJedis.getDB().intValue()); - jedis = pool.getResource(); - assertEquals("PONG", jedis.ping()); - assertEquals("foobared", jedis.configGet("requirepass").get(1)); - assertEquals(2, jedis.getDB().intValue()); + // returning both connections to the pool should not throw + beforeFailoverJedis.close(); + afterFailoverJedis.close(); } private void waitForFailover(JedisSentinelPool pool, HostAndPort oldMaster) throws InterruptedException { HostAndPort newMaster = JedisSentinelTestUtil - .waitForNewPromotedMaster(sentinelJedis1); + .waitForNewPromotedMaster(MASTER_NAME, sentinelJedis1, sentinelJedis2); waitForJedisSentinelPoolRecognizeNewMaster(pool, newMaster); } @@ -166,10 +170,9 @@ public class JedisSentinelPoolTest extends JedisTestBase { throws InterruptedException { while (true) { - String host = pool.getCurrentHostMaster().getHost(); - int port = pool.getCurrentHostMaster().getPort(); + HostAndPort currentHostMaster = pool.getCurrentHostMaster(); - if (host.equals(newMaster.getHost()) && port == newMaster.getPort()) + if (newMaster.equals(currentHostMaster)) break; System.out diff --git a/src/test/java/redis/clients/jedis/tests/JedisSentinelTest.java b/src/test/java/redis/clients/jedis/tests/JedisSentinelTest.java index 822c659..349506b 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisSentinelTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisSentinelTest.java @@ -85,16 +85,16 @@ public class JedisSentinelTest extends JedisTestBase { public void sentinelFailover() throws InterruptedException { Jedis j = new Jedis(sentinelForFailover.getHost(), sentinelForFailover.getPort()); + Jedis j2 = new Jedis(sentinelForFailover.getHost(), + sentinelForFailover.getPort()); try { List masterHostAndPort = j .sentinelGetMasterAddrByName(FAILOVER_MASTER_NAME); HostAndPort currentMaster = new HostAndPort(masterHostAndPort.get(0), Integer.parseInt(masterHostAndPort.get(1))); - String result = j.sentinelFailover(FAILOVER_MASTER_NAME); - assertEquals("OK", result); - JedisSentinelTestUtil.waitForNewPromotedMaster(j); + JedisSentinelTestUtil.waitForNewPromotedMaster(FAILOVER_MASTER_NAME, j, j2); masterHostAndPort = j .sentinelGetMasterAddrByName(FAILOVER_MASTER_NAME); diff --git a/src/test/java/redis/clients/jedis/tests/utils/JedisSentinelTestUtil.java b/src/test/java/redis/clients/jedis/tests/utils/JedisSentinelTestUtil.java index dcb9334..a162ce2 100644 --- a/src/test/java/redis/clients/jedis/tests/utils/JedisSentinelTestUtil.java +++ b/src/test/java/redis/clients/jedis/tests/utils/JedisSentinelTestUtil.java @@ -8,7 +8,8 @@ import redis.clients.jedis.JedisPubSub; import redis.clients.jedis.tests.utils.FailoverAbortedException; public class JedisSentinelTestUtil { - public static HostAndPort waitForNewPromotedMaster(Jedis sentinelJedis) + public static HostAndPort waitForNewPromotedMaster(final String masterName, + final Jedis sentinelJedis, final Jedis commandJedis) throws InterruptedException { final AtomicReference newmaster = new AtomicReference( @@ -47,6 +48,7 @@ public class JedisSentinelTestUtil { @Override public void onPSubscribe(String pattern, int subscribedChannels) { + commandJedis.sentinelFailover(masterName); } }, "*");