Clean up JedisSentinelTestUtil
* remove all unused methods * move JedisSentinelPoolTest.waitForJedisSentinelPoolRecognizeNewMaster to JedisSentin ** both JedisSentinelTest and JedisSentinelPoolTest can use this implementation * introduce FailoverAbortedException ** throws when we subscribe sentinel channels and got message by "-failover-abort-*" c * respect Source Format to Java Convention (by Eclipse -> Source -> Format)
This commit is contained in:
@@ -2,20 +2,17 @@ package redis.clients.jedis.tests;
|
|||||||
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import redis.clients.jedis.DebugParams;
|
|
||||||
import redis.clients.jedis.HostAndPort;
|
import redis.clients.jedis.HostAndPort;
|
||||||
import redis.clients.jedis.Jedis;
|
import redis.clients.jedis.Jedis;
|
||||||
import redis.clients.jedis.JedisPubSub;
|
|
||||||
import redis.clients.jedis.JedisSentinelPool;
|
import redis.clients.jedis.JedisSentinelPool;
|
||||||
import redis.clients.jedis.Transaction;
|
import redis.clients.jedis.Transaction;
|
||||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||||
import redis.clients.jedis.exceptions.JedisDataException;
|
import redis.clients.jedis.tests.utils.JedisSentinelTestUtil;
|
||||||
|
|
||||||
public class JedisSentinelPoolTest extends JedisTestBase {
|
public class JedisSentinelPoolTest extends JedisTestBase {
|
||||||
private static final String MASTER_NAME = "mymaster";
|
private static final String MASTER_NAME = "mymaster";
|
||||||
@@ -51,7 +48,8 @@ public class JedisSentinelPoolTest extends JedisTestBase {
|
|||||||
// you can test failover as much as possible
|
// you can test failover as much as possible
|
||||||
}
|
}
|
||||||
|
|
||||||
private void forceFailover(JedisSentinelPool pool) throws InterruptedException {
|
private void forceFailover(JedisSentinelPool pool)
|
||||||
|
throws InterruptedException {
|
||||||
HostAndPort oldMaster = pool.getCurrentHostMaster();
|
HostAndPort oldMaster = pool.getCurrentHostMaster();
|
||||||
|
|
||||||
// jedis connection should be master
|
// jedis connection should be master
|
||||||
@@ -59,11 +57,13 @@ public class JedisSentinelPoolTest extends JedisTestBase {
|
|||||||
assertEquals("PONG", jedis.ping());
|
assertEquals("PONG", jedis.ping());
|
||||||
|
|
||||||
// It can throw JedisDataException while there's no slave to promote
|
// It can throw JedisDataException while there's no slave to promote
|
||||||
// There's nothing we can do, so we just pass Exception to make test fail fast
|
// There's nothing we can do, so we just pass Exception to make test
|
||||||
|
// fail fast
|
||||||
sentinelJedis1.sentinelFailover(MASTER_NAME);
|
sentinelJedis1.sentinelFailover(MASTER_NAME);
|
||||||
|
|
||||||
waitForFailover(pool, oldMaster);
|
waitForFailover(pool, oldMaster);
|
||||||
// JedisSentinelPool recognize master but may not changed internal pool yet
|
// JedisSentinelPool recognize master but may not changed internal pool
|
||||||
|
// yet
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
jedis = pool.getResource();
|
jedis = pool.getResource();
|
||||||
@@ -74,54 +74,15 @@ public class JedisSentinelPoolTest extends JedisTestBase {
|
|||||||
|
|
||||||
private void waitForFailover(JedisSentinelPool pool, HostAndPort oldMaster)
|
private void waitForFailover(JedisSentinelPool pool, HostAndPort oldMaster)
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
waitForJedisSentinelPoolRecognizeNewMaster(pool);
|
HostAndPort newMaster = JedisSentinelTestUtil
|
||||||
|
.waitForNewPromotedMaster(sentinelJedis1);
|
||||||
|
|
||||||
|
waitForJedisSentinelPoolRecognizeNewMaster(pool, newMaster);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForJedisSentinelPoolRecognizeNewMaster(
|
private void waitForJedisSentinelPoolRecognizeNewMaster(
|
||||||
JedisSentinelPool pool) throws InterruptedException {
|
JedisSentinelPool pool, HostAndPort newMaster)
|
||||||
|
throws InterruptedException {
|
||||||
final AtomicReference<String> newmaster = new AtomicReference<String>(
|
|
||||||
"");
|
|
||||||
|
|
||||||
sentinelJedis1.psubscribe(new JedisPubSub() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onMessage(String channel, String message) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onPMessage(String pattern, String channel,
|
|
||||||
String message) {
|
|
||||||
if (channel.equals("+switch-master")) {
|
|
||||||
newmaster.set(message);
|
|
||||||
punsubscribe();
|
|
||||||
} else if (channel.startsWith("-failover-abort")) {
|
|
||||||
punsubscribe();
|
|
||||||
fail("Unfortunately sentinel cannot failover... reason(channel) : " +
|
|
||||||
channel + " / message : " + message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(String channel, int subscribedChannels) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onUnsubscribe(String channel, int subscribedChannels) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onPUnsubscribe(String pattern, int subscribedChannels) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onPSubscribe(String pattern, int subscribedChannels) {
|
|
||||||
}
|
|
||||||
}, "*");
|
|
||||||
|
|
||||||
String[] chunks = newmaster.get().split(" ");
|
|
||||||
HostAndPort newMaster = new HostAndPort(chunks[3],
|
|
||||||
Integer.parseInt(chunks[4]));
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
String host = pool.getCurrentHostMaster().getHost();
|
String host = pool.getCurrentHostMaster().getHost();
|
||||||
@@ -146,15 +107,29 @@ public class JedisSentinelPoolTest extends JedisTestBase {
|
|||||||
config, 1000, "foobared", 2);
|
config, 1000, "foobared", 2);
|
||||||
|
|
||||||
Jedis jedis = pool.getResource();
|
Jedis jedis = pool.getResource();
|
||||||
|
Jedis jedis2 = null;
|
||||||
|
|
||||||
|
try {
|
||||||
jedis.set("hello", "jedis");
|
jedis.set("hello", "jedis");
|
||||||
Transaction t = jedis.multi();
|
Transaction t = jedis.multi();
|
||||||
t.set("hello", "world");
|
t.set("hello", "world");
|
||||||
pool.returnResource(jedis);
|
pool.returnResource(jedis);
|
||||||
|
|
||||||
Jedis jedis2 = pool.getResource();
|
jedis2 = pool.getResource();
|
||||||
|
|
||||||
assertTrue(jedis == jedis2);
|
assertTrue(jedis == jedis2);
|
||||||
assertEquals("jedis", jedis2.get("hello"));
|
assertEquals("jedis", jedis2.get("hello"));
|
||||||
|
} catch (JedisConnectionException e) {
|
||||||
|
if (jedis2 != null) {
|
||||||
|
pool.returnBrokenResource(jedis2);
|
||||||
|
jedis2 = null;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (jedis2 != null)
|
||||||
pool.returnResource(jedis2);
|
pool.returnResource(jedis2);
|
||||||
|
|
||||||
pool.destroy();
|
pool.destroy();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,6 +50,8 @@ public class JedisSentinelTest extends JedisTestBase {
|
|||||||
@Test
|
@Test
|
||||||
public void sentinel() {
|
public void sentinel() {
|
||||||
Jedis j = new Jedis(sentinel.getHost(), sentinel.getPort());
|
Jedis j = new Jedis(sentinel.getHost(), sentinel.getPort());
|
||||||
|
|
||||||
|
try {
|
||||||
List<Map<String, String>> masters = j.sentinelMasters();
|
List<Map<String, String>> masters = j.sentinelMasters();
|
||||||
|
|
||||||
boolean inMasters = false;
|
boolean inMasters = false;
|
||||||
@@ -62,8 +64,8 @@ public class JedisSentinelTest extends JedisTestBase {
|
|||||||
List<String> masterHostAndPort = j
|
List<String> masterHostAndPort = j
|
||||||
.sentinelGetMasterAddrByName(MASTER_NAME);
|
.sentinelGetMasterAddrByName(MASTER_NAME);
|
||||||
HostAndPort masterFromSentinel = new HostAndPort(
|
HostAndPort masterFromSentinel = new HostAndPort(
|
||||||
masterHostAndPort.get(0), Integer.parseInt(masterHostAndPort
|
masterHostAndPort.get(0),
|
||||||
.get(1)));
|
Integer.parseInt(masterHostAndPort.get(1)));
|
||||||
assertEquals(master, masterFromSentinel);
|
assertEquals(master, masterFromSentinel);
|
||||||
|
|
||||||
List<Map<String, String>> slaves = j.sentinelSlaves(MASTER_NAME);
|
List<Map<String, String>> slaves = j.sentinelSlaves(MASTER_NAME);
|
||||||
@@ -74,6 +76,9 @@ public class JedisSentinelTest extends JedisTestBase {
|
|||||||
// DO NOT RE-RUN TEST TOO FAST, RESET TAKES SOME TIME TO... RESET
|
// DO NOT RE-RUN TEST TOO FAST, RESET TAKES SOME TIME TO... RESET
|
||||||
assertEquals(Long.valueOf(1), j.sentinelReset(MASTER_NAME));
|
assertEquals(Long.valueOf(1), j.sentinelReset(MASTER_NAME));
|
||||||
assertEquals(Long.valueOf(0), j.sentinelReset("woof" + MASTER_NAME));
|
assertEquals(Long.valueOf(0), j.sentinelReset("woof" + MASTER_NAME));
|
||||||
|
} finally {
|
||||||
|
j.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -81,6 +86,7 @@ public class JedisSentinelTest extends JedisTestBase {
|
|||||||
Jedis j = new Jedis(sentinelForFailover.getHost(),
|
Jedis j = new Jedis(sentinelForFailover.getHost(),
|
||||||
sentinelForFailover.getPort());
|
sentinelForFailover.getPort());
|
||||||
|
|
||||||
|
try {
|
||||||
HostAndPort currentMaster = new HostAndPort(
|
HostAndPort currentMaster = new HostAndPort(
|
||||||
masterForFailover.getHost(), masterForFailover.getPort());
|
masterForFailover.getHost(), masterForFailover.getPort());
|
||||||
|
|
||||||
@@ -89,20 +95,25 @@ public class JedisSentinelTest extends JedisTestBase {
|
|||||||
String result = j.sentinelFailover(FAILOVER_MASTER_NAME);
|
String result = j.sentinelFailover(FAILOVER_MASTER_NAME);
|
||||||
assertEquals("OK", result);
|
assertEquals("OK", result);
|
||||||
|
|
||||||
JedisSentinelTestUtil.waitForNewPromotedMaster(sentinelForFailover,
|
JedisSentinelTestUtil.waitForNewPromotedMaster(j);
|
||||||
FAILOVER_MASTER_NAME, currentMaster);
|
|
||||||
|
|
||||||
masterHostAndPort = j.sentinelGetMasterAddrByName(FAILOVER_MASTER_NAME);
|
masterHostAndPort = j
|
||||||
|
.sentinelGetMasterAddrByName(FAILOVER_MASTER_NAME);
|
||||||
HostAndPort newMaster = new HostAndPort(masterHostAndPort.get(0),
|
HostAndPort newMaster = new HostAndPort(masterHostAndPort.get(0),
|
||||||
Integer.parseInt(masterHostAndPort.get(1)));
|
Integer.parseInt(masterHostAndPort.get(1)));
|
||||||
|
|
||||||
assertNotEquals(newMaster, currentMaster);
|
assertNotEquals(newMaster, currentMaster);
|
||||||
|
} finally {
|
||||||
|
j.close();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void sentinelMonitor() {
|
public void sentinelMonitor() {
|
||||||
Jedis j = new Jedis(sentinel.getHost(), sentinel.getPort());
|
Jedis j = new Jedis(sentinel.getHost(), sentinel.getPort());
|
||||||
|
|
||||||
|
try {
|
||||||
// monitor new master
|
// monitor new master
|
||||||
String result = j.sentinelMonitor(MONITOR_MASTER_NAME, MASTER_IP,
|
String result = j.sentinelMonitor(MONITOR_MASTER_NAME, MASTER_IP,
|
||||||
master.getPort(), 1);
|
master.getPort(), 1);
|
||||||
@@ -110,18 +121,22 @@ public class JedisSentinelTest extends JedisTestBase {
|
|||||||
|
|
||||||
// already monitored
|
// already monitored
|
||||||
try {
|
try {
|
||||||
j.sentinelMonitor(MONITOR_MASTER_NAME, MASTER_IP, master.getPort(),
|
j.sentinelMonitor(MONITOR_MASTER_NAME, MASTER_IP,
|
||||||
1);
|
master.getPort(), 1);
|
||||||
fail();
|
fail();
|
||||||
} catch (JedisDataException e) {
|
} catch (JedisDataException e) {
|
||||||
// pass
|
// pass
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
j.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void sentinelRemove() {
|
public void sentinelRemove() {
|
||||||
Jedis j = new Jedis(sentinel.getHost(), sentinel.getPort());
|
Jedis j = new Jedis(sentinel.getHost(), sentinel.getPort());
|
||||||
|
|
||||||
|
try {
|
||||||
ensureMonitored(sentinel, REMOVE_MASTER_NAME, MASTER_IP,
|
ensureMonitored(sentinel, REMOVE_MASTER_NAME, MASTER_IP,
|
||||||
master.getPort(), 1);
|
master.getPort(), 1);
|
||||||
|
|
||||||
@@ -136,12 +151,16 @@ public class JedisSentinelTest extends JedisTestBase {
|
|||||||
} catch (JedisDataException e) {
|
} catch (JedisDataException e) {
|
||||||
// pass
|
// pass
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
j.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void sentinelSet() {
|
public void sentinelSet() {
|
||||||
Jedis j = new Jedis(sentinel.getHost(), sentinel.getPort());
|
Jedis j = new Jedis(sentinel.getHost(), sentinel.getPort());
|
||||||
|
|
||||||
|
try {
|
||||||
Map<String, String> parameterMap = new HashMap<String, String>();
|
Map<String, String> parameterMap = new HashMap<String, String>();
|
||||||
parameterMap.put("down-after-milliseconds", String.valueOf(1234));
|
parameterMap.put("down-after-milliseconds", String.valueOf(1234));
|
||||||
parameterMap.put("parallel-syncs", String.valueOf(3));
|
parameterMap.put("parallel-syncs", String.valueOf(3));
|
||||||
@@ -151,15 +170,19 @@ public class JedisSentinelTest extends JedisTestBase {
|
|||||||
List<Map<String, String>> masters = j.sentinelMasters();
|
List<Map<String, String>> masters = j.sentinelMasters();
|
||||||
for (Map<String, String> master : masters) {
|
for (Map<String, String> master : masters) {
|
||||||
if (master.get("name").equals(MASTER_NAME)) {
|
if (master.get("name").equals(MASTER_NAME)) {
|
||||||
assertEquals(1234,
|
assertEquals(1234, Integer.parseInt(master
|
||||||
Integer.parseInt(master.get("down-after-milliseconds")));
|
.get("down-after-milliseconds")));
|
||||||
assertEquals(3, Integer.parseInt(master.get("parallel-syncs")));
|
assertEquals(3,
|
||||||
|
Integer.parseInt(master.get("parallel-syncs")));
|
||||||
assertEquals(2, Integer.parseInt(master.get("quorum")));
|
assertEquals(2, Integer.parseInt(master.get("quorum")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
parameterMap.put("quorum", String.valueOf(1));
|
parameterMap.put("quorum", String.valueOf(1));
|
||||||
j.sentinelSet(MASTER_NAME, parameterMap);
|
j.sentinelSet(MASTER_NAME, parameterMap);
|
||||||
|
} finally {
|
||||||
|
j.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ensureMonitored(HostAndPort sentinel, String masterName,
|
private void ensureMonitored(HostAndPort sentinel, String masterName,
|
||||||
@@ -168,6 +191,8 @@ public class JedisSentinelTest extends JedisTestBase {
|
|||||||
try {
|
try {
|
||||||
j.sentinelMonitor(masterName, ip, port, quorum);
|
j.sentinelMonitor(masterName, ip, port, quorum);
|
||||||
} catch (JedisDataException e) {
|
} catch (JedisDataException e) {
|
||||||
|
} finally {
|
||||||
|
j.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -176,6 +201,9 @@ public class JedisSentinelTest extends JedisTestBase {
|
|||||||
try {
|
try {
|
||||||
j.sentinelRemove(masterName);
|
j.sentinelRemove(masterName);
|
||||||
} catch (JedisDataException e) {
|
} catch (JedisDataException e) {
|
||||||
|
} finally {
|
||||||
|
j.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,17 @@
|
|||||||
|
package redis.clients.jedis.tests.utils;
|
||||||
|
|
||||||
|
public class FailoverAbortedException extends RuntimeException {
|
||||||
|
private static final long serialVersionUID = 1925110762858409954L;
|
||||||
|
|
||||||
|
public FailoverAbortedException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FailoverAbortedException(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FailoverAbortedException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,131 +1,60 @@
|
|||||||
package redis.clients.jedis.tests.utils;
|
package redis.clients.jedis.tests.utils;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import redis.clients.jedis.HostAndPort;
|
import redis.clients.jedis.HostAndPort;
|
||||||
import redis.clients.jedis.Jedis;
|
import redis.clients.jedis.Jedis;
|
||||||
|
import redis.clients.jedis.JedisPubSub;
|
||||||
|
import redis.clients.jedis.tests.utils.FailoverAbortedException;
|
||||||
|
|
||||||
public class JedisSentinelTestUtil {
|
public class JedisSentinelTestUtil {
|
||||||
|
public static HostAndPort waitForNewPromotedMaster(Jedis sentinelJedis)
|
||||||
public static void waitForSentinelRecognizeRedisReplication(
|
|
||||||
HostAndPort sentinel, String masterName, HostAndPort master,
|
|
||||||
List<HostAndPort> slaves) throws InterruptedException {
|
|
||||||
Jedis sentinelJedis = new Jedis(sentinel.getHost(), sentinel.getPort());
|
|
||||||
while (true) {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
|
|
||||||
if (!isMasterRecognized(sentinelJedis, masterName, master)) {
|
|
||||||
System.out.println("Master not recognized by Sentinel "
|
|
||||||
+ sentinel.getHost() + ":" + sentinel.getPort()
|
|
||||||
+ ", sleep...");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isSlavesRecognized(sentinelJedis, masterName, slaves)) {
|
|
||||||
System.out.println("Slaves not recognized by Sentinel "
|
|
||||||
+ sentinel.getHost() + ":" + sentinel.getPort()
|
|
||||||
+ ", sleep...");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// all recognized
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public static HostAndPort waitForNewPromotedMaster(HostAndPort sentinel,
|
|
||||||
String masterName, HostAndPort oldMaster)
|
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
Jedis sentinelJedis = new Jedis(sentinel.getHost(), sentinel.getPort());
|
|
||||||
|
|
||||||
HostAndPort newMaster = null;
|
final AtomicReference<String> newmaster = new AtomicReference<String>(
|
||||||
while (true) {
|
"");
|
||||||
Thread.sleep(1000);
|
|
||||||
|
|
||||||
List<String> sentinelMasterInfos = sentinelJedis
|
sentinelJedis.psubscribe(new JedisPubSub() {
|
||||||
.sentinelGetMasterAddrByName(masterName);
|
|
||||||
if (sentinelMasterInfos == null) {
|
@Override
|
||||||
System.out
|
public void onMessage(String channel, String message) {
|
||||||
.println("Cannot retrieve Sentinel's master address info, sleep...");
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
newMaster = new HostAndPort(sentinelMasterInfos.get(0),
|
@Override
|
||||||
Integer.parseInt(sentinelMasterInfos.get(1)));
|
public void onPMessage(String pattern, String channel,
|
||||||
|
String message) {
|
||||||
if (!newMaster.equals(oldMaster))
|
if (channel.equals("+switch-master")) {
|
||||||
break;
|
newmaster.set(message);
|
||||||
|
punsubscribe();
|
||||||
System.out
|
} else if (channel.startsWith("-failover-abort")) {
|
||||||
.println("Sentinel's master is not yet changed, sleep...");
|
punsubscribe();
|
||||||
|
throw new FailoverAbortedException("Unfortunately sentinel cannot failover... reason(channel) : " +
|
||||||
|
channel + " / message : " + message);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSubscribe(String channel, int subscribedChannels) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onUnsubscribe(String channel, int subscribedChannels) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onPUnsubscribe(String pattern, int subscribedChannels) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onPSubscribe(String pattern, int subscribedChannels) {
|
||||||
|
}
|
||||||
|
}, "*");
|
||||||
|
|
||||||
|
String[] chunks = newmaster.get().split(" ");
|
||||||
|
HostAndPort newMaster = new HostAndPort(chunks[3],
|
||||||
|
Integer.parseInt(chunks[4]));
|
||||||
|
|
||||||
return newMaster;
|
return newMaster;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void waitForSentinelsRecognizeEachOthers()
|
|
||||||
throws InterruptedException {
|
|
||||||
// During failover, master has been changed
|
|
||||||
// It means that sentinels need to recognize other sentinels from new
|
|
||||||
// master's hello channel
|
|
||||||
// Without recognizing, Sentinels cannot run failover
|
|
||||||
|
|
||||||
// Sentinels need to take some time to recognize each other...
|
|
||||||
// http://redis.io/topics/sentinel
|
|
||||||
// Sentinel Rule #8: Every Sentinel publishes a message to every
|
|
||||||
// monitored master
|
|
||||||
// Pub/Sub channel __sentinel__:hello, every five seconds, blabla...
|
|
||||||
|
|
||||||
// FIXME There're no command for sentinel to list recognized sentinels
|
|
||||||
// so sleep wisely (channel's hello message interval + margin)
|
|
||||||
Thread.sleep(5000 + 500);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isMasterRecognized(Jedis sentinelJedis,
|
|
||||||
String masterName, HostAndPort master) {
|
|
||||||
List<String> sentinelMasterInfos = sentinelJedis
|
|
||||||
.sentinelGetMasterAddrByName(masterName);
|
|
||||||
if (sentinelMasterInfos == null)
|
|
||||||
return false;
|
|
||||||
|
|
||||||
HostAndPort sentinelMaster = new HostAndPort(
|
|
||||||
sentinelMasterInfos.get(0),
|
|
||||||
Integer.parseInt(sentinelMasterInfos.get(1)));
|
|
||||||
|
|
||||||
return sentinelMaster.equals(master);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isSlavesRecognized(Jedis sentinelJedis,
|
|
||||||
String masterName, List<HostAndPort> slaves) {
|
|
||||||
List<Map<String, String>> slavesMap = sentinelJedis
|
|
||||||
.sentinelSlaves(masterName);
|
|
||||||
|
|
||||||
if (slavesMap.size() != slaves.size())
|
|
||||||
return false;
|
|
||||||
|
|
||||||
int slavesRecognized = 0;
|
|
||||||
|
|
||||||
for (HostAndPort slave : slaves) {
|
|
||||||
if (isSlaveFoundInSlavesMap(slavesMap, slave))
|
|
||||||
slavesRecognized++;
|
|
||||||
}
|
|
||||||
|
|
||||||
return slavesRecognized == slaves.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isSlaveFoundInSlavesMap(
|
|
||||||
List<Map<String, String>> slavesMap, HostAndPort slave) {
|
|
||||||
for (Map<String, String> slaveMap : slavesMap) {
|
|
||||||
HostAndPort sentinelSlave = new HostAndPort(slaveMap.get("ip"),
|
|
||||||
Integer.parseInt(slaveMap.get("port")));
|
|
||||||
|
|
||||||
if (sentinelSlave.equals(slave))
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user