From d70c42edb9c1cdf810dd4ad653745d8ce5fec56c Mon Sep 17 00:00:00 2001 From: Jonathan Leibiusky Date: Tue, 2 Nov 2010 01:04:02 -0300 Subject: [PATCH] Added sharding+pipelining support --- .../java/redis/clients/jedis/Connection.java | 149 +++--- src/main/java/redis/clients/jedis/Jedis.java | 4 + .../redis/clients/jedis/ShardedJedis.java | 296 ++++++------ .../clients/jedis/ShardedJedisPipeline.java | 437 ++++++++++++++++++ .../clients/jedis/tests/ShardedJedisTest.java | 178 ++++--- 5 files changed, 771 insertions(+), 293 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/ShardedJedisPipeline.java diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index c0191ec..778b359 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -1,15 +1,15 @@ package redis.clients.jedis; -import redis.clients.util.RedisInputStream; -import redis.clients.util.RedisOutputStream; - -import java.io.*; +import java.io.IOException; import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; +import redis.clients.util.RedisInputStream; +import redis.clients.util.RedisOutputStream; + public class Connection { private String host; private int port = Protocol.DEFAULT_PORT; @@ -21,134 +21,139 @@ public class Connection { private int timeout = 2000; public int getTimeout() { - return timeout; + return timeout; } public void setTimeout(int timeout) { - this.timeout = timeout; + this.timeout = timeout; } public void setTimeoutInfinite() { - try { - socket.setSoTimeout(0); - } catch (SocketException ex) { - throw new JedisException(ex); - } + try { + socket.setSoTimeout(0); + } catch (SocketException ex) { + throw new JedisException(ex); + } } public void rollbackTimeout() { - try { - socket.setSoTimeout(timeout); - } catch (SocketException ex) { - throw new JedisException(ex); - } + try { + socket.setSoTimeout(timeout); + } catch (SocketException ex) { + throw new JedisException(ex); + } } public Connection(String host) { - super(); - this.host = host; + super(); + this.host = host; } protected Connection sendCommand(String name, String... args) { - try { - connect(); - } catch (UnknownHostException e) { - throw new JedisException("Could not connect to redis-server", e); - } catch (IOException e) { - throw new JedisException("Could not connect to redis-server", e); - } - protocol.sendCommand(outputStream, name, args); - pipelinedCommands++; - return this; + try { + connect(); + } catch (UnknownHostException e) { + throw new JedisException("Could not connect to redis-server", e); + } catch (IOException e) { + throw new JedisException("Could not connect to redis-server", e); + } + protocol.sendCommand(outputStream, name, args); + pipelinedCommands++; + return this; } public Connection(String host, int port) { - super(); - this.host = host; - this.port = port; + super(); + this.host = host; + this.port = port; } public String getHost() { - return host; + return host; } public void setHost(String host) { - this.host = host; + this.host = host; } public int getPort() { - return port; + return port; } public void setPort(int port) { - this.port = port; + this.port = port; } public Connection() { } public void connect() throws UnknownHostException, IOException { - if (!isConnected()) { - socket = new Socket(host, port); - socket.setSoTimeout(timeout); - outputStream = new RedisOutputStream(socket.getOutputStream()); - inputStream = new RedisInputStream(socket.getInputStream()); - } + if (!isConnected()) { + socket = new Socket(host, port); + socket.setSoTimeout(timeout); + outputStream = new RedisOutputStream(socket.getOutputStream()); + inputStream = new RedisInputStream(socket.getInputStream()); + } } public void disconnect() { - if (isConnected()) { - try { - inputStream.close(); - outputStream.close(); - if (!socket.isClosed()) { - socket.close(); - } - } catch (IOException ex) { - throw new JedisException(ex); - } - } + if (isConnected()) { + try { + inputStream.close(); + outputStream.close(); + if (!socket.isClosed()) { + socket.close(); + } + } catch (IOException ex) { + throw new JedisException(ex); + } + } } public boolean isConnected() { - return socket != null && socket.isBound() && !socket.isClosed() - && socket.isConnected() && !socket.isInputShutdown() - && !socket.isOutputShutdown(); + return socket != null && socket.isBound() && !socket.isClosed() + && socket.isConnected() && !socket.isInputShutdown() + && !socket.isOutputShutdown(); } protected String getStatusCodeReply() { - pipelinedCommands--; - return (String) protocol.read(inputStream); + pipelinedCommands--; + return (String) protocol.read(inputStream); } public String getBulkReply() { - pipelinedCommands--; - return (String) protocol.read(inputStream); + pipelinedCommands--; + return (String) protocol.read(inputStream); } public int getIntegerReply() { - pipelinedCommands--; - return ((Integer) protocol.read(inputStream)).intValue(); + pipelinedCommands--; + return ((Integer) protocol.read(inputStream)).intValue(); } @SuppressWarnings("unchecked") public List getMultiBulkReply() { - pipelinedCommands--; - return (List) protocol.read(inputStream); + pipelinedCommands--; + return (List) protocol.read(inputStream); } @SuppressWarnings("unchecked") public List getObjectMultiBulkReply() { - pipelinedCommands--; - return (List) protocol.read(inputStream); + pipelinedCommands--; + return (List) protocol.read(inputStream); } public List getAll() { - List all = new ArrayList(); - while (pipelinedCommands > 0) { - all.add(protocol.read(inputStream)); - pipelinedCommands--; - } - return all; + List all = new ArrayList(); + while (pipelinedCommands > 0) { + all.add(protocol.read(inputStream)); + pipelinedCommands--; + } + return all; } -} + + public Object getOne() { + pipelinedCommands--; + return protocol.read(inputStream); + } +} \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index f75b11d..6feaf31 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -2849,4 +2849,8 @@ public class Jedis implements JedisCommands { client.debug(params); return client.getStatusCodeReply(); } + + public Client getClient() { + return client; + } } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/ShardedJedis.java b/src/main/java/redis/clients/jedis/ShardedJedis.java index 935210e..a56f7dc 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedis.java +++ b/src/main/java/redis/clients/jedis/ShardedJedis.java @@ -10,365 +10,371 @@ import redis.clients.jedis.Client.LIST_POSITION; import redis.clients.util.Hashing; import redis.clients.util.Sharded; -public class ShardedJedis extends Sharded implements JedisCommands { +public class ShardedJedis extends Sharded implements + JedisCommands { public ShardedJedis(List shards) { - super(shards); + super(shards); } public ShardedJedis(List shards, Hashing algo) { - super(shards, algo); + super(shards, algo); } public ShardedJedis(List shards, Pattern keyTagPattern) { - super(shards, keyTagPattern); + super(shards, keyTagPattern); } public ShardedJedis(List shards, Hashing algo, - Pattern keyTagPattern) { - super(shards, algo, keyTagPattern); + Pattern keyTagPattern) { + super(shards, algo, keyTagPattern); } public void disconnect() throws IOException { - for (JedisShardInfo jedis : getAllShards()) { - jedis.getResource().disconnect(); - } + for (JedisShardInfo jedis : getAllShards()) { + jedis.getResource().disconnect(); + } } protected Jedis create(JedisShardInfo shard) { - return new Jedis(shard); + return new Jedis(shard); } public String set(String key, String value) { - Jedis j = getShard(key); - return j.set(key, value); + Jedis j = getShard(key); + return j.set(key, value); } public String get(String key) { - Jedis j = getShard(key); - return j.get(key); + Jedis j = getShard(key); + return j.get(key); } public Integer exists(String key) { - Jedis j = getShard(key); - return j.exists(key); + Jedis j = getShard(key); + return j.exists(key); } public String type(String key) { - Jedis j = getShard(key); - return j.type(key); + Jedis j = getShard(key); + return j.type(key); } public Integer expire(String key, int seconds) { - Jedis j = getShard(key); - return j.expire(key, seconds); + Jedis j = getShard(key); + return j.expire(key, seconds); } public Integer expireAt(String key, long unixTime) { - Jedis j = getShard(key); - return j.expireAt(key, unixTime); + Jedis j = getShard(key); + return j.expireAt(key, unixTime); } public Integer ttl(String key) { - Jedis j = getShard(key); - return j.ttl(key); + Jedis j = getShard(key); + return j.ttl(key); } public String getSet(String key, String value) { - Jedis j = getShard(key); - return j.getSet(key, value); + Jedis j = getShard(key); + return j.getSet(key, value); } public Integer setnx(String key, String value) { - Jedis j = getShard(key); - return j.setnx(key, value); + Jedis j = getShard(key); + return j.setnx(key, value); } public String setex(String key, int seconds, String value) { - Jedis j = getShard(key); - return j.setex(key, seconds, value); + Jedis j = getShard(key); + return j.setex(key, seconds, value); } public Integer decrBy(String key, int integer) { - Jedis j = getShard(key); - return j.decrBy(key, integer); + Jedis j = getShard(key); + return j.decrBy(key, integer); } public Integer decr(String key) { - Jedis j = getShard(key); - return j.decr(key); + Jedis j = getShard(key); + return j.decr(key); } public Integer incrBy(String key, int integer) { - Jedis j = getShard(key); - return j.incrBy(key, integer); + Jedis j = getShard(key); + return j.incrBy(key, integer); } public Integer incr(String key) { - Jedis j = getShard(key); - return j.incr(key); + Jedis j = getShard(key); + return j.incr(key); } public Integer append(String key, String value) { - Jedis j = getShard(key); - return j.append(key, value); + Jedis j = getShard(key); + return j.append(key, value); } public String substr(String key, int start, int end) { - Jedis j = getShard(key); - return j.substr(key, start, end); + Jedis j = getShard(key); + return j.substr(key, start, end); } public Integer hset(String key, String field, String value) { - Jedis j = getShard(key); - return j.hset(key, field, value); + Jedis j = getShard(key); + return j.hset(key, field, value); } public String hget(String key, String field) { - Jedis j = getShard(key); - return j.hget(key, field); + Jedis j = getShard(key); + return j.hget(key, field); } public Integer hsetnx(String key, String field, String value) { - Jedis j = getShard(key); - return j.hsetnx(key, field, value); + Jedis j = getShard(key); + return j.hsetnx(key, field, value); } public String hmset(String key, Map hash) { - Jedis j = getShard(key); - return j.hmset(key, hash); + Jedis j = getShard(key); + return j.hmset(key, hash); } public List hmget(String key, String... fields) { - Jedis j = getShard(key); - return j.hmget(key, fields); + Jedis j = getShard(key); + return j.hmget(key, fields); } public Integer hincrBy(String key, String field, int value) { - Jedis j = getShard(key); - return j.hincrBy(key, field, value); + Jedis j = getShard(key); + return j.hincrBy(key, field, value); } public Integer hexists(String key, String field) { - Jedis j = getShard(key); - return j.hexists(key, field); + Jedis j = getShard(key); + return j.hexists(key, field); } public Integer hdel(String key, String field) { - Jedis j = getShard(key); - return j.hdel(key, field); + Jedis j = getShard(key); + return j.hdel(key, field); } public Integer hlen(String key) { - Jedis j = getShard(key); - return j.hlen(key); + Jedis j = getShard(key); + return j.hlen(key); } public List hkeys(String key) { - Jedis j = getShard(key); - return j.hkeys(key); + Jedis j = getShard(key); + return j.hkeys(key); } public List hvals(String key) { - Jedis j = getShard(key); - return j.hvals(key); + Jedis j = getShard(key); + return j.hvals(key); } public Map hgetAll(String key) { - Jedis j = getShard(key); - return j.hgetAll(key); + Jedis j = getShard(key); + return j.hgetAll(key); } public Integer rpush(String key, String string) { - Jedis j = getShard(key); - return j.rpush(key, string); + Jedis j = getShard(key); + return j.rpush(key, string); } public Integer lpush(String key, String string) { - Jedis j = getShard(key); - return j.lpush(key, string); + Jedis j = getShard(key); + return j.lpush(key, string); } public Integer llen(String key) { - Jedis j = getShard(key); - return j.llen(key); + Jedis j = getShard(key); + return j.llen(key); } public List lrange(String key, int start, int end) { - Jedis j = getShard(key); - return j.lrange(key, start, end); + Jedis j = getShard(key); + return j.lrange(key, start, end); } public String ltrim(String key, int start, int end) { - Jedis j = getShard(key); - return j.ltrim(key, start, end); + Jedis j = getShard(key); + return j.ltrim(key, start, end); } public String lindex(String key, int index) { - Jedis j = getShard(key); - return j.lindex(key, index); + Jedis j = getShard(key); + return j.lindex(key, index); } public String lset(String key, int index, String value) { - Jedis j = getShard(key); - return j.lset(key, index, value); + Jedis j = getShard(key); + return j.lset(key, index, value); } public Integer lrem(String key, int count, String value) { - Jedis j = getShard(key); - return j.lrem(key, count, value); + Jedis j = getShard(key); + return j.lrem(key, count, value); } public String lpop(String key) { - Jedis j = getShard(key); - return j.lpop(key); + Jedis j = getShard(key); + return j.lpop(key); } public String rpop(String key) { - Jedis j = getShard(key); - return j.rpop(key); + Jedis j = getShard(key); + return j.rpop(key); } public Integer sadd(String key, String member) { - Jedis j = getShard(key); - return j.sadd(key, member); + Jedis j = getShard(key); + return j.sadd(key, member); } public Set smembers(String key) { - Jedis j = getShard(key); - return j.smembers(key); + Jedis j = getShard(key); + return j.smembers(key); } public Integer srem(String key, String member) { - Jedis j = getShard(key); - return j.srem(key, member); + Jedis j = getShard(key); + return j.srem(key, member); } public String spop(String key) { - Jedis j = getShard(key); - return j.spop(key); + Jedis j = getShard(key); + return j.spop(key); } public Integer scard(String key) { - Jedis j = getShard(key); - return j.scard(key); + Jedis j = getShard(key); + return j.scard(key); } public Integer sismember(String key, String member) { - Jedis j = getShard(key); - return j.sismember(key, member); + Jedis j = getShard(key); + return j.sismember(key, member); } public String srandmember(String key) { - Jedis j = getShard(key); - return j.srandmember(key); + Jedis j = getShard(key); + return j.srandmember(key); } public Integer zadd(String key, double score, String member) { - Jedis j = getShard(key); - return j.zadd(key, score, member); + Jedis j = getShard(key); + return j.zadd(key, score, member); } public Set zrange(String key, int start, int end) { - Jedis j = getShard(key); - return j.zrange(key, start, end); + Jedis j = getShard(key); + return j.zrange(key, start, end); } public Integer zrem(String key, String member) { - Jedis j = getShard(key); - return j.zrem(key, member); + Jedis j = getShard(key); + return j.zrem(key, member); } public Double zincrby(String key, double score, String member) { - Jedis j = getShard(key); - return j.zincrby(key, score, member); + Jedis j = getShard(key); + return j.zincrby(key, score, member); } public Integer zrank(String key, String member) { - Jedis j = getShard(key); - return j.zrank(key, member); + Jedis j = getShard(key); + return j.zrank(key, member); } public Integer zrevrank(String key, String member) { - Jedis j = getShard(key); - return j.zrevrank(key, member); + Jedis j = getShard(key); + return j.zrevrank(key, member); } public Set zrevrange(String key, int start, int end) { - Jedis j = getShard(key); - return j.zrevrange(key, start, end); + Jedis j = getShard(key); + return j.zrevrange(key, start, end); } public Set zrangeWithScores(String key, int start, int end) { - Jedis j = getShard(key); - return j.zrangeWithScores(key, start, end); + Jedis j = getShard(key); + return j.zrangeWithScores(key, start, end); } public Set zrevrangeWithScores(String key, int start, int end) { - Jedis j = getShard(key); - return j.zrevrangeWithScores(key, start, end); + Jedis j = getShard(key); + return j.zrevrangeWithScores(key, start, end); } public Integer zcard(String key) { - Jedis j = getShard(key); - return j.zcard(key); + Jedis j = getShard(key); + return j.zcard(key); } public Double zscore(String key, String member) { - Jedis j = getShard(key); - return j.zscore(key, member); + Jedis j = getShard(key); + return j.zscore(key, member); } public List sort(String key) { - Jedis j = getShard(key); - return j.sort(key); + Jedis j = getShard(key); + return j.sort(key); } public List sort(String key, SortingParams sortingParameters) { - Jedis j = getShard(key); - return j.sort(key, sortingParameters); + Jedis j = getShard(key); + return j.sort(key, sortingParameters); } public Integer zcount(String key, double min, double max) { - Jedis j = getShard(key); - return j.zcount(key, min, max); + Jedis j = getShard(key); + return j.zcount(key, min, max); } public Set zrangeByScore(String key, double min, double max) { - Jedis j = getShard(key); - return j.zrangeByScore(key, min, max); + Jedis j = getShard(key); + return j.zrangeByScore(key, min, max); } public Set zrangeByScore(String key, double min, double max, - int offset, int count) { - Jedis j = getShard(key); - return j.zrangeByScore(key, min, max, offset, count); + int offset, int count) { + Jedis j = getShard(key); + return j.zrangeByScore(key, min, max, offset, count); } public Set zrangeByScoreWithScores(String key, double min, double max) { - Jedis j = getShard(key); - return j.zrangeByScoreWithScores(key, min, max); + Jedis j = getShard(key); + return j.zrangeByScoreWithScores(key, min, max); } public Set zrangeByScoreWithScores(String key, double min, - double max, int offset, int count) { - Jedis j = getShard(key); - return j.zrangeByScoreWithScores(key, min, max, offset, count); + double max, int offset, int count) { + Jedis j = getShard(key); + return j.zrangeByScoreWithScores(key, min, max, offset, count); } public Integer zremrangeByRank(String key, int start, int end) { - Jedis j = getShard(key); - return j.zremrangeByRank(key, start, end); + Jedis j = getShard(key); + return j.zremrangeByRank(key, start, end); } public Integer zremrangeByScore(String key, double start, double end) { - Jedis j = getShard(key); - return j.zremrangeByScore(key, start, end); + Jedis j = getShard(key); + return j.zremrangeByScore(key, start, end); } public Integer linsert(String key, LIST_POSITION where, String pivot, - String value) { - Jedis j = getShard(key); - return j.linsert(key, where, pivot, value); + String value) { + Jedis j = getShard(key); + return j.linsert(key, where, pivot, value); } + public List pipelined(ShardedJedisPipeline shardedJedisPipeline) { + shardedJedisPipeline.setShardedJedis(this); + shardedJedisPipeline.execute(); + return shardedJedisPipeline.getResults(); + } } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/ShardedJedisPipeline.java b/src/main/java/redis/clients/jedis/ShardedJedisPipeline.java new file mode 100644 index 0000000..c4e4881 --- /dev/null +++ b/src/main/java/redis/clients/jedis/ShardedJedisPipeline.java @@ -0,0 +1,437 @@ +package redis.clients.jedis; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import redis.clients.jedis.Client.LIST_POSITION; + +public abstract class ShardedJedisPipeline { + private ShardedJedis jedis; + private List results = new ArrayList(); + + private class FutureResult { + private Client client; + + public FutureResult(Client client) { + this.client = client; + } + + public Object get() { + return client.getOne(); + } + } + + public void setShardedJedis(ShardedJedis jedis) { + this.jedis = jedis; + } + + protected void set(String key, String value) { + Client c = jedis.getShard(key).getClient(); + c.set(key, value); + results.add(new FutureResult(c)); + } + + protected void get(String key) { + Client c = jedis.getShard(key).getClient(); + c.get(key); + results.add(new FutureResult(c)); + } + + protected void exists(String key) { + Client c = jedis.getShard(key).getClient(); + c.exists(key); + results.add(new FutureResult(c)); + } + + protected void type(String key) { + Client c = jedis.getShard(key).getClient(); + c.type(key); + results.add(new FutureResult(c)); + } + + protected void expire(String key, int seconds) { + Client c = jedis.getShard(key).getClient(); + c.expire(key, seconds); + results.add(new FutureResult(c)); + } + + protected void expireAt(String key, long unixTime) { + Client c = jedis.getShard(key).getClient(); + c.expireAt(key, unixTime); + results.add(new FutureResult(c)); + } + + protected void ttl(String key) { + Client c = jedis.getShard(key).getClient(); + c.ttl(key); + results.add(new FutureResult(c)); + } + + protected void getSet(String key, String value) { + Client c = jedis.getShard(key).getClient(); + c.getSet(key, value); + results.add(new FutureResult(c)); + } + + protected void setnx(String key, String value) { + Client c = jedis.getShard(key).getClient(); + c.setnx(key, value); + results.add(new FutureResult(c)); + } + + protected void setex(String key, int seconds, String value) { + Client c = jedis.getShard(key).getClient(); + c.setex(key, seconds, value); + results.add(new FutureResult(c)); + } + + protected void decrBy(String key, int integer) { + Client c = jedis.getShard(key).getClient(); + c.decrBy(key, integer); + results.add(new FutureResult(c)); + } + + protected void decr(String key) { + Client c = jedis.getShard(key).getClient(); + c.decr(key); + results.add(new FutureResult(c)); + } + + protected void incrBy(String key, int integer) { + Client c = jedis.getShard(key).getClient(); + c.incrBy(key, integer); + results.add(new FutureResult(c)); + } + + protected void incr(String key) { + Client c = jedis.getShard(key).getClient(); + c.incr(key); + results.add(new FutureResult(c)); + } + + protected void append(String key, String value) { + Client c = jedis.getShard(key).getClient(); + c.append(key, value); + results.add(new FutureResult(c)); + } + + protected void substr(String key, int start, int end) { + Client c = jedis.getShard(key).getClient(); + c.substr(key, start, end); + results.add(new FutureResult(c)); + } + + protected void hset(String key, String field, String value) { + Client c = jedis.getShard(key).getClient(); + c.hset(key, field, value); + results.add(new FutureResult(c)); + } + + protected void hget(String key, String field) { + Client c = jedis.getShard(key).getClient(); + c.hget(key, field); + results.add(new FutureResult(c)); + } + + protected void hsetnx(String key, String field, String value) { + Client c = jedis.getShard(key).getClient(); + c.hsetnx(key, field, value); + results.add(new FutureResult(c)); + } + + protected void hmset(String key, Map hash) { + Client c = jedis.getShard(key).getClient(); + c.hmset(key, hash); + results.add(new FutureResult(c)); + } + + protected void hmget(String key, String... fields) { + Client c = jedis.getShard(key).getClient(); + c.hmget(key, fields); + results.add(new FutureResult(c)); + } + + protected void hincrBy(String key, String field, int value) { + Client c = jedis.getShard(key).getClient(); + c.hincrBy(key, field, value); + results.add(new FutureResult(c)); + } + + protected void hexists(String key, String field) { + Client c = jedis.getShard(key).getClient(); + c.hexists(key, field); + results.add(new FutureResult(c)); + } + + protected void hdel(String key, String field) { + Client c = jedis.getShard(key).getClient(); + c.hdel(key, field); + results.add(new FutureResult(c)); + } + + protected void hlen(String key) { + Client c = jedis.getShard(key).getClient(); + c.hlen(key); + results.add(new FutureResult(c)); + } + + protected void hkeys(String key) { + Client c = jedis.getShard(key).getClient(); + c.hkeys(key); + results.add(new FutureResult(c)); + } + + protected void hvals(String key) { + Client c = jedis.getShard(key).getClient(); + c.hvals(key); + results.add(new FutureResult(c)); + } + + protected void hgetAll(String key) { + Client c = jedis.getShard(key).getClient(); + c.hgetAll(key); + results.add(new FutureResult(c)); + } + + protected void rpush(String key, String string) { + Client c = jedis.getShard(key).getClient(); + c.rpush(key, string); + results.add(new FutureResult(c)); + } + + protected void lpush(String key, String string) { + Client c = jedis.getShard(key).getClient(); + c.lpush(key, string); + results.add(new FutureResult(c)); + } + + protected void llen(String key) { + Client c = jedis.getShard(key).getClient(); + c.llen(key); + results.add(new FutureResult(c)); + } + + protected void lrange(String key, int start, int end) { + Client c = jedis.getShard(key).getClient(); + c.lrange(key, start, end); + results.add(new FutureResult(c)); + } + + protected void ltrim(String key, int start, int end) { + Client c = jedis.getShard(key).getClient(); + c.ltrim(key, start, end); + results.add(new FutureResult(c)); + } + + protected void lindex(String key, int index) { + Client c = jedis.getShard(key).getClient(); + c.lindex(key, index); + results.add(new FutureResult(c)); + } + + protected void lset(String key, int index, String value) { + Client c = jedis.getShard(key).getClient(); + c.lset(key, index, value); + results.add(new FutureResult(c)); + } + + protected void lrem(String key, int count, String value) { + Client c = jedis.getShard(key).getClient(); + c.lrem(key, count, value); + results.add(new FutureResult(c)); + } + + protected void lpop(String key) { + Client c = jedis.getShard(key).getClient(); + c.lpop(key); + results.add(new FutureResult(c)); + } + + protected void rpop(String key) { + Client c = jedis.getShard(key).getClient(); + c.rpop(key); + results.add(new FutureResult(c)); + } + + protected void sadd(String key, String member) { + Client c = jedis.getShard(key).getClient(); + c.sadd(key, member); + results.add(new FutureResult(c)); + } + + protected void smembers(String key) { + Client c = jedis.getShard(key).getClient(); + c.smembers(key); + results.add(new FutureResult(c)); + } + + protected void srem(String key, String member) { + Client c = jedis.getShard(key).getClient(); + c.srem(key, member); + results.add(new FutureResult(c)); + } + + protected void spop(String key) { + Client c = jedis.getShard(key).getClient(); + c.spop(key); + results.add(new FutureResult(c)); + } + + protected void scard(String key) { + Client c = jedis.getShard(key).getClient(); + c.scard(key); + results.add(new FutureResult(c)); + } + + protected void sismember(String key, String member) { + Client c = jedis.getShard(key).getClient(); + c.sismember(key, member); + results.add(new FutureResult(c)); + } + + protected void srandmember(String key) { + Client c = jedis.getShard(key).getClient(); + c.srandmember(key); + results.add(new FutureResult(c)); + } + + protected void zadd(String key, double score, String member) { + Client c = jedis.getShard(key).getClient(); + c.zadd(key, score, member); + results.add(new FutureResult(c)); + } + + protected void zrange(String key, int start, int end) { + Client c = jedis.getShard(key).getClient(); + c.zrange(key, start, end); + results.add(new FutureResult(c)); + } + + protected void zrem(String key, String member) { + Client c = jedis.getShard(key).getClient(); + c.zrem(key, member); + results.add(new FutureResult(c)); + } + + protected void zincrby(String key, double score, String member) { + Client c = jedis.getShard(key).getClient(); + c.zincrby(key, score, member); + results.add(new FutureResult(c)); + } + + protected void zrank(String key, String member) { + Client c = jedis.getShard(key).getClient(); + c.zrank(key, member); + results.add(new FutureResult(c)); + } + + protected void zrevrank(String key, String member) { + Client c = jedis.getShard(key).getClient(); + c.zrevrank(key, member); + results.add(new FutureResult(c)); + } + + protected void zrevrange(String key, int start, int end) { + Client c = jedis.getShard(key).getClient(); + c.zrevrange(key, start, end); + results.add(new FutureResult(c)); + } + + protected void zrangeWithScores(String key, int start, int end) { + Client c = jedis.getShard(key).getClient(); + c.zrangeWithScores(key, start, end); + results.add(new FutureResult(c)); + } + + protected void zrevrangeWithScores(String key, int start, int end) { + Client c = jedis.getShard(key).getClient(); + c.zrevrangeWithScores(key, start, end); + results.add(new FutureResult(c)); + } + + protected void zcard(String key) { + Client c = jedis.getShard(key).getClient(); + c.zcard(key); + results.add(new FutureResult(c)); + } + + protected void zscore(String key, String member) { + Client c = jedis.getShard(key).getClient(); + c.zscore(key, member); + results.add(new FutureResult(c)); + } + + protected void sort(String key) { + Client c = jedis.getShard(key).getClient(); + c.sort(key); + results.add(new FutureResult(c)); + } + + protected void sort(String key, SortingParams sortingParameters) { + Client c = jedis.getShard(key).getClient(); + c.sort(key, sortingParameters); + results.add(new FutureResult(c)); + } + + protected void zcount(String key, double min, double max) { + Client c = jedis.getShard(key).getClient(); + c.zcount(key, min, max); + results.add(new FutureResult(c)); + } + + protected void zrangeByScore(String key, double min, double max) { + Client c = jedis.getShard(key).getClient(); + c.zrangeByScore(key, min, max); + results.add(new FutureResult(c)); + } + + protected void zrangeByScore(String key, double min, double max, + int offset, int count) { + Client c = jedis.getShard(key).getClient(); + c.zrangeByScore(key, min, max, offset, count); + results.add(new FutureResult(c)); + } + + protected void zrangeByScoreWithScores(String key, double min, double max) { + Client c = jedis.getShard(key).getClient(); + c.zrangeByScoreWithScores(key, min, max); + results.add(new FutureResult(c)); + } + + protected void zrangeByScoreWithScores(String key, double min, double max, + int offset, int count) { + Client c = jedis.getShard(key).getClient(); + c.zrangeByScoreWithScores(key, min, max, offset, count); + results.add(new FutureResult(c)); + } + + protected void zremrangeByRank(String key, int start, int end) { + Client c = jedis.getShard(key).getClient(); + c.zremrangeByRank(key, start, end); + results.add(new FutureResult(c)); + } + + protected void zremrangeByScore(String key, double start, double end) { + Client c = jedis.getShard(key).getClient(); + c.zremrangeByScore(key, start, end); + results.add(new FutureResult(c)); + } + + protected void linsert(String key, LIST_POSITION where, String pivot, + String value) { + Client c = jedis.getShard(key).getClient(); + c.linsert(key, where, pivot, value); + results.add(new FutureResult(c)); + } + + public List getResults() { + List r = new ArrayList(); + for (FutureResult fr : results) { + r.add(fr.get()); + } + return r; + } + + public abstract void execute(); +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java b/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java index 1ce34b2..5190306 100644 --- a/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java +++ b/src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java @@ -10,112 +10,138 @@ import org.junit.Test; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisShardInfo; import redis.clients.jedis.ShardedJedis; +import redis.clients.jedis.ShardedJedisPipeline; import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort; import redis.clients.util.Hashing; public class ShardedJedisTest extends Assert { private static HostAndPort redis1 = HostAndPortUtil.getRedisServers() - .get(0); + .get(0); private static HostAndPort redis2 = HostAndPortUtil.getRedisServers() - .get(1); + .get(1); @Test public void checkSharding() throws IOException { - List shards = new ArrayList(); - shards.add(new JedisShardInfo(redis1.host, redis1.port)); - shards.add(new JedisShardInfo(redis2.host, redis2.port)); - ShardedJedis jedis = new ShardedJedis(shards); - JedisShardInfo s1 = jedis.getShardInfo("a"); - JedisShardInfo s2 = jedis.getShardInfo("b"); - assertNotSame(s1, s2); + List shards = new ArrayList(); + shards.add(new JedisShardInfo(redis1.host, redis1.port)); + shards.add(new JedisShardInfo(redis2.host, redis2.port)); + ShardedJedis jedis = new ShardedJedis(shards); + JedisShardInfo s1 = jedis.getShardInfo("a"); + JedisShardInfo s2 = jedis.getShardInfo("b"); + assertNotSame(s1, s2); } @Test public void trySharding() throws IOException { - List shards = new ArrayList(); - JedisShardInfo si = new JedisShardInfo(redis1.host, redis1.port); - si.setPassword("foobared"); - shards.add(si); - si = new JedisShardInfo(redis2.host, redis2.port); - si.setPassword("foobared"); - shards.add(si); - ShardedJedis jedis = new ShardedJedis(shards); - jedis.set("a", "bar"); - JedisShardInfo s1 = jedis.getShardInfo("a"); - jedis.set("b", "bar1"); - JedisShardInfo s2 = jedis.getShardInfo("b"); - jedis.disconnect(); + List shards = new ArrayList(); + JedisShardInfo si = new JedisShardInfo(redis1.host, redis1.port); + si.setPassword("foobared"); + shards.add(si); + si = new JedisShardInfo(redis2.host, redis2.port); + si.setPassword("foobared"); + shards.add(si); + ShardedJedis jedis = new ShardedJedis(shards); + jedis.set("a", "bar"); + JedisShardInfo s1 = jedis.getShardInfo("a"); + jedis.set("b", "bar1"); + JedisShardInfo s2 = jedis.getShardInfo("b"); + jedis.disconnect(); - Jedis j = new Jedis(s1.getHost(), s1.getPort()); - j.auth("foobared"); - assertEquals("bar", j.get("a")); - j.disconnect(); + Jedis j = new Jedis(s1.getHost(), s1.getPort()); + j.auth("foobared"); + assertEquals("bar", j.get("a")); + j.disconnect(); - j = new Jedis(s2.getHost(), s2.getPort()); - j.auth("foobared"); - assertEquals("bar1", j.get("b")); - j.disconnect(); + j = new Jedis(s2.getHost(), s2.getPort()); + j.auth("foobared"); + assertEquals("bar1", j.get("b")); + j.disconnect(); } @Test public void tryShardingWithMurmure() throws IOException { - List shards = new ArrayList(); - JedisShardInfo si = new JedisShardInfo(redis1.host, redis1.port); - si.setPassword("foobared"); - shards.add(si); - si = new JedisShardInfo(redis2.host, redis2.port); - si.setPassword("foobared"); - shards.add(si); - ShardedJedis jedis = new ShardedJedis(shards, Hashing.MURMUR_HASH); - jedis.set("a", "bar"); - JedisShardInfo s1 = jedis.getShardInfo("a"); - jedis.set("b", "bar1"); - JedisShardInfo s2 = jedis.getShardInfo("b"); - jedis.disconnect(); + List shards = new ArrayList(); + JedisShardInfo si = new JedisShardInfo(redis1.host, redis1.port); + si.setPassword("foobared"); + shards.add(si); + si = new JedisShardInfo(redis2.host, redis2.port); + si.setPassword("foobared"); + shards.add(si); + ShardedJedis jedis = new ShardedJedis(shards, Hashing.MURMUR_HASH); + jedis.set("a", "bar"); + JedisShardInfo s1 = jedis.getShardInfo("a"); + jedis.set("b", "bar1"); + JedisShardInfo s2 = jedis.getShardInfo("b"); + jedis.disconnect(); - Jedis j = new Jedis(s1.getHost(), s1.getPort()); - j.auth("foobared"); - assertEquals("bar", j.get("a")); - j.disconnect(); + Jedis j = new Jedis(s1.getHost(), s1.getPort()); + j.auth("foobared"); + assertEquals("bar", j.get("a")); + j.disconnect(); - j = new Jedis(s2.getHost(), s2.getPort()); - j.auth("foobared"); - assertEquals("bar1", j.get("b")); - j.disconnect(); + j = new Jedis(s2.getHost(), s2.getPort()); + j.auth("foobared"); + assertEquals("bar1", j.get("b")); + j.disconnect(); } @Test public void checkKeyTags() { - List shards = new ArrayList(); - shards.add(new JedisShardInfo(redis1.host, redis1.port)); - shards.add(new JedisShardInfo(redis2.host, redis2.port)); - ShardedJedis jedis = new ShardedJedis(shards, - ShardedJedis.DEFAULT_KEY_TAG_PATTERN); + List shards = new ArrayList(); + shards.add(new JedisShardInfo(redis1.host, redis1.port)); + shards.add(new JedisShardInfo(redis2.host, redis2.port)); + ShardedJedis jedis = new ShardedJedis(shards, + ShardedJedis.DEFAULT_KEY_TAG_PATTERN); - assertEquals(jedis.getKeyTag("foo"), "foo"); - assertEquals(jedis.getKeyTag("foo{bar}"), "bar"); - assertEquals(jedis.getKeyTag("foo{bar}}"), "bar"); // default pattern is - // non greedy - assertEquals(jedis.getKeyTag("{bar}foo"), "bar"); // Key tag may appear - // anywhere - assertEquals(jedis.getKeyTag("f{bar}oo"), "bar"); // Key tag may appear - // anywhere + assertEquals(jedis.getKeyTag("foo"), "foo"); + assertEquals(jedis.getKeyTag("foo{bar}"), "bar"); + assertEquals(jedis.getKeyTag("foo{bar}}"), "bar"); // default pattern is + // non greedy + assertEquals(jedis.getKeyTag("{bar}foo"), "bar"); // Key tag may appear + // anywhere + assertEquals(jedis.getKeyTag("f{bar}oo"), "bar"); // Key tag may appear + // anywhere - JedisShardInfo s1 = jedis.getShardInfo("abc{bar}"); - JedisShardInfo s2 = jedis.getShardInfo("foo{bar}"); - assertSame(s1, s2); + JedisShardInfo s1 = jedis.getShardInfo("abc{bar}"); + JedisShardInfo s2 = jedis.getShardInfo("foo{bar}"); + assertSame(s1, s2); - JedisShardInfo s3 = jedis.getShardInfo("a"); - JedisShardInfo s4 = jedis.getShardInfo("b"); - assertNotSame(s3, s4); + JedisShardInfo s3 = jedis.getShardInfo("a"); + JedisShardInfo s4 = jedis.getShardInfo("b"); + assertNotSame(s3, s4); - ShardedJedis jedis2 = new ShardedJedis(shards); + ShardedJedis jedis2 = new ShardedJedis(shards); - assertEquals(jedis2.getKeyTag("foo"), "foo"); - assertNotSame(jedis2.getKeyTag("foo{bar}"), "bar"); + assertEquals(jedis2.getKeyTag("foo"), "foo"); + assertNotSame(jedis2.getKeyTag("foo{bar}"), "bar"); - JedisShardInfo s5 = jedis2.getShardInfo("foo{bar}"); - JedisShardInfo s6 = jedis2.getShardInfo("abc{bar}"); - assertNotSame(s5, s6); + JedisShardInfo s5 = jedis2.getShardInfo("foo{bar}"); + JedisShardInfo s6 = jedis2.getShardInfo("abc{bar}"); + assertNotSame(s5, s6); + } + + @Test + public void shardedPipeline() { + List shards = new ArrayList(); + shards.add(new JedisShardInfo(redis1.host, redis1.port)); + shards.add(new JedisShardInfo(redis2.host, redis2.port)); + shards.get(0).setPassword("foobared"); + shards.get(1).setPassword("foobared"); + ShardedJedis jedis = new ShardedJedis(shards); + + jedis.set("a", "a"); + jedis.set("b", "b"); + + assertTrue(!jedis.getShard("a").equals(jedis.getShard("b"))); + + List results = jedis.pipelined(new ShardedJedisPipeline() { + public void execute() { + get("a"); + get("b"); + } + }); + + assertEquals("a", results.get(0)); + assertEquals("b", results.get(1)); } } \ No newline at end of file