Now Sharded will handle connections instead of Info, so connection won't be shared between instances of ShardedJedis
This commit is contained in:
@@ -31,8 +31,8 @@ public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo>
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void disconnect() throws IOException {
|
public void disconnect() throws IOException {
|
||||||
for (JedisShardInfo jedis : getAllShards()) {
|
for (Jedis jedis : getAllShards()) {
|
||||||
jedis.getResource().disconnect();
|
jedis.disconnect();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -29,9 +29,9 @@ public class ShardedJedis extends BinaryShardedJedis implements JedisCommands {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void disconnect() throws IOException {
|
public void disconnect() throws IOException {
|
||||||
for (JedisShardInfo jedis : getAllShards()) {
|
for (Jedis jedis : getAllShards()) {
|
||||||
jedis.getResource().quit();
|
jedis.quit();
|
||||||
jedis.getResource().disconnect();
|
jedis.disconnect();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -51,9 +51,9 @@ public class ShardedJedisPool extends Pool<ShardedJedis> {
|
|||||||
boolean done = false;
|
boolean done = false;
|
||||||
while (!done) {
|
while (!done) {
|
||||||
try {
|
try {
|
||||||
for (JedisShardInfo shard : jedis.getAllShards()) {
|
for (Jedis shard : jedis.getAllShards()) {
|
||||||
if (!shard.getResource().isConnected()) {
|
if (!shard.isConnected()) {
|
||||||
shard.getResource().connect();
|
shard.connect();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
done = true;
|
done = true;
|
||||||
@@ -82,9 +82,8 @@ public class ShardedJedisPool extends Pool<ShardedJedis> {
|
|||||||
public boolean validateObject(final Object obj) {
|
public boolean validateObject(final Object obj) {
|
||||||
try {
|
try {
|
||||||
ShardedJedis jedis = (ShardedJedis) obj;
|
ShardedJedis jedis = (ShardedJedis) obj;
|
||||||
for (JedisShardInfo shard : jedis.getAllShards()) {
|
for (Jedis shard : jedis.getAllShards()) {
|
||||||
if (!shard.getResource().isConnected()
|
if (!shard.isConnected() || !shard.ping().equals("PONG")) {
|
||||||
|| !shard.getResource().ping().equals("PONG")) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
package redis.clients.util;
|
package redis.clients.util;
|
||||||
|
|
||||||
public abstract class ShardInfo<T> {
|
public abstract class ShardInfo<T> {
|
||||||
private T resource;
|
|
||||||
|
|
||||||
private int weight;
|
private int weight;
|
||||||
|
|
||||||
public ShardInfo() {
|
public ShardInfo() {
|
||||||
@@ -16,13 +14,5 @@ public abstract class ShardInfo<T> {
|
|||||||
return this.weight;
|
return this.weight;
|
||||||
}
|
}
|
||||||
|
|
||||||
public T getResource() {
|
|
||||||
return resource;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void initResource () {
|
|
||||||
resource = createResource();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract T createResource();
|
protected abstract T createResource();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,8 +2,10 @@ package redis.clients.util;
|
|||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
@@ -14,6 +16,7 @@ public class Sharded<R, S extends ShardInfo<R>> {
|
|||||||
public static final int DEFAULT_WEIGHT = 1;
|
public static final int DEFAULT_WEIGHT = 1;
|
||||||
private TreeMap<Long, S> nodes;
|
private TreeMap<Long, S> nodes;
|
||||||
private final Hashing algo;
|
private final Hashing algo;
|
||||||
|
private final Map<ShardInfo<R>, R> resources = new HashMap<ShardInfo<R>, R>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default pattern used for extracting a key tag. The pattern must have
|
* The default pattern used for extracting a key tag. The pattern must have
|
||||||
@@ -63,7 +66,7 @@ public class Sharded<R, S extends ShardInfo<R>> {
|
|||||||
long floor = Long.MIN_VALUE;
|
long floor = Long.MIN_VALUE;
|
||||||
for (int i = 0; i != shards.size(); ++i) {
|
for (int i = 0; i != shards.size(); ++i) {
|
||||||
final S shardInfo = shards.get(i);
|
final S shardInfo = shards.get(i);
|
||||||
shardInfo.initResource();
|
resources.put(shardInfo, shardInfo.createResource());
|
||||||
nodes.put(floor, shardInfo);
|
nodes.put(floor, shardInfo);
|
||||||
floor += 4 * oneForthOfStep * shardInfo.getWeight(); // *4 to
|
floor += 4 * oneForthOfStep * shardInfo.getWeight(); // *4 to
|
||||||
// compensate
|
// compensate
|
||||||
@@ -72,11 +75,11 @@ public class Sharded<R, S extends ShardInfo<R>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public R getShard(byte[] key) {
|
public R getShard(byte[] key) {
|
||||||
return getShardInfo(key).getResource();
|
return resources.get(getShardInfo(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
public R getShard(String key) {
|
public R getShard(String key) {
|
||||||
return getShardInfo(key).getResource();
|
return resources.get(getShardInfo(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
private S getShardInfo(byte[] key) {
|
private S getShardInfo(byte[] key) {
|
||||||
@@ -111,7 +114,11 @@ public class Sharded<R, S extends ShardInfo<R>> {
|
|||||||
return key;
|
return key;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Collection<S> getAllShards() {
|
public Collection<S> getAllShardInfo() {
|
||||||
return Collections.unmodifiableCollection(nodes.values());
|
return Collections.unmodifiableCollection(nodes.values());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Collection<R> getAllShards() {
|
||||||
|
return Collections.unmodifiableCollection(resources.values());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -103,4 +103,18 @@ public class ShardedJedisPoolTest extends Assert {
|
|||||||
ShardedJedis newJedis = pool.getResource();
|
ShardedJedis newJedis = pool.getResource();
|
||||||
newJedis.incr("foo");
|
newJedis.incr("foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotShareInstances() throws Exception {
|
||||||
|
Config config = new Config();
|
||||||
|
config.maxActive = 2;
|
||||||
|
config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
|
||||||
|
|
||||||
|
ShardedJedisPool pool = new ShardedJedisPool(config, shards);
|
||||||
|
|
||||||
|
ShardedJedis j1 = pool.getResource();
|
||||||
|
ShardedJedis j2 = pool.getResource();
|
||||||
|
|
||||||
|
assertNotSame(j1.getShard("foo"), j2.getShard("foo"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -7,6 +7,7 @@ import java.util.Calendar;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import redis.clients.jedis.Jedis;
|
||||||
import redis.clients.jedis.JedisShardInfo;
|
import redis.clients.jedis.JedisShardInfo;
|
||||||
import redis.clients.jedis.ShardedJedis;
|
import redis.clients.jedis.ShardedJedis;
|
||||||
import redis.clients.jedis.tests.HostAndPortUtil;
|
import redis.clients.jedis.tests.HostAndPortUtil;
|
||||||
@@ -27,9 +28,9 @@ public class ShardedBenchmark {
|
|||||||
shard.setPassword("foobared");
|
shard.setPassword("foobared");
|
||||||
shards.add(shard);
|
shards.add(shard);
|
||||||
ShardedJedis jedis = new ShardedJedis(shards);
|
ShardedJedis jedis = new ShardedJedis(shards);
|
||||||
Collection<JedisShardInfo> allShards = jedis.getAllShards();
|
Collection<Jedis> allShards = jedis.getAllShards();
|
||||||
for (JedisShardInfo jedisShardInfo : allShards) {
|
for (Jedis j : allShards) {
|
||||||
jedisShardInfo.getResource().flushAll();
|
j.flushAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
long begin = Calendar.getInstance().getTimeInMillis();
|
long begin = Calendar.getInstance().getTimeInMillis();
|
||||||
|
|||||||
Reference in New Issue
Block a user