diff --git a/src/main/java/redis/clients/jedis/JedisSentinelPool.java b/src/main/java/redis/clients/jedis/JedisSentinelPool.java index ecec51e..9bee0e9 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelPool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelPool.java @@ -1,8 +1,10 @@ package redis.clients.jedis; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import org.apache.commons.pool.impl.GenericObjectPool.Config; @@ -19,6 +21,8 @@ public class JedisSentinelPool extends Pool { protected String password; protected int database = Protocol.DEFAULT_DATABASE; + + protected Set masterListeners = new HashSet(); protected Logger log = Logger.getLogger(getClass().getName()); @@ -28,7 +32,11 @@ public class JedisSentinelPool extends Pool { public JedisSentinelPool(String masterName, Set sentinels) { this(masterName, sentinels, new Config(), Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE); - } + } + + public JedisSentinelPool(String masterName, Set sentinels, String password) { + this(masterName, sentinels, new Config(), Protocol.DEFAULT_TIMEOUT, password); + } public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig, int timeout, final String password) { this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE); @@ -37,6 +45,10 @@ public class JedisSentinelPool extends Pool { public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig, final int timeout) { this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE); } + + public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig, final String password) { + this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password); + } public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig, int timeout, final String password, final int database) { @@ -53,15 +65,15 @@ public class JedisSentinelPool extends Pool { returnResourceObject(resource); } - private class HostAndPort { + private class HostAndPort { String host; int port; @Override public boolean equals(Object obj) { if (obj instanceof HostAndPort) { - final HostAndPort that = (HostAndPort) obj; - return this.port == that.port && this.host.equals(that.host); + HostAndPort hp = (HostAndPort) obj; + return port == hp.port && host.equals(hp.host); } return false; } @@ -71,45 +83,14 @@ public class JedisSentinelPool extends Pool { return host + ":" + port; } } - - public class JedisPubSubAdapter extends JedisPubSub { - - @Override - public void onMessage(String channel, String message) { - - } - - @Override - public void onPMessage(String pattern, String channel, String message) { - - } - - @Override - public void onPSubscribe(String pattern, int subscribedChannels) { - - } - - @Override - public void onPUnsubscribe(String pattern, int subscribedChannels) { - - } - - @Override - public void onSubscribe(String channel, int subscribedChannels) { - - } - - @Override - public void onUnsubscribe(String channel, int subscribedChannels) { - - } - } - - + private volatile HostAndPort currentHostMaster; public void destroy() { - // FIXME clean Sentinel connections + for (MasterListener m : masterListeners) { + m.shutdown(); + } + super.destroy(); } @@ -118,86 +99,154 @@ public class JedisSentinelPool extends Pool { } private void initPool(HostAndPort master) { - if (!master.equals(currentHostMaster)) { - currentHostMaster = master; - log("Created pool: " + master); - initPool(poolConfig, new JedisFactory(master.host, master.port, timeout, password, database)); - } + if (!master.equals(currentHostMaster)) { + currentHostMaster = master; + log.info("Created JedisPool to master at " + master); + initPool(poolConfig, new JedisFactory(master.host, master.port, timeout, password, database)); + } } private HostAndPort initSentinels(Set sentinels, final String masterName) { - HostAndPort master = null; - final boolean running = true; - outer: while (running) { - for (String sentinel : sentinels) { - final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); - try { - final Jedis jedis = new Jedis(hap.host, hap.port); - if (master == null) { - master = toHostAndPort(jedis.sentinelGetMasterAddrByName(masterName)); - jedis.disconnect(); - break outer; - } - } catch (JedisConnectionException e) { - log("Cannot connect to sentinel running @ " + hap + ". Trying next one."); - } - } - try { - log("All sentinels down, cannot determinate where is " + masterName + " master is running... sleeping 1000ms."); - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - log("Got master running at " + master + ". Starting sentinel listeners."); - for (String sentinel : sentinels) { - final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); - - new Thread() { - public void run() { - boolean running = true; - while (running) { - final Jedis jedis = new Jedis(hap.host, hap.port); - try { - jedis.subscribe(new JedisPubSubAdapter() { - @Override - public void onMessage(String channel, String message) { - // System.out.println(channel + ": " + message); - // +switch-master: mymaster 127.0.0.1 6379 127.0.0.1 6380 - log("Sentinel " + hap + " published: " + message + "."); - final String[] switchMasterMsg = message.split(" "); - if (masterName.equals(switchMasterMsg[0])) { - initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); - } - } - }, "+switch-master"); - } catch (JedisConnectionException e) { - log("Lost connection to " + hap + ". Sleeping 5000ms."); - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } - } - }; - }.start(); - } - - return master; + + HostAndPort master = null; + boolean running = true; + + outer: while (running) { + + log.info("Trying to find master from available Sentinels..."); + + for (String sentinel : sentinels) { + + final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); + + log.fine("Connecting to Sentinel " + hap); + + try { + Jedis jedis = new Jedis(hap.host, hap.port); + + if (master == null) { + master = toHostAndPort(jedis.sentinelGetMasterAddrByName(masterName)); + log.fine("Found Redis master at " + master); + jedis.disconnect(); + break outer; + } + } catch (JedisConnectionException e) { + log.warning("Cannot connect to sentinel running @ " + hap + ". Trying next one."); + } + } + + try { + log.severe("All sentinels down, cannot determine where is " + masterName + " master is running... sleeping 1000ms."); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + log.info("Redis master running at " + master + ", starting Sentinel listeners..."); + + for (String sentinel : sentinels) { + final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); + MasterListener masterListener = new MasterListener(masterName, hap.host, hap.port); + masterListeners.add(masterListener); + masterListener.start(); + } + + return master; } - - // sophisticated logging - private void log(String msg) { - log.info(msg); - } - + private HostAndPort toHostAndPort(List getMasterAddrByNameResult) { final HostAndPort hap = new HostAndPort(); hap.host = getMasterAddrByNameResult.get(0); hap.port = Integer.parseInt(getMasterAddrByNameResult.get(1)); return hap; } - -} + + protected class JedisPubSubAdapter extends JedisPubSub { + @Override + public void onMessage(String channel, String message) {} + @Override + public void onPMessage(String pattern, String channel, String message) {} + @Override + public void onPSubscribe(String pattern, int subscribedChannels) {} + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) {} + @Override + public void onSubscribe(String channel, int subscribedChannels) {} + @Override + public void onUnsubscribe(String channel, int subscribedChannels) {} + } + + protected class MasterListener extends Thread { + + protected String masterName; + protected String host; + protected int port; + protected long subscribeRetryWaitTimeMillis = 5000; + protected Jedis j; + protected AtomicBoolean running = new AtomicBoolean(false); + + protected MasterListener() {} + + public MasterListener(String masterName, String host, int port) { + this.masterName = masterName; + this.host = host; + this.port = port; + } + + public MasterListener(String masterName, String host, int port, long subscribeRetryWaitTimeMillis) { + this(masterName, host, port); + this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; + } + + public void run() { + + running.set(true); + + while (running.get()) { + + j = new Jedis(host, port); + + try { + j.subscribe(new JedisPubSubAdapter() { + @Override + public void onMessage(String channel, String message) { + + log.fine("Sentinel " + host + ":" + port + " published: " + message + "."); + + String[] switchMasterMsg = message.split(" "); + + if (switchMasterMsg.length > 3) { + + if (masterName.equals(switchMasterMsg[0])) { + initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); + } else { + log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0] + ", our master name is " + masterName); + } + + } else { + log.severe("Invalid message received on Sentinel " + host + ":" + port + " on channel +switch-master: " + message); + } + } + }, "+switch-master"); + + } catch (JedisConnectionException e) { + log.severe("Lost connection to Sentinel at " + host + ":" + port + ". Sleeping 5000ms and retrying."); + + try { Thread.sleep(subscribeRetryWaitTimeMillis); } catch (InterruptedException e1) { e1.printStackTrace(); } + } + } + } + + public void shutdown() { + try { + log.fine("Shutting down listener on " + host + ":" + port); + running.set(false); + // This isn't good, the Jedis object is not thread safe + j.disconnect(); + } catch (Exception e) { + log.severe("Caught exception while shutting down: " + e.getMessage()); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/redis/clients/util/Pool.java b/src/main/java/redis/clients/util/Pool.java index d227c06..4ad6fff 100644 --- a/src/main/java/redis/clients/util/Pool.java +++ b/src/main/java/redis/clients/util/Pool.java @@ -11,19 +11,24 @@ public abstract class Pool { /** * Using this constructor means you have to set - * the internalPool yourself. + * and initialize the internalPool yourself. */ - public Pool() { - this.internalPool = null; - } + public Pool() {} public Pool(final GenericObjectPool.Config poolConfig, PoolableObjectFactory factory) { initPool(poolConfig, factory); } - public void initPool(final GenericObjectPool.Config poolConfig, - PoolableObjectFactory factory) { + public void initPool(final GenericObjectPool.Config poolConfig, PoolableObjectFactory factory) { + + if (this.internalPool != null) { + try { + destroy(); + } catch (Exception e) { + } + } + this.internalPool = new GenericObjectPool(factory, poolConfig); }