diff --git a/src/main/java/redis/clients/jedis/JedisFactory.java b/src/main/java/redis/clients/jedis/JedisFactory.java new file mode 100644 index 0000000..12bef98 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisFactory.java @@ -0,0 +1,78 @@ +package redis.clients.jedis; + +import org.apache.commons.pool.BasePoolableObjectFactory; + +/** + * PoolableObjectFactory custom impl. + */ +class JedisFactory extends BasePoolableObjectFactory { + private final String host; + private final int port; + private final int timeout; + private final String password; + private final int database; + + public JedisFactory(final String host, final int port, + final int timeout, final String password, final int database) { + super(); + this.host = host; + this.port = port; + this.timeout = timeout; + this.password = password; + this.database = database; + } + + public Object makeObject() throws Exception { + final Jedis jedis = new Jedis(this.host, this.port, this.timeout); + + jedis.connect(); + if (null != this.password) { + jedis.auth(this.password); + } + if( database != 0 ) { + jedis.select(database); + } + + return jedis; + } + + @Override + public void activateObject(Object obj) throws Exception { + if (obj instanceof Jedis) { + final Jedis jedis = (Jedis)obj; + if (jedis.getDB() != database) { + jedis.select(database); + } + } + } + + public void destroyObject(final Object obj) throws Exception { + if (obj instanceof Jedis) { + final Jedis jedis = (Jedis) obj; + if (jedis.isConnected()) { + try { + try { + jedis.quit(); + } catch (Exception e) { + } + jedis.disconnect(); + } catch (Exception e) { + + } + } + } + } + + public boolean validateObject(final Object obj) { + if (obj instanceof Jedis) { + final Jedis jedis = (Jedis) obj; + try { + return jedis.isConnected() && jedis.ping().equals("PONG"); + } catch (final Exception e) { + return false; + } + } else { + return false; + } + } +} \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/JedisPool.java b/src/main/java/redis/clients/jedis/JedisPool.java index 1e0ead6..ed4f075 100644 --- a/src/main/java/redis/clients/jedis/JedisPool.java +++ b/src/main/java/redis/clients/jedis/JedisPool.java @@ -2,7 +2,6 @@ package redis.clients.jedis; import java.net.URI; -import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool.impl.GenericObjectPool.Config; @@ -68,80 +67,5 @@ public class JedisPool extends Pool { public void returnResource(final BinaryJedis resource) { returnResourceObject(resource); - } - - /** - * PoolableObjectFactory custom impl. - */ - private static class JedisFactory extends BasePoolableObjectFactory { - private final String host; - private final int port; - private final int timeout; - private final String password; - private final int database; - - public JedisFactory(final String host, final int port, - final int timeout, final String password, final int database) { - super(); - this.host = host; - this.port = port; - this.timeout = timeout; - this.password = password; - this.database = database; - } - - public Object makeObject() throws Exception { - final Jedis jedis = new Jedis(this.host, this.port, this.timeout); - - jedis.connect(); - if (null != this.password) { - jedis.auth(this.password); - } - if( database != 0 ) { - jedis.select(database); - } - - return jedis; - } - - @Override - public void activateObject(Object obj) throws Exception { - if (obj instanceof Jedis) { - final Jedis jedis = (Jedis)obj; - if (jedis.getDB() != database) { - jedis.select(database); - } - } - } - - public void destroyObject(final Object obj) throws Exception { - if (obj instanceof Jedis) { - final Jedis jedis = (Jedis) obj; - if (jedis.isConnected()) { - try { - try { - jedis.quit(); - } catch (Exception e) { - } - jedis.disconnect(); - } catch (Exception e) { - - } - } - } - } - - public boolean validateObject(final Object obj) { - if (obj instanceof Jedis) { - final Jedis jedis = (Jedis) obj; - try { - return jedis.isConnected() && jedis.ping().equals("PONG"); - } catch (final Exception e) { - return false; - } - } else { - return false; - } - } - } + } } diff --git a/src/main/java/redis/clients/jedis/JedisSentinelPool.java b/src/main/java/redis/clients/jedis/JedisSentinelPool.java new file mode 100644 index 0000000..ecec51e --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisSentinelPool.java @@ -0,0 +1,203 @@ +package redis.clients.jedis; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.logging.Logger; + +import org.apache.commons.pool.impl.GenericObjectPool.Config; + +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.util.Pool; + +public class JedisSentinelPool extends Pool { + + protected Config poolConfig; + + protected int timeout = Protocol.DEFAULT_TIMEOUT; + + protected String password; + + protected int database = Protocol.DEFAULT_DATABASE; + + protected Logger log = Logger.getLogger(getClass().getName()); + + public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig) { + this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelPool(String masterName, Set sentinels) { + this(masterName, sentinels, new Config(), Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig, int timeout, final String password) { + this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig, final int timeout) { + this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelPool(String masterName, Set sentinels, final Config poolConfig, int timeout, final String password, + final int database) { + this.poolConfig = poolConfig; + HostAndPort master = initSentinels(sentinels, masterName); + initPool(master); + } + + public void returnBrokenResource(final BinaryJedis resource) { + returnBrokenResourceObject(resource); + } + + public void returnResource(final BinaryJedis resource) { + returnResourceObject(resource); + } + + private class HostAndPort { + String host; + int port; + + @Override + public boolean equals(Object obj) { + if (obj instanceof HostAndPort) { + final HostAndPort that = (HostAndPort) obj; + return this.port == that.port && this.host.equals(that.host); + } + return false; + } + + @Override + public String toString() { + return host + ":" + port; + } + } + + public class JedisPubSubAdapter extends JedisPubSub { + + @Override + public void onMessage(String channel, String message) { + + } + + @Override + public void onPMessage(String pattern, String channel, String message) { + + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) { + + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + + } + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + + } + } + + + private volatile HostAndPort currentHostMaster; + + public void destroy() { + // FIXME clean Sentinel connections + super.destroy(); + } + + public HostAndPort getCurrentHostMaster() { + return currentHostMaster; + } + + private void initPool(HostAndPort master) { + if (!master.equals(currentHostMaster)) { + currentHostMaster = master; + log("Created pool: " + master); + initPool(poolConfig, new JedisFactory(master.host, master.port, timeout, password, database)); + } + } + + private HostAndPort initSentinels(Set sentinels, final String masterName) { + HostAndPort master = null; + final boolean running = true; + outer: while (running) { + for (String sentinel : sentinels) { + final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); + try { + final Jedis jedis = new Jedis(hap.host, hap.port); + if (master == null) { + master = toHostAndPort(jedis.sentinelGetMasterAddrByName(masterName)); + jedis.disconnect(); + break outer; + } + } catch (JedisConnectionException e) { + log("Cannot connect to sentinel running @ " + hap + ". Trying next one."); + } + } + try { + log("All sentinels down, cannot determinate where is " + masterName + " master is running... sleeping 1000ms."); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + log("Got master running at " + master + ". Starting sentinel listeners."); + for (String sentinel : sentinels) { + final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); + + new Thread() { + public void run() { + boolean running = true; + while (running) { + final Jedis jedis = new Jedis(hap.host, hap.port); + try { + jedis.subscribe(new JedisPubSubAdapter() { + @Override + public void onMessage(String channel, String message) { + // System.out.println(channel + ": " + message); + // +switch-master: mymaster 127.0.0.1 6379 127.0.0.1 6380 + log("Sentinel " + hap + " published: " + message + "."); + final String[] switchMasterMsg = message.split(" "); + if (masterName.equals(switchMasterMsg[0])) { + initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); + } + } + }, "+switch-master"); + } catch (JedisConnectionException e) { + log("Lost connection to " + hap + ". Sleeping 5000ms."); + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } + }; + }.start(); + } + + return master; + } + + // sophisticated logging + private void log(String msg) { + log.info(msg); + } + + private HostAndPort toHostAndPort(List getMasterAddrByNameResult) { + final HostAndPort hap = new HostAndPort(); + hap.host = getMasterAddrByNameResult.get(0); + hap.port = Integer.parseInt(getMasterAddrByNameResult.get(1)); + return hap; + } + +} diff --git a/src/main/java/redis/clients/util/Pool.java b/src/main/java/redis/clients/util/Pool.java index 522fa01..d227c06 100644 --- a/src/main/java/redis/clients/util/Pool.java +++ b/src/main/java/redis/clients/util/Pool.java @@ -14,14 +14,19 @@ public abstract class Pool { * the internalPool yourself. */ public Pool() { - this.internalPool = null; + this.internalPool = null; } public Pool(final GenericObjectPool.Config poolConfig, PoolableObjectFactory factory) { - this.internalPool = new GenericObjectPool(factory, poolConfig); + initPool(poolConfig, factory); } - + + public void initPool(final GenericObjectPool.Config poolConfig, + PoolableObjectFactory factory) { + this.internalPool = new GenericObjectPool(factory, poolConfig); + } + @SuppressWarnings("unchecked") public T getResource() { try {