Merge pull request #146 from TioBorracho/MasterSlaveConsistency2
Give names to shards for backward compatibility
This commit is contained in:
@@ -61,24 +61,6 @@ public class Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Object read() {
|
|
||||||
try {
|
|
||||||
return protocol.read(inputStream);
|
|
||||||
} catch (JedisConnectionException e) {
|
|
||||||
disconnect();
|
|
||||||
throw new JedisConnectionException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void sendProtocolCommand(final Command cmd, final byte[]... args) {
|
|
||||||
try {
|
|
||||||
protocol.sendCommand(outputStream, cmd, args);
|
|
||||||
} catch (JedisConnectionException e) {
|
|
||||||
disconnect();
|
|
||||||
throw new JedisConnectionException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Connection sendCommand(final Command cmd, final String... args) {
|
protected Connection sendCommand(final Command cmd, final String... args) {
|
||||||
final byte[][] bargs = new byte[args.length][];
|
final byte[][] bargs = new byte[args.length][];
|
||||||
for (int i = 0; i < args.length; i++) {
|
for (int i = 0; i < args.length; i++) {
|
||||||
@@ -89,14 +71,14 @@ public class Connection {
|
|||||||
|
|
||||||
protected Connection sendCommand(final Command cmd, final byte[]... args) {
|
protected Connection sendCommand(final Command cmd, final byte[]... args) {
|
||||||
connect();
|
connect();
|
||||||
sendProtocolCommand(cmd, args);
|
protocol.sendCommand(outputStream, cmd, args);
|
||||||
pipelinedCommands++;
|
pipelinedCommands++;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Connection sendCommand(final Command cmd) {
|
protected Connection sendCommand(final Command cmd) {
|
||||||
connect();
|
connect();
|
||||||
sendProtocolCommand(cmd, new byte[0][]);
|
protocol.sendCommand(outputStream, cmd, new byte[0][]);
|
||||||
pipelinedCommands++;
|
pipelinedCommands++;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
@@ -163,7 +145,7 @@ public class Connection {
|
|||||||
protected String getStatusCodeReply() {
|
protected String getStatusCodeReply() {
|
||||||
flush();
|
flush();
|
||||||
pipelinedCommands--;
|
pipelinedCommands--;
|
||||||
final byte[] resp = (byte[]) read();
|
final byte[] resp = (byte[]) protocol.read(inputStream);
|
||||||
if (null == resp) {
|
if (null == resp) {
|
||||||
return null;
|
return null;
|
||||||
} else {
|
} else {
|
||||||
@@ -183,13 +165,13 @@ public class Connection {
|
|||||||
public byte[] getBinaryBulkReply() {
|
public byte[] getBinaryBulkReply() {
|
||||||
flush();
|
flush();
|
||||||
pipelinedCommands--;
|
pipelinedCommands--;
|
||||||
return (byte[]) read();
|
return (byte[]) protocol.read(inputStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Long getIntegerReply() {
|
public Long getIntegerReply() {
|
||||||
flush();
|
flush();
|
||||||
pipelinedCommands--;
|
pipelinedCommands--;
|
||||||
return (Long) read();
|
return (Long) protocol.read(inputStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> getMultiBulkReply() {
|
public List<String> getMultiBulkReply() {
|
||||||
@@ -200,14 +182,14 @@ public class Connection {
|
|||||||
public List<byte[]> getBinaryMultiBulkReply() {
|
public List<byte[]> getBinaryMultiBulkReply() {
|
||||||
flush();
|
flush();
|
||||||
pipelinedCommands--;
|
pipelinedCommands--;
|
||||||
return (List<byte[]>) read();
|
return (List<byte[]>) protocol.read(inputStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public List<Object> getObjectMultiBulkReply() {
|
public List<Object> getObjectMultiBulkReply() {
|
||||||
flush();
|
flush();
|
||||||
pipelinedCommands--;
|
pipelinedCommands--;
|
||||||
return (List<Object>) read();
|
return (List<Object>) protocol.read(inputStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Object> getAll() {
|
public List<Object> getAll() {
|
||||||
@@ -218,7 +200,7 @@ public class Connection {
|
|||||||
List<Object> all = new ArrayList<Object>();
|
List<Object> all = new ArrayList<Object>();
|
||||||
flush();
|
flush();
|
||||||
while (pipelinedCommands > except) {
|
while (pipelinedCommands > except) {
|
||||||
all.add(read());
|
all.add(protocol.read(inputStream));
|
||||||
pipelinedCommands--;
|
pipelinedCommands--;
|
||||||
}
|
}
|
||||||
return all;
|
return all;
|
||||||
@@ -227,6 +209,6 @@ public class Connection {
|
|||||||
public Object getOne() {
|
public Object getOne() {
|
||||||
flush();
|
flush();
|
||||||
pipelinedCommands--;
|
pipelinedCommands--;
|
||||||
return read();
|
return protocol.read(inputStream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -27,6 +27,7 @@ public class JedisShardInfo extends ShardInfo<Jedis> {
|
|||||||
private String host;
|
private String host;
|
||||||
private int port;
|
private int port;
|
||||||
private String password = null;
|
private String password = null;
|
||||||
|
private String name = null;
|
||||||
|
|
||||||
public String getHost() {
|
public String getHost() {
|
||||||
return host;
|
return host;
|
||||||
@@ -39,15 +40,27 @@ public class JedisShardInfo extends ShardInfo<Jedis> {
|
|||||||
public JedisShardInfo(String host) {
|
public JedisShardInfo(String host) {
|
||||||
this(host, Protocol.DEFAULT_PORT);
|
this(host, Protocol.DEFAULT_PORT);
|
||||||
}
|
}
|
||||||
|
public JedisShardInfo(String host, String name) {
|
||||||
|
this(host, Protocol.DEFAULT_PORT, name);
|
||||||
|
}
|
||||||
|
|
||||||
public JedisShardInfo(String host, int port) {
|
public JedisShardInfo(String host, int port) {
|
||||||
this(host, port, 2000);
|
this(host, port, 2000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public JedisShardInfo(String host, int port, String name) {
|
||||||
|
this(host, port, 2000, name);
|
||||||
|
}
|
||||||
|
|
||||||
public JedisShardInfo(String host, int port, int timeout) {
|
public JedisShardInfo(String host, int port, int timeout) {
|
||||||
this(host, port, timeout, Sharded.DEFAULT_WEIGHT);
|
this(host, port, timeout, Sharded.DEFAULT_WEIGHT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public JedisShardInfo(String host, int port, int timeout, String name) {
|
||||||
|
this(host, port, timeout, Sharded.DEFAULT_WEIGHT);
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
public JedisShardInfo(String host, int port, int timeout, int weight) {
|
public JedisShardInfo(String host, int port, int timeout, int weight) {
|
||||||
super(weight);
|
super(weight);
|
||||||
this.host = host;
|
this.host = host;
|
||||||
@@ -71,6 +84,10 @@ public class JedisShardInfo extends ShardInfo<Jedis> {
|
|||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name ;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Jedis createResource() {
|
public Jedis createResource() {
|
||||||
return new Jedis(this);
|
return new Jedis(this);
|
||||||
|
|||||||
@@ -15,4 +15,6 @@ public abstract class ShardInfo<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected abstract T createResource();
|
protected abstract T createResource();
|
||||||
|
|
||||||
|
public abstract String getName();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -55,9 +55,14 @@ public class Sharded<R, S extends ShardInfo<R>> {
|
|||||||
|
|
||||||
for (int i = 0; i != shards.size(); ++i) {
|
for (int i = 0; i != shards.size(); ++i) {
|
||||||
final S shardInfo = shards.get(i);
|
final S shardInfo = shards.get(i);
|
||||||
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
|
if (shardInfo.getName() == null)
|
||||||
nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
|
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
|
||||||
}
|
nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
|
||||||
|
nodes.put(this.algo.hash(shardInfo.getName() + "*" + shardInfo.getWeight() + n), shardInfo);
|
||||||
|
}
|
||||||
resources.put(shardInfo, shardInfo.createResource());
|
resources.put(shardInfo, shardInfo.createResource());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -115,4 +115,72 @@ public class ShardedJedisPoolTest extends Assert {
|
|||||||
|
|
||||||
assertNotSame(j1.getShard("foo"), j2.getShard("foo"));
|
assertNotSame(j1.getShard("foo"), j2.getShard("foo"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void checkConnectionsWithNoServers() {
|
||||||
|
shards = new ArrayList<JedisShardInfo>();
|
||||||
|
shards.add(new JedisShardInfo("localhost", 6379, "ssa"));
|
||||||
|
shards.add(new JedisShardInfo("localhost", 6380, "ssa"));
|
||||||
|
Config redisConfig = new Config();
|
||||||
|
redisConfig.testOnBorrow = false; // deactivated for now
|
||||||
|
redisConfig.testOnReturn = true;
|
||||||
|
redisConfig.maxActive = 200; // nro threads + margen de seguridad?
|
||||||
|
redisConfig.minIdle = 200;
|
||||||
|
ShardedJedisPool pool = new ShardedJedisPool(redisConfig, shards);
|
||||||
|
ShardedJedis jedis = pool.getResource();
|
||||||
|
pool.returnResource(jedis);
|
||||||
|
pool.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void checkFailedJedisServer() {
|
||||||
|
ShardedJedisPool pool = new ShardedJedisPool(new Config(), shards);
|
||||||
|
ShardedJedis jedis = pool.getResource();
|
||||||
|
jedis.incr("foo");
|
||||||
|
pool.returnResource(jedis);
|
||||||
|
pool.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldReturnActiveShardsWhenOneGoesOffline() {
|
||||||
|
Config redisConfig = new Config();
|
||||||
|
redisConfig.testOnBorrow = false;
|
||||||
|
ShardedJedisPool pool = new ShardedJedisPool(redisConfig, shards);
|
||||||
|
ShardedJedis jedis = pool.getResource();
|
||||||
|
// fill the shards
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
jedis.set("a-test-" + i, "0");
|
||||||
|
}
|
||||||
|
pool.returnResource(jedis);
|
||||||
|
// check quantity for each shard
|
||||||
|
Jedis j = new Jedis(shards.get(0));
|
||||||
|
j.connect();
|
||||||
|
Long c1 = j.dbSize();
|
||||||
|
j.disconnect();
|
||||||
|
j = new Jedis(shards.get(1));
|
||||||
|
j.connect();
|
||||||
|
Long c2 = j.dbSize();
|
||||||
|
j.disconnect();
|
||||||
|
// shutdown shard 2 and check thay the pool returns an instance with c1
|
||||||
|
// items on one shard
|
||||||
|
// alter shard 1 and recreate pool
|
||||||
|
pool.destroy();
|
||||||
|
shards.set(1, new JedisShardInfo("nohost", 1234));
|
||||||
|
pool = new ShardedJedisPool(redisConfig, shards);
|
||||||
|
jedis = pool.getResource();
|
||||||
|
Long actual = new Long(0);
|
||||||
|
Long fails = new Long(0);
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
try {
|
||||||
|
jedis.get("a-test-" + i);
|
||||||
|
actual++;
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
fails++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pool.returnResource(jedis);
|
||||||
|
pool.destroy();
|
||||||
|
assertEquals(actual, c1);
|
||||||
|
assertEquals(fails, c2);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -240,5 +240,63 @@ public class ShardedJedisTest extends Assert {
|
|||||||
assertTrue(shard_6380 > 300 && shard_6380 < 400);
|
assertTrue(shard_6380 > 300 && shard_6380 < 400);
|
||||||
assertTrue(shard_6381 > 300 && shard_6381 < 400);
|
assertTrue(shard_6381 > 300 && shard_6381 < 400);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMasterSlaveShardingConsistency() {
|
||||||
|
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(3);
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT));
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 1));
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 2));
|
||||||
|
Sharded<Jedis, JedisShardInfo> sharded = new Sharded<Jedis, JedisShardInfo>(
|
||||||
|
shards, Hashing.MURMUR_HASH);
|
||||||
|
|
||||||
|
List<JedisShardInfo> otherShards = new ArrayList<JedisShardInfo>(3);
|
||||||
|
otherShards.add(new JedisShardInfo("otherhost", Protocol.DEFAULT_PORT));
|
||||||
|
otherShards.add(new JedisShardInfo("otherhost",
|
||||||
|
Protocol.DEFAULT_PORT + 1));
|
||||||
|
otherShards.add(new JedisShardInfo("otherhost",
|
||||||
|
Protocol.DEFAULT_PORT + 2));
|
||||||
|
Sharded<Jedis, JedisShardInfo> sharded2 = new Sharded<Jedis, JedisShardInfo>(
|
||||||
|
otherShards, Hashing.MURMUR_HASH);
|
||||||
|
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
JedisShardInfo jedisShardInfo = sharded.getShardInfo(Integer
|
||||||
|
.toString(i));
|
||||||
|
JedisShardInfo jedisShardInfo2 = sharded2.getShardInfo(Integer
|
||||||
|
.toString(i));
|
||||||
|
assertEquals(shards.indexOf(jedisShardInfo),
|
||||||
|
otherShards.indexOf(jedisShardInfo2));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testMasterSlaveShardingConsistencyWithShardNaming() {
|
||||||
|
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(3);
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT, "HOST1:1234"));
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 1,"HOST2:1234"));
|
||||||
|
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 2, "HOST3:1234"));
|
||||||
|
Sharded<Jedis, JedisShardInfo> sharded = new Sharded<Jedis, JedisShardInfo>(
|
||||||
|
shards, Hashing.MURMUR_HASH);
|
||||||
|
|
||||||
|
List<JedisShardInfo> otherShards = new ArrayList<JedisShardInfo>(3);
|
||||||
|
otherShards.add(new JedisShardInfo("otherhost", Protocol.DEFAULT_PORT, "HOST2:1234"));
|
||||||
|
otherShards.add(new JedisShardInfo("otherhost",
|
||||||
|
Protocol.DEFAULT_PORT + 1, "HOST3:1234"));
|
||||||
|
otherShards.add(new JedisShardInfo("otherhost",
|
||||||
|
Protocol.DEFAULT_PORT + 2, "HOST1:1234"));
|
||||||
|
Sharded<Jedis, JedisShardInfo> sharded2 = new Sharded<Jedis, JedisShardInfo>(
|
||||||
|
otherShards, Hashing.MURMUR_HASH);
|
||||||
|
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
JedisShardInfo jedisShardInfo = sharded.getShardInfo(Integer
|
||||||
|
.toString(i));
|
||||||
|
JedisShardInfo jedisShardInfo2 = sharded2.getShardInfo(Integer
|
||||||
|
.toString(i));
|
||||||
|
assertEquals(jedisShardInfo.getName(),
|
||||||
|
jedisShardInfo2.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user