Merge branch 'master' of https://github.com/nrodrigues/jedis into nrodrigues-master
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.pool2.PooledObject;
|
||||
import org.apache.commons.pool2.PooledObjectFactory;
|
||||
import org.apache.commons.pool2.impl.DefaultPooledObject;
|
||||
@@ -8,8 +10,7 @@ import org.apache.commons.pool2.impl.DefaultPooledObject;
|
||||
* PoolableObjectFactory custom impl.
|
||||
*/
|
||||
class JedisFactory implements PooledObjectFactory<Jedis> {
|
||||
private final String host;
|
||||
private final int port;
|
||||
private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>();
|
||||
private final int timeout;
|
||||
private final String password;
|
||||
private final int database;
|
||||
@@ -23,14 +24,17 @@ class JedisFactory implements PooledObjectFactory<Jedis> {
|
||||
public JedisFactory(final String host, final int port, final int timeout,
|
||||
final String password, final int database, final String clientName) {
|
||||
super();
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.hostAndPort.set(new HostAndPort(host, port));
|
||||
this.timeout = timeout;
|
||||
this.password = password;
|
||||
this.database = database;
|
||||
this.clientName = clientName;
|
||||
}
|
||||
|
||||
public void setHostAndPort(final HostAndPort hostAndPort) {
|
||||
this.hostAndPort.set(hostAndPort);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activateObject(PooledObject<Jedis> pooledJedis)
|
||||
throws Exception {
|
||||
@@ -60,7 +64,8 @@ class JedisFactory implements PooledObjectFactory<Jedis> {
|
||||
|
||||
@Override
|
||||
public PooledObject<Jedis> makeObject() throws Exception {
|
||||
final Jedis jedis = new Jedis(this.host, this.port, this.timeout);
|
||||
final HostAndPort hostAndPort = this.hostAndPort.get();
|
||||
final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), this.timeout);
|
||||
|
||||
jedis.connect();
|
||||
if (null != this.password) {
|
||||
@@ -86,7 +91,13 @@ class JedisFactory implements PooledObjectFactory<Jedis> {
|
||||
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
|
||||
final BinaryJedis jedis = pooledJedis.getObject();
|
||||
try {
|
||||
return jedis.isConnected() && jedis.ping().equals("PONG");
|
||||
HostAndPort hostAndPort = this.hostAndPort.get();
|
||||
|
||||
String connectionHost = jedis.getClient().getHost();
|
||||
int connectionPort = jedis.getClient().getPort();
|
||||
|
||||
return hostAndPort.getHost().equals(connectionHost) && hostAndPort.getPort() == connectionPort &&
|
||||
jedis.isConnected() && jedis.ping().equals("PONG");
|
||||
} catch (final Exception e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -65,6 +65,7 @@ public class JedisSentinelPool extends Pool<Jedis> {
|
||||
public JedisSentinelPool(String masterName, Set<String> sentinels,
|
||||
final GenericObjectPoolConfig poolConfig, int timeout,
|
||||
final String password, final int database) {
|
||||
|
||||
this.poolConfig = poolConfig;
|
||||
this.timeout = timeout;
|
||||
this.password = password;
|
||||
@@ -74,6 +75,7 @@ public class JedisSentinelPool extends Pool<Jedis> {
|
||||
initPool(master);
|
||||
}
|
||||
|
||||
private volatile JedisFactory factory;
|
||||
private volatile HostAndPort currentHostMaster;
|
||||
|
||||
public void destroy() {
|
||||
@@ -91,10 +93,18 @@ public class JedisSentinelPool extends Pool<Jedis> {
|
||||
private void initPool(HostAndPort master) {
|
||||
if (!master.equals(currentHostMaster)) {
|
||||
currentHostMaster = master;
|
||||
if (factory == null) {
|
||||
factory = new JedisFactory(master.getHost(), master.getPort(),
|
||||
timeout, password, database);
|
||||
initPool(poolConfig, factory);
|
||||
} else {
|
||||
factory.setHostAndPort(currentHostMaster);
|
||||
// although we clear the pool, we still have to check the returned object
|
||||
// in getResource, this call only clears idle instances, not borrowed instances
|
||||
internalPool.clear();
|
||||
}
|
||||
|
||||
log.info("Created JedisPool to master at " + master);
|
||||
initPool(poolConfig,
|
||||
new JedisFactory(master.getHost(), master.getPort(),
|
||||
timeout, password, database));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,19 +125,23 @@ public class JedisSentinelPool extends Pool<Jedis> {
|
||||
|
||||
log.fine("Connecting to Sentinel " + hap);
|
||||
|
||||
Jedis jedis = null;
|
||||
try {
|
||||
Jedis jedis = new Jedis(hap.getHost(), hap.getPort());
|
||||
jedis = new Jedis(hap.getHost(), hap.getPort());
|
||||
|
||||
if (master == null) {
|
||||
master = toHostAndPort(jedis
|
||||
.sentinelGetMasterAddrByName(masterName));
|
||||
log.fine("Found Redis master at " + master);
|
||||
jedis.disconnect();
|
||||
break outer;
|
||||
}
|
||||
} catch (JedisConnectionException e) {
|
||||
log.warning("Cannot connect to sentinel running @ " + hap
|
||||
+ ". Trying next one.");
|
||||
} finally {
|
||||
if (jedis != null) {
|
||||
jedis.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,9 +178,22 @@ public class JedisSentinelPool extends Pool<Jedis> {
|
||||
|
||||
@Override
|
||||
public Jedis getResource() {
|
||||
Jedis jedis = super.getResource();
|
||||
jedis.setDataSource(this);
|
||||
return jedis;
|
||||
while (true) {
|
||||
Jedis jedis = super.getResource();
|
||||
jedis.setDataSource(this);
|
||||
|
||||
// get a reference because it can change concurrently
|
||||
final HostAndPort master = currentHostMaster;
|
||||
final HostAndPort connection = new HostAndPort(jedis.getClient().getHost(),
|
||||
jedis.getClient().getPort());
|
||||
|
||||
if (master.equals(connection)) {
|
||||
// connected to the correct master
|
||||
return jedis;
|
||||
} else {
|
||||
returnBrokenResource(jedis);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void returnBrokenResource(final Jedis resource) {
|
||||
@@ -304,4 +331,4 @@ public class JedisSentinelPool extends Pool<Jedis> {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ public class HostAndPortUtil {
|
||||
sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT));
|
||||
sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 1));
|
||||
sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 2));
|
||||
sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 3));
|
||||
|
||||
clusterHostAndPortList.add(new HostAndPort("localhost", 7379));
|
||||
clusterHostAndPortList.add(new HostAndPort("localhost", 7380));
|
||||
|
||||
@@ -21,18 +21,24 @@ public class JedisSentinelPoolTest extends JedisTestBase {
|
||||
.get(2);
|
||||
protected static HostAndPort slave1 = HostAndPortUtil.getRedisServers()
|
||||
.get(3);
|
||||
|
||||
protected static HostAndPort sentinel1 = HostAndPortUtil
|
||||
.getSentinelServers().get(1);
|
||||
protected static HostAndPort sentinel2 = HostAndPortUtil
|
||||
.getSentinelServers().get(3);
|
||||
|
||||
protected static Jedis sentinelJedis1;
|
||||
protected static Jedis sentinelJedis2;
|
||||
|
||||
protected Set<String> sentinels = new HashSet<String>();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
sentinels.add(sentinel1.toString());
|
||||
sentinels.add(sentinel2.toString());
|
||||
|
||||
sentinelJedis1 = new Jedis(sentinel1.getHost(), sentinel1.getPort());
|
||||
sentinelJedis2 = new Jedis(sentinel2.getHost(), sentinel2.getPort());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -41,6 +47,8 @@ public class JedisSentinelPoolTest extends JedisTestBase {
|
||||
new GenericObjectPoolConfig(), 1000, "foobared", 2);
|
||||
|
||||
forceFailover(pool);
|
||||
// after failover sentinel needs a bit of time to stabilize before a new failover
|
||||
Thread.sleep(100);
|
||||
forceFailover(pool);
|
||||
|
||||
// you can test failover as much as possible
|
||||
@@ -134,29 +142,25 @@ public class JedisSentinelPoolTest extends JedisTestBase {
|
||||
HostAndPort oldMaster = pool.getCurrentHostMaster();
|
||||
|
||||
// jedis connection should be master
|
||||
Jedis jedis = pool.getResource();
|
||||
assertEquals("PONG", jedis.ping());
|
||||
Jedis beforeFailoverJedis = pool.getResource();
|
||||
assertEquals("PONG", beforeFailoverJedis.ping());
|
||||
|
||||
// It can throw JedisDataException while there's no slave to promote
|
||||
// There's nothing we can do, so we just pass Exception to make test
|
||||
// fail fast
|
||||
sentinelJedis1.sentinelFailover(MASTER_NAME);
|
||||
|
||||
waitForFailover(pool, oldMaster);
|
||||
// JedisSentinelPool recognize master but may not changed internal pool
|
||||
// yet
|
||||
Thread.sleep(100);
|
||||
|
||||
Jedis afterFailoverJedis = pool.getResource();
|
||||
assertEquals("PONG", afterFailoverJedis.ping());
|
||||
assertEquals("foobared", afterFailoverJedis.configGet("requirepass").get(1));
|
||||
assertEquals(2, afterFailoverJedis.getDB().intValue());
|
||||
|
||||
jedis = pool.getResource();
|
||||
assertEquals("PONG", jedis.ping());
|
||||
assertEquals("foobared", jedis.configGet("requirepass").get(1));
|
||||
assertEquals(2, jedis.getDB().intValue());
|
||||
// returning both connections to the pool should not throw
|
||||
beforeFailoverJedis.close();
|
||||
afterFailoverJedis.close();
|
||||
}
|
||||
|
||||
private void waitForFailover(JedisSentinelPool pool, HostAndPort oldMaster)
|
||||
throws InterruptedException {
|
||||
HostAndPort newMaster = JedisSentinelTestUtil
|
||||
.waitForNewPromotedMaster(sentinelJedis1);
|
||||
.waitForNewPromotedMaster(MASTER_NAME, sentinelJedis1, sentinelJedis2);
|
||||
|
||||
waitForJedisSentinelPoolRecognizeNewMaster(pool, newMaster);
|
||||
}
|
||||
@@ -166,10 +170,9 @@ public class JedisSentinelPoolTest extends JedisTestBase {
|
||||
throws InterruptedException {
|
||||
|
||||
while (true) {
|
||||
String host = pool.getCurrentHostMaster().getHost();
|
||||
int port = pool.getCurrentHostMaster().getPort();
|
||||
HostAndPort currentHostMaster = pool.getCurrentHostMaster();
|
||||
|
||||
if (host.equals(newMaster.getHost()) && port == newMaster.getPort())
|
||||
if (newMaster.equals(currentHostMaster))
|
||||
break;
|
||||
|
||||
System.out
|
||||
|
||||
@@ -85,16 +85,16 @@ public class JedisSentinelTest extends JedisTestBase {
|
||||
public void sentinelFailover() throws InterruptedException {
|
||||
Jedis j = new Jedis(sentinelForFailover.getHost(),
|
||||
sentinelForFailover.getPort());
|
||||
Jedis j2 = new Jedis(sentinelForFailover.getHost(),
|
||||
sentinelForFailover.getPort());
|
||||
|
||||
try {
|
||||
List<String> masterHostAndPort = j
|
||||
.sentinelGetMasterAddrByName(FAILOVER_MASTER_NAME);
|
||||
HostAndPort currentMaster = new HostAndPort(masterHostAndPort.get(0),
|
||||
Integer.parseInt(masterHostAndPort.get(1)));
|
||||
String result = j.sentinelFailover(FAILOVER_MASTER_NAME);
|
||||
assertEquals("OK", result);
|
||||
|
||||
JedisSentinelTestUtil.waitForNewPromotedMaster(j);
|
||||
JedisSentinelTestUtil.waitForNewPromotedMaster(FAILOVER_MASTER_NAME, j, j2);
|
||||
|
||||
masterHostAndPort = j
|
||||
.sentinelGetMasterAddrByName(FAILOVER_MASTER_NAME);
|
||||
|
||||
@@ -8,7 +8,8 @@ import redis.clients.jedis.JedisPubSub;
|
||||
import redis.clients.jedis.tests.utils.FailoverAbortedException;
|
||||
|
||||
public class JedisSentinelTestUtil {
|
||||
public static HostAndPort waitForNewPromotedMaster(Jedis sentinelJedis)
|
||||
public static HostAndPort waitForNewPromotedMaster(final String masterName,
|
||||
final Jedis sentinelJedis, final Jedis commandJedis)
|
||||
throws InterruptedException {
|
||||
|
||||
final AtomicReference<String> newmaster = new AtomicReference<String>(
|
||||
@@ -47,6 +48,7 @@ public class JedisSentinelTestUtil {
|
||||
|
||||
@Override
|
||||
public void onPSubscribe(String pattern, int subscribedChannels) {
|
||||
commandJedis.sentinelFailover(masterName);
|
||||
}
|
||||
}, "*");
|
||||
|
||||
|
||||
Reference in New Issue
Block a user