diff --git a/Makefile b/Makefile index 633842b..5c99721 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,7 @@ endef define REDIS3_CONF daemonize yes port 6381 +requirepass foobared pidfile /tmp/redis3.pid logfile /tmp/redis3.log save "" @@ -30,6 +31,8 @@ endef define REDIS4_CONF daemonize yes port 6382 +requirepass foobared +masterauth foobared pidfile /tmp/redis4.pid logfile /tmp/redis4.log save "" @@ -39,7 +42,7 @@ endef define REDIS_SENTINEL1 port 26379 daemonize yes -sentinel monitor mymaster 127.0.0.1 6379 1 +sentinel monitor mymaster 127.0.0.1 6381 2 sentinel auth-pass mymaster foobared sentinel down-after-milliseconds mymaster 3000 sentinel failover-timeout mymaster 900000 @@ -53,6 +56,7 @@ define REDIS_SENTINEL2 port 26380 daemonize yes sentinel monitor mymaster 127.0.0.1 6381 2 +sentinel auth-pass mymaster foobared sentinel down-after-milliseconds mymaster 3000 sentinel can-failover mymaster yes sentinel parallel-syncs mymaster 1 @@ -65,6 +69,7 @@ define REDIS_SENTINEL3 port 26381 daemonize yes sentinel monitor mymaster 127.0.0.1 6381 2 +sentinel auth-pass mymaster foobared sentinel down-after-milliseconds mymaster 3000 sentinel can-failover mymaster yes sentinel parallel-syncs mymaster 1 diff --git a/src/main/java/redis/clients/jedis/JedisSentinelPool.java b/src/main/java/redis/clients/jedis/JedisSentinelPool.java index 10db508..0d87bdd 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelPool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelPool.java @@ -14,246 +14,299 @@ import redis.clients.util.Pool; public class JedisSentinelPool extends Pool { - protected Config poolConfig; - + protected Config poolConfig; + protected int timeout = Protocol.DEFAULT_TIMEOUT; - + protected String password; - + protected int database = Protocol.DEFAULT_DATABASE; - + protected Set masterListeners = new HashSet(); protected Logger log = Logger.getLogger(getClass().getName()); - - public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig) { - this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE); + + public JedisSentinelPool(String masterName, Set sentinels, + final Config poolConfig) { + this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null, + Protocol.DEFAULT_DATABASE); } public JedisSentinelPool(String masterName, Set sentinels) { - this(masterName, sentinels, new Config(), Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE); + this(masterName, sentinels, new Config(), Protocol.DEFAULT_TIMEOUT, + null, Protocol.DEFAULT_DATABASE); } - public JedisSentinelPool(String masterName, Set sentinels, String password) { - this(masterName, sentinels, new Config(), Protocol.DEFAULT_TIMEOUT, password); - } - - public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig, int timeout, final String password) { - this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE); - } - - public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig, final int timeout) { - this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE); - } - - public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig, final String password) { - this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password); + public JedisSentinelPool(String masterName, Set sentinels, + String password) { + this(masterName, sentinels, new Config(), Protocol.DEFAULT_TIMEOUT, + password); } - public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig, int timeout, final String password, - final int database) { - this.poolConfig = poolConfig; - this.timeout = timeout; - this.password = password; - this.database = database; - HostAndPort master = initSentinels(sentinels, masterName); - initPool(master); + public JedisSentinelPool(String masterName, Set sentinels, + final Config poolConfig, int timeout, final String password) { + this(masterName, sentinels, poolConfig, timeout, password, + Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelPool(String masterName, Set sentinels, + final Config poolConfig, final int timeout) { + this(masterName, sentinels, poolConfig, timeout, null, + Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelPool(String masterName, Set sentinels, + final Config poolConfig, final String password) { + this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, + password); + } + + public JedisSentinelPool(String masterName, Set sentinels, + final Config poolConfig, int timeout, final String password, + final int database) { + this.poolConfig = poolConfig; + this.timeout = timeout; + this.password = password; + this.database = database; + + HostAndPort master = initSentinels(sentinels, masterName); + initPool(master); } public void returnBrokenResource(final BinaryJedis resource) { - returnBrokenResourceObject(resource); + returnBrokenResourceObject(resource); } - + public void returnResource(final BinaryJedis resource) { - returnResourceObject(resource); + returnResourceObject(resource); } - - private class HostAndPort { - String host; - int port; - @Override - public boolean equals(Object obj) { - if (obj instanceof HostAndPort) { - HostAndPort hp = (HostAndPort) obj; - return port == hp.port && host.equals(hp.host); - } - return false; - } - - @Override - public String toString() { - return host + ":" + port; - } - } - - private volatile HostAndPort currentHostMaster; + private class HostAndPort { + String host; + int port; - public void destroy() { - for (MasterListener m : masterListeners) { - m.shutdown(); - } - - super.destroy(); - } + @Override + public boolean equals(Object obj) { + if (obj instanceof HostAndPort) { + HostAndPort hp = (HostAndPort) obj; + return port == hp.port && host.equals(hp.host); + } + return false; + } - public HostAndPort getCurrentHostMaster() { - return currentHostMaster; - } + @Override + public String toString() { + return host + ":" + port; + } + } - private void initPool(HostAndPort master) { - if (!master.equals(currentHostMaster)) { - currentHostMaster = master; - log.info("Created JedisPool to master at " + master); - initPool(poolConfig, new JedisFactory(master.host, master.port, timeout, password, database)); - } - } + private volatile HostAndPort currentHostMaster; - private HostAndPort initSentinels(Set sentinels, final String masterName) { - - HostAndPort master = null; - boolean running = true; - - outer: while (running) { - - log.info("Trying to find master from available Sentinels..."); - - for (String sentinel : sentinels) { - - final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); - - log.fine("Connecting to Sentinel " + hap); - - try { - Jedis jedis = new Jedis(hap.host, hap.port); - - if (master == null) { - master = toHostAndPort(jedis.sentinelGetMasterAddrByName(masterName)); - log.fine("Found Redis master at " + master); - jedis.disconnect(); - break outer; - } - } catch (JedisConnectionException e) { - log.warning("Cannot connect to sentinel running @ " + hap + ". Trying next one."); - } - } - - try { - log.severe("All sentinels down, cannot determine where is " + masterName + " master is running... sleeping 1000ms."); - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - log.info("Redis master running at " + master + ", starting Sentinel listeners..."); - - for (String sentinel : sentinels) { - final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); - MasterListener masterListener = new MasterListener(masterName, hap.host, hap.port); - masterListeners.add(masterListener); - masterListener.start(); - } - - return master; - } - - private HostAndPort toHostAndPort(List getMasterAddrByNameResult) { - final HostAndPort hap = new HostAndPort(); - hap.host = getMasterAddrByNameResult.get(0); - hap.port = Integer.parseInt(getMasterAddrByNameResult.get(1)); - return hap; - } - - protected class JedisPubSubAdapter extends JedisPubSub { - @Override - public void onMessage(String channel, String message) {} - @Override - public void onPMessage(String pattern, String channel, String message) {} - @Override - public void onPSubscribe(String pattern, int subscribedChannels) {} - @Override - public void onPUnsubscribe(String pattern, int subscribedChannels) {} - @Override - public void onSubscribe(String channel, int subscribedChannels) {} - @Override - public void onUnsubscribe(String channel, int subscribedChannels) {} - } - - protected class MasterListener extends Thread { - - protected String masterName; - protected String host; - protected int port; - protected long subscribeRetryWaitTimeMillis = 5000; - protected Jedis j; - protected AtomicBoolean running = new AtomicBoolean(false); - - protected MasterListener() {} - - public MasterListener(String masterName, String host, int port) { - this.masterName = masterName; - this.host = host; - this.port = port; - } - - public MasterListener(String masterName, String host, int port, long subscribeRetryWaitTimeMillis) { - this(masterName, host, port); - this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; - } - - public void run() { - - running.set(true); - - while (running.get()) { - - j = new Jedis(host, port); - - try { - j.subscribe(new JedisPubSubAdapter() { - @Override - public void onMessage(String channel, String message) { - - log.fine("Sentinel " + host + ":" + port + " published: " + message + "."); - - String[] switchMasterMsg = message.split(" "); - - if (switchMasterMsg.length > 3) { - - if (masterName.equals(switchMasterMsg[0])) { - initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); - } else { - log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0] + ", our master name is " + masterName); - } - - } else { - log.severe("Invalid message received on Sentinel " + host + ":" + port + " on channel +switch-master: " + message); - } - } - }, "+switch-master"); - - } catch (JedisConnectionException e) { - - if (running.get()) { - log.severe("Lost connection to Sentinel at " + host + ":" + port + ". Sleeping 5000ms and retrying."); - try { Thread.sleep(subscribeRetryWaitTimeMillis); } catch (InterruptedException e1) { e1.printStackTrace(); } - } else { - log.fine("Unsubscribing from Sentinel at " + host + ":" + port); - } - } - } - } - - public void shutdown() { - try { - log.fine("Shutting down listener on " + host + ":" + port); - running.set(false); - // This isn't good, the Jedis object is not thread safe - j.disconnect(); - } catch (Exception e) { - log.severe("Caught exception while shutting down: " + e.getMessage()); - } - } - } + public void destroy() { + for (MasterListener m : masterListeners) { + m.shutdown(); + } + + super.destroy(); + } + + public HostAndPort getCurrentHostMaster() { + return currentHostMaster; + } + + private void initPool(HostAndPort master) { + if (!master.equals(currentHostMaster)) { + currentHostMaster = master; + log.info("Created JedisPool to master at " + master); + initPool(poolConfig, new JedisFactory(master.host, master.port, + timeout, password, database)); + } + } + + private HostAndPort initSentinels(Set sentinels, + final String masterName) { + + HostAndPort master = null; + boolean running = true; + + outer: while (running) { + + log.info("Trying to find master from available Sentinels..."); + + for (String sentinel : sentinels) { + + final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel + .split(":"))); + + log.fine("Connecting to Sentinel " + hap); + + try { + Jedis jedis = new Jedis(hap.host, hap.port); + + if (master == null) { + master = toHostAndPort(jedis + .sentinelGetMasterAddrByName(masterName)); + log.fine("Found Redis master at " + master); + jedis.disconnect(); + break outer; + } + } catch (JedisConnectionException e) { + log.warning("Cannot connect to sentinel running @ " + hap + + ". Trying next one."); + } + } + + try { + log.severe("All sentinels down, cannot determine where is " + + masterName + " master is running... sleeping 1000ms."); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + log.info("Redis master running at " + master + + ", starting Sentinel listeners..."); + + for (String sentinel : sentinels) { + final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel + .split(":"))); + MasterListener masterListener = new MasterListener(masterName, + hap.host, hap.port); + masterListeners.add(masterListener); + masterListener.start(); + } + + return master; + } + + private HostAndPort toHostAndPort(List getMasterAddrByNameResult) { + final HostAndPort hap = new HostAndPort(); + hap.host = getMasterAddrByNameResult.get(0); + hap.port = Integer.parseInt(getMasterAddrByNameResult.get(1)); + return hap; + } + + protected class JedisPubSubAdapter extends JedisPubSub { + @Override + public void onMessage(String channel, String message) { + } + + @Override + public void onPMessage(String pattern, String channel, String message) { + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + } + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + } + } + + protected class MasterListener extends Thread { + + protected String masterName; + protected String host; + protected int port; + protected long subscribeRetryWaitTimeMillis = 5000; + protected Jedis j; + protected AtomicBoolean running = new AtomicBoolean(false); + + protected MasterListener() { + } + + public MasterListener(String masterName, String host, int port) { + this.masterName = masterName; + this.host = host; + this.port = port; + } + + public MasterListener(String masterName, String host, int port, + long subscribeRetryWaitTimeMillis) { + this(masterName, host, port); + this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; + } + + public void run() { + + running.set(true); + + while (running.get()) { + + j = new Jedis(host, port); + + try { + j.subscribe(new JedisPubSubAdapter() { + @Override + public void onMessage(String channel, String message) { + log.fine("Sentinel " + host + ":" + port + + " published: " + message + "."); + + String[] switchMasterMsg = message.split(" "); + + if (switchMasterMsg.length > 3) { + + if (masterName.equals(switchMasterMsg[0])) { + initPool(toHostAndPort(Arrays.asList( + switchMasterMsg[3], + switchMasterMsg[4]))); + } else { + log.fine("Ignoring message on +switch-master for master name " + + switchMasterMsg[0] + + ", our master name is " + + masterName); + } + + } else { + log.severe("Invalid message received on Sentinel " + + host + + ":" + + port + + " on channel +switch-master: " + + message); + } + } + }, "+switch-master"); + + } catch (JedisConnectionException e) { + + if (running.get()) { + log.severe("Lost connection to Sentinel at " + host + + ":" + port + + ". Sleeping 5000ms and retrying."); + try { + Thread.sleep(subscribeRetryWaitTimeMillis); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } else { + log.fine("Unsubscribing from Sentinel at " + host + ":" + + port); + } + } + } + } + + public void shutdown() { + try { + log.fine("Shutting down listener on " + host + ":" + port); + running.set(false); + // This isn't good, the Jedis object is not thread safe + j.disconnect(); + } catch (Exception e) { + log.severe("Caught exception while shutting down: " + + e.getMessage()); + } + } + } } \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java b/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java index 5c8affe..80b04b2 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisSentinelPoolTest.java @@ -3,6 +3,7 @@ package redis.clients.jedis.tests; import java.util.HashSet; import java.util.Set; +import org.apache.commons.pool.impl.GenericObjectPool.Config; import org.junit.Before; import org.junit.Test; @@ -12,53 +13,63 @@ import redis.clients.jedis.JedisSentinelPool; import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort; public class JedisSentinelPoolTest extends JedisTestBase { - - protected static HostAndPort master = HostAndPortUtil.getRedisServers().get(2); - protected static HostAndPort slave1 = HostAndPortUtil.getRedisServers().get(3); - protected static HostAndPort sentinel1 = HostAndPortUtil.getSentinelServers().get(1); - protected static HostAndPort sentinel2 = HostAndPortUtil.getSentinelServers().get(2); - protected static Jedis masterJedis; - protected static Jedis slaveJedis1; - protected static Jedis sentinelJedis1; - - protected Set sentinels = new HashSet(); - - @Before - public void setUp() throws Exception { - - // set up master and slaves - masterJedis = new Jedis(master.host, master.port); - masterJedis.slaveofNoOne(); - - slaveJedis1 = new Jedis(slave1.host, slave1.port); - slaveJedis1.slaveof(master.host, master.port); - - sentinelJedis1 = new Jedis(sentinel1.host, sentinel1.port); - sentinels.add(sentinel1.toString()); - sentinels.add(sentinel2.toString()); - - // FIXME: The following allows the master/slave relationship to - // be established. We can do this more elegantly. - Thread.sleep(10000); + protected static HostAndPort master = HostAndPortUtil.getRedisServers() + .get(2); + protected static HostAndPort slave1 = HostAndPortUtil.getRedisServers() + .get(3); + protected static HostAndPort sentinel1 = HostAndPortUtil + .getSentinelServers().get(1); + protected static HostAndPort sentinel2 = HostAndPortUtil + .getSentinelServers().get(2); + + protected static Jedis masterJedis; + protected static Jedis slaveJedis1; + + protected Set sentinels = new HashSet(); + + @Before + public void setUp() throws Exception { + + // set up master and slaves + masterJedis = new Jedis(master.host, master.port); + masterJedis.auth("foobared"); + masterJedis.slaveofNoOne(); + + slaveJedis1 = new Jedis(slave1.host, slave1.port); + slaveJedis1.auth("foobared"); + slaveJedis1.slaveof(master.host, master.port); + + sentinels.add(sentinel1.toString()); + sentinels.add(sentinel2.toString()); + + // FIXME: The following allows the master/slave relationship to + // be established. We can do this more elegantly. + Thread.sleep(10000); } - + @Test public void segfaultMaster() throws InterruptedException { - - JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels); - - Jedis jedis = pool.getResource(); - assertEquals("PONG", jedis.ping()); - - try { masterJedis.debug(DebugParams.SEGFAULT()); } catch (Exception e) {} - - // wait for the sentinel to promote a master - // FIXME: we can query the sentinel and sleep - // right until the master is promoted - Thread.sleep(35000); - - jedis = pool.getResource(); - assertEquals("PONG", jedis.ping()); + + JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, + new Config(), 1000, "foobared", 2); + + Jedis jedis = pool.getResource(); + assertEquals("PONG", jedis.ping()); + + try { + masterJedis.debug(DebugParams.SEGFAULT()); + } catch (Exception e) { + } + + // wait for the sentinel to promote a master + // FIXME: we can query the sentinel and sleep + // right until the master is promoted + Thread.sleep(35000); + + jedis = pool.getResource(); + assertEquals("PONG", jedis.ping()); + assertEquals("foobared", jedis.configGet("requirepass").get(1)); + assertEquals(2, jedis.getDB().intValue()); } }