Update the way the Redis host(s) can be configured for the tests.

There is now a single property : redis-hosts.
This property must contain at least 2 host definitions of the form "host:port" (comma separated).
Sharding tests need 2 hosts ...

If this is not the case, the default value used is "localhost:6379,localhost:6380".
Tests that required one host are using the first definition.
This commit is contained in:
Yaourt
2010-09-15 10:49:36 +02:00
27 changed files with 2051 additions and 420 deletions

View File

@@ -563,4 +563,28 @@ public class Client extends Connection {
public void configSet(String parameter, String value) {
sendCommand("CONFIG", "SET", parameter, value);
}
public void strlen(String key) {
sendCommand("STRLEN", key);
}
public void sync() {
sendCommand("SYNC");
}
public void lpushx(String key, String string) {
sendCommand("LPUSHX", key, string);
}
public void persist(String key) {
sendCommand("PERSIST", key);
}
public void rpushx(String key, String string) {
sendCommand("RPUSHX", key, string);
}
public void echo(String string) {
sendCommand("ECHO", string);
}
}

View File

@@ -10,6 +10,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import redis.clients.util.ShardInfo;
public class Jedis {
private Client client = null;
@@ -26,6 +28,14 @@ public class Jedis {
client.setTimeout(timeout);
}
public Jedis(ShardInfo shardInfo) {
client = new Client(shardInfo.getHost(), shardInfo.getPort());
client.setTimeout(shardInfo.getTimeout());
if (shardInfo.getPassword() != null) {
this.auth(shardInfo.getPassword());
}
}
public String ping() {
checkIsInMulti();
client.ping();
@@ -790,4 +800,33 @@ public class Jedis {
public boolean isConnected() {
return client.isConnected();
}
public int strlen(String key) {
client.strlen(key);
return client.getIntegerReply();
}
public void sync() {
client.sync();
}
public int lpushx(String key, String string) {
client.lpushx(key, string);
return client.getIntegerReply();
}
public int persist(String key) {
client.persist(key);
return client.getIntegerReply();
}
public int rpushx(String key, String string) {
client.rpushx(key, string);
return client.getIntegerReply();
}
public String echo(String string) {
client.echo(string);
return client.getBulkReply();
}
}

View File

@@ -1,11 +1,13 @@
package redis.clients.jedis;
import redis.clients.util.FixedResourcePool;
import redis.clients.util.ShardInfo;
public class JedisPool extends FixedResourcePool<Jedis> {
private String host;
private int port;
private int timeout;
private String password;
public JedisPool(String host) {
this.host = host;
@@ -23,6 +25,20 @@ public class JedisPool extends FixedResourcePool<Jedis> {
this.timeout = timeout;
}
public JedisPool(String host, int port, int timeout, String password) {
this.host = host;
this.port = port;
this.timeout = timeout;
this.password = password;
}
public JedisPool(ShardInfo shardInfo) {
this.host = shardInfo.getHost();
this.port = shardInfo.getPort();
this.timeout = shardInfo.getTimeout();
this.password = shardInfo.getPassword();
}
@Override
protected Jedis createResource() {
Jedis jedis = new Jedis(this.host, this.port, this.timeout);
@@ -30,6 +46,9 @@ public class JedisPool extends FixedResourcePool<Jedis> {
while (!done) {
try {
jedis.connect();
if (password != null) {
jedis.auth(password);
}
done = true;
} catch (Exception e) {
try {

View File

@@ -0,0 +1,357 @@
package redis.clients.jedis;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import redis.clients.util.Hashing;
import redis.clients.util.ShardInfo;
import redis.clients.util.Sharded;
public class ShardedJedis extends Sharded<Jedis> {
public ShardedJedis(List<ShardInfo> shards) {
super(shards);
}
public ShardedJedis(List<ShardInfo> shards, Hashing algo) {
super(shards, algo);
}
public String set(String key, String value) {
Jedis j = getShard(key);
return j.set(key, value);
}
public String get(String key) {
Jedis j = getShard(key);
return j.get(key);
}
public int exists(String key) {
Jedis j = getShard(key);
return j.exists(key);
}
public String type(String key) {
Jedis j = getShard(key);
return j.type(key);
}
public int expire(String key, int seconds) {
Jedis j = getShard(key);
return j.expire(key, seconds);
}
public int expireAt(String key, long unixTime) {
Jedis j = getShard(key);
return j.expireAt(key, unixTime);
}
public int ttl(String key) {
Jedis j = getShard(key);
return j.ttl(key);
}
public String getSet(String key, String value) {
Jedis j = getShard(key);
return j.getSet(key, value);
}
public int setnx(String key, String value) {
Jedis j = getShard(key);
return j.setnx(key, value);
}
public String setex(String key, int seconds, String value) {
Jedis j = getShard(key);
return j.setex(key, seconds, value);
}
public int decrBy(String key, int integer) {
Jedis j = getShard(key);
return j.decrBy(key, integer);
}
public int decr(String key) {
Jedis j = getShard(key);
return j.decr(key);
}
public int incrBy(String key, int integer) {
Jedis j = getShard(key);
return j.incrBy(key, integer);
}
public int incr(String key) {
Jedis j = getShard(key);
return j.incr(key);
}
public int append(String key, String value) {
Jedis j = getShard(key);
return j.append(key, value);
}
public String substr(String key, int start, int end) {
Jedis j = getShard(key);
return j.substr(key, start, end);
}
public int hset(String key, String field, String value) {
Jedis j = getShard(key);
return j.hset(key, field, value);
}
public String hget(String key, String field) {
Jedis j = getShard(key);
return j.hget(key, field);
}
public int hsetnx(String key, String field, String value) {
Jedis j = getShard(key);
return j.hsetnx(key, field, value);
}
public String hmset(String key, Map<String, String> hash) {
Jedis j = getShard(key);
return j.hmset(key, hash);
}
public List<String> hmget(String key, String... fields) {
Jedis j = getShard(key);
return j.hmget(key, fields);
}
public int hincrBy(String key, String field, int value) {
Jedis j = getShard(key);
return j.hincrBy(key, field, value);
}
public int hexists(String key, String field) {
Jedis j = getShard(key);
return j.hexists(key, field);
}
public int hdel(String key, String field) {
Jedis j = getShard(key);
return j.hdel(key, field);
}
public int hlen(String key) {
Jedis j = getShard(key);
return j.hlen(key);
}
public List<String> hkeys(String key) {
Jedis j = getShard(key);
return j.hkeys(key);
}
public List<String> hvals(String key) {
Jedis j = getShard(key);
return j.hvals(key);
}
public Map<String, String> hgetAll(String key) {
Jedis j = getShard(key);
return j.hgetAll(key);
}
public int rpush(String key, String string) {
Jedis j = getShard(key);
return j.rpush(key, string);
}
public int lpush(String key, String string) {
Jedis j = getShard(key);
return j.lpush(key, string);
}
public int llen(String key) {
Jedis j = getShard(key);
return j.llen(key);
}
public List<String> lrange(String key, int start, int end) {
Jedis j = getShard(key);
return j.lrange(key, start, end);
}
public String ltrim(String key, int start, int end) {
Jedis j = getShard(key);
return j.ltrim(key, start, end);
}
public String lindex(String key, int index) {
Jedis j = getShard(key);
return j.lindex(key, index);
}
public String lset(String key, int index, String value) {
Jedis j = getShard(key);
return j.lset(key, index, value);
}
public int lrem(String key, int count, String value) {
Jedis j = getShard(key);
return j.lrem(key, count, value);
}
public String lpop(String key) {
Jedis j = getShard(key);
return j.lpop(key);
}
public String rpop(String key) {
Jedis j = getShard(key);
return j.rpop(key);
}
public int sadd(String key, String member) {
Jedis j = getShard(key);
return j.sadd(key, member);
}
public Set<String> smembers(String key) {
Jedis j = getShard(key);
return j.smembers(key);
}
public int srem(String key, String member) {
Jedis j = getShard(key);
return j.srem(key, member);
}
public String spop(String key) {
Jedis j = getShard(key);
return j.spop(key);
}
public int scard(String key) {
Jedis j = getShard(key);
return j.scard(key);
}
public int sismember(String key, String member) {
Jedis j = getShard(key);
return j.sismember(key, member);
}
public String srandmember(String key) {
Jedis j = getShard(key);
return j.srandmember(key);
}
public int zadd(String key, double score, String member) {
Jedis j = getShard(key);
return j.zadd(key, score, member);
}
public Set<String> zrange(String key, int start, int end) {
Jedis j = getShard(key);
return j.zrange(key, start, end);
}
public int zrem(String key, String member) {
Jedis j = getShard(key);
return j.zrem(key, member);
}
public double zincrby(String key, double score, String member) {
Jedis j = getShard(key);
return j.zincrby(key, score, member);
}
public int zrank(String key, String member) {
Jedis j = getShard(key);
return j.zrank(key, member);
}
public int zrevrank(String key, String member) {
Jedis j = getShard(key);
return j.zrevrank(key, member);
}
public Set<String> zrevrange(String key, int start, int end) {
Jedis j = getShard(key);
return j.zrevrange(key, start, end);
}
public Set<Tuple> zrangeWithScores(String key, int start, int end) {
Jedis j = getShard(key);
return j.zrangeWithScores(key, start, end);
}
public Set<Tuple> zrevrangeWithScores(String key, int start, int end) {
Jedis j = getShard(key);
return j.zrevrangeWithScores(key, start, end);
}
public int zcard(String key) {
Jedis j = getShard(key);
return j.zcard(key);
}
public double zscore(String key, String member) {
Jedis j = getShard(key);
return j.zscore(key, member);
}
public List<String> sort(String key) {
Jedis j = getShard(key);
return j.sort(key);
}
public List<String> sort(String key, SortingParams sortingParameters) {
Jedis j = getShard(key);
return j.sort(key, sortingParameters);
}
public int zcount(String key, double min, double max) {
Jedis j = getShard(key);
return j.zcount(key, min, max);
}
public Set<String> zrangeByScore(String key, double min, double max) {
Jedis j = getShard(key);
return j.zrangeByScore(key, min, max);
}
public Set<String> zrangeByScore(String key, double min, double max,
int offset, int count) {
Jedis j = getShard(key);
return j.zrangeByScore(key, min, max, offset, count);
}
public Set<Tuple> zrangeByScoreWithScores(String key, double min, double max) {
Jedis j = getShard(key);
return j.zrangeByScoreWithScores(key, min, max);
}
public Set<Tuple> zrangeByScoreWithScores(String key, double min,
double max, int offset, int count) {
Jedis j = getShard(key);
return j.zrangeByScoreWithScores(key, min, max, offset, count);
}
public int zremrangeByRank(String key, int start, int end) {
Jedis j = getShard(key);
return j.zremrangeByRank(key, start, end);
}
public int zremrangeByScore(String key, double start, double end) {
Jedis j = getShard(key);
return j.zremrangeByScore(key, start, end);
}
public void disconnect() throws IOException {
for (Jedis jedis : getAllShards()) {
jedis.disconnect();
}
}
protected Jedis create(ShardInfo shard) {
return new Jedis(shard);
}
}

View File

@@ -9,16 +9,15 @@ import java.util.concurrent.TimeoutException;
/**
* Abstract resource pool of type T.
*
* <p/>
* Needs implementation for creation, validation and destruction of the
* resources.
*
* <p/>
* Keeps a fixed amount of resources
*
*
* @author Luis Dario Simonassi
*
* @param <T>
* The type of the resource to be managed.
* The type of the resource to be managed.
*/
public abstract class FixedResourcePool<T> {
@@ -31,130 +30,140 @@ public abstract class FixedResourcePool<T> {
* Generic Resource Wrapper
*/
private static class Wrapper<T> {
long timestamp;
T wrapped;
long timestamp;
T wrapped;
public Wrapper(T wrapped) {
this.wrapped = wrapped;
mark();
}
public Wrapper(T wrapped) {
this.wrapped = wrapped;
mark();
}
public void mark() {
timestamp = System.currentTimeMillis();
}
public void mark() {
timestamp = System.currentTimeMillis();
}
public long getLastMark() {
return timestamp;
}
public long getLastMark() {
return timestamp;
}
}
public abstract static class Printer {
public abstract void print(String str);
}
public static class DefaultPrinter extends Printer {
public void print(String str) {
System.out.println(str);
}
}
/**
* Generic Repair Thread
*/
protected class RepairThread extends Thread {
public void run() {
public void run() {
// Contribute to the repairing and validation effort until the pool
// is destroyed (finishig=true)
while (!finishing) {
Wrapper<T> wrapper;
try {
// Remove the oldest element from the repair queue.
wrapper = repairQueue.poll(timeBetweenValidation,
TimeUnit.MILLISECONDS);
if (wrapper == null) {
// If I've been waiting too much, i'll check the idle
// pool if connections need
// validation and move them to the repair queue
checkIdles();
continue;
}
} catch (InterruptedException e) {
continue;
}
// Contribute to the repairing and validation effort until the pool
// is destroyed (finishig=true)
while (!finishing) {
Wrapper<T> wrapper;
try {
// Remove the oldest element from the repair queue.
wrapper = repairQueue.poll(timeBetweenValidation,
TimeUnit.MILLISECONDS);
if (wrapper == null) {
// If I've been waiting too much, i'll check the idle
// pool if connections need
// validation and move them to the repair queue
checkIdles();
continue;
}
} catch (InterruptedException e) {
continue;
}
// Now, I have something to repair!
T resource = wrapper.wrapped;
boolean valid = false;
// Now, I have something to repair!
T resource = wrapper.wrapped;
boolean valid = false;
// Resources are null right after initialization, it means the
// same as being an invalid resource
if (resource != null) {
valid = isResourceValid(resource); // Validate the resource.
if (!valid)
fails++;
}
// Resources are null right after initialization, it means the
// same as being an invalid resource
if (resource != null) {
valid = isResourceValid(resource); // Validate the resource.
if (!valid)
fails++;
}
// If resource is invalid or null, create a new resource and
// destroy the invalid one.
if (!valid) {
T replace = createResource();
resourcesCreated++;
wrapper.wrapped = replace;
if (resource != null)
destroyResource(resource);
}
// If resource is invalid or null, create a new resource and
// destroy the invalid one.
if (!valid) {
T replace = createResource();
resourcesCreated++;
wrapper.wrapped = replace;
if (resource != null)
destroyResource(resource);
}
// Mark the resource as fresh!
wrapper.mark();
// Mark the resource as fresh!
wrapper.mark();
// Offer the resource to the available resources pool.
if (!availableQueue.offer(wrapper)) {
System.err
.println("This shouldn't happen, offering to available was rejected.");
}
}
// Offer the resource to the available resources pool.
if (!availableQueue.offer(wrapper)) {
System.err
.println("This shouldn't happen, offering to available was rejected.");
}
}
System.out.println("Ending thread ["
+ Thread.currentThread().getName() + "]");
}
println("Ending thread ["
+ Thread.currentThread().getName() + "]");
}
/**
* Check if resources in the idle queue needs to be repaired
*/
private void checkIdles() {
// Get a sample without removing it
Wrapper<T> wrapper = availableQueue.peek();
/**
* Check if resources in the idle queue needs to be repaired
*/
private void checkIdles() {
// Get a sample without removing it
Wrapper<T> wrapper = availableQueue.peek();
// If no available items, nothing to repair.
if (wrapper == null)
return;
// If no available items, nothing to repair.
if (wrapper == null)
return;
// Check if the sampled resource needs to be repaired
boolean repairNeeded = isValidationNeeded(wrapper);
if (!repairNeeded)
return;
// Check if the sampled resource needs to be repaired
boolean repairNeeded = isValidationNeeded(wrapper);
if (!repairNeeded)
return;
// Move available resources from the available queue to the repair
// queue until no repair is needed.
while (repairNeeded) {
// Move available resources from the available queue to the repair
// queue until no repair is needed.
while (repairNeeded) {
// Get the connection from the available queue and check again.
wrapper = availableQueue.poll();
// Get the connection from the available queue and check again.
wrapper = availableQueue.poll();
// No resources in the available queue, nothing to do
if (wrapper == null) {
repairNeeded = false;
return;
}
// No resources in the available queue, nothing to do
if (wrapper == null) {
repairNeeded = false;
return;
}
// Add the resource to the corresponding queue, depending on
// weather the resource needs to be repaired or not.
repairNeeded = isValidationNeeded(wrapper);
// Add the resource to the corresponding queue, depending on
// weather the resource needs to be repaired or not.
repairNeeded = isValidationNeeded(wrapper);
if (repairNeeded) {
if (!repairQueue.offer(wrapper)) {
System.err
.print("FATAL: This shouldn't happen, offering to repairing was rejected.");
}
} else {
if (!availableQueue.offer(wrapper)) {
System.err
.print("FATAL: This shouldn't happen, offering to available was rejected.");
}
}
}
}
if (repairNeeded) {
if (!repairQueue.offer(wrapper)) {
System.err
.print("FATAL: This shouldn't happen, offering to repairing was rejected.");
}
} else {
if (!availableQueue.offer(wrapper)) {
System.err
.print("FATAL: This shouldn't happen, offering to available was rejected.");
}
}
}
}
}
/*
@@ -166,28 +175,30 @@ public abstract class FixedResourcePool<T> {
private volatile long resourcesProvided = 0;
private volatile long resourcesReturned = 0;
private Printer printer = new DefaultPrinter();
/*
* Pool metrics accessing methods.
*/
public long getFailsReported() {
return failsReported;
return failsReported;
}
public long getFails() {
return fails;
return fails;
}
public long getResourcesCreated() {
return resourcesCreated;
return resourcesCreated;
}
public long getResourcesProvided() {
return resourcesProvided;
return resourcesProvided;
}
public long getResourcesReturned() {
return resourcesReturned;
return resourcesReturned;
}
/*
@@ -215,127 +226,138 @@ public abstract class FixedResourcePool<T> {
*/
public int getResourcesNumber() {
return resourcesNumber;
return resourcesNumber;
}
public void setResourcesNumber(int resourcesNumber) {
this.resourcesNumber = resourcesNumber;
this.resourcesNumber = resourcesNumber;
}
public int getRepairThreadsNumber() {
return repairThreadsNumber;
return repairThreadsNumber;
}
public void setRepairThreadsNumber(int repairThreadsNumber) {
if (initializated)
throw new IllegalStateException(
"Repair threads should be setted up before init()");
this.repairThreadsNumber = repairThreadsNumber;
if (initializated)
throw new IllegalStateException(
"Repair threads should be setted up before init()");
this.repairThreadsNumber = repairThreadsNumber;
}
public long getTimeBetweenValidation() {
return timeBetweenValidation;
return timeBetweenValidation;
}
public void setTimeBetweenValidation(long timeBetweenValidation) {
this.timeBetweenValidation = timeBetweenValidation;
this.timeBetweenValidation = timeBetweenValidation;
}
public void setName(String name) {
if (initializated)
throw new IllegalStateException(
"Name should be setted up before init()");
this.name = name;
if (initializated)
throw new IllegalStateException(
"Name should be setted up before init()");
this.name = name;
}
public String getName() {
if (name == null || name.isEmpty()) {
name = this.getClass().getName();
}
return name;
if (name == null || name.isEmpty()) {
name = this.getClass().getName();
}
return name;
}
public void setDefaultPoolWait(long defaultPoolWait) {
this.defaultPoolWait = defaultPoolWait;
this.defaultPoolWait = defaultPoolWait;
}
public long getDefaultPoolWait() {
return defaultPoolWait;
return defaultPoolWait;
}
public void setPrinter(Printer printer) {
this.printer = printer;
}
private void println(String str) {
if(printer != null)
printer.print(str);
}
/**
* Pool initialization & destruction
*/
public void destroy() {
checkInit();
checkInit();
System.out.println("Destroying [" + getName() + "]...");
println("Destroying [" + getName() + "]...");
// Signal al threads to end
finishing = true;
// Signal al threads to end
finishing = true;
System.out.println("Destroying [" + getName() + "] threads");
// Wait for the Repair Threas
for (int i = 0; i < repairThreads.length; i++) {
boolean joined = false;
do {
try {
repairThreads[i].interrupt();
repairThreads[i].join();
joined = true;
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!joined);
}
println("Destroying [" + getName() + "] threads");
// Wait for the Repair Threas
for (int i = 0; i < repairThreads.length; i++) {
boolean joined = false;
do {
try {
repairThreads[i].interrupt();
repairThreads[i].join();
joined = true;
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!joined);
}
System.out.println("Waiting for [" + getName()
+ "] resources to be returned.");
// Wait for all resources to be returned to the pool
synchronized (this) {
while (!inUse.isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
println("Waiting for [" + getName()
+ "] resources to be returned.");
// Wait for all resources to be returned to the pool
synchronized (this) {
while (!inUse.isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("Destroying [" + getName() + "] resources.");
// Destroy resources
for (Wrapper<T> resource : availableQueue) {
destroyResource(resource.wrapped);
}
printStatistics();
availableQueue.clear();
availableQueue = null;
println("Destroying [" + getName() + "] resources.");
// Destroy resources
for (Wrapper<T> resource : availableQueue) {
destroyResource(resource.wrapped);
}
for (Wrapper<T> resource : repairQueue) {
destroyResource(resource.wrapped);
}
availableQueue.clear();
availableQueue = null;
repairQueue.clear();
repairQueue = null;
for (Wrapper<T> resource : repairQueue) {
destroyResource(resource.wrapped);
}
// Destroy metrics timer
System.out.println("Shuting metrics timer for [" + getName()
+ "] down.");
t.cancel();
t = null;
repairQueue.clear();
repairQueue = null;
// Reset metrics
failsReported = 0;
fails = 0;
resourcesCreated = 0;
resourcesProvided = 0;
resourcesReturned = 0;
// Destroy metrics timer
println("Shuting metrics timer for [" + getName()
+ "] down.");
t.cancel();
t = null;
// Set states to initial values
initializated = false;
finishing = false;
// Reset metrics
failsReported = 0;
fails = 0;
resourcesCreated = 0;
resourcesProvided = 0;
resourcesReturned = 0;
System.out.println("Pool [" + getName() + "] successfully destroyed.");
// Set states to initial values
initializated = false;
finishing = false;
println("Pool [" + getName() + "] successfully destroyed.");
}
/**
@@ -343,203 +365,199 @@ public abstract class FixedResourcePool<T> {
*/
@SuppressWarnings("unchecked")
public void init() {
if (initializated == true) {
System.err.println("Warning, double initialization of [" + this
+ "]");
return;
}
if (initializated == true) {
println("Warning, double initialization of [" + this
+ "]");
return;
}
initializated = true;
initializated = true;
// Create queues with maximum possible capacity
availableQueue = new LinkedBlockingQueue<Wrapper<T>>(resourcesNumber);
repairQueue = new LinkedBlockingQueue<Wrapper<T>>(resourcesNumber);
// Create queues with maximum possible capacity
availableQueue = new LinkedBlockingQueue<Wrapper<T>>(resourcesNumber);
repairQueue = new LinkedBlockingQueue<Wrapper<T>>(resourcesNumber);
// Create and start the repair threads.
repairThreads = new FixedResourcePool.RepairThread[repairThreadsNumber];
for (int i = 0; i < repairThreads.length; i++) {
repairThreads[i] = new RepairThread();
repairThreads[i].setName("REPAIR[" + i + "]:" + getName());
repairThreads[i].start();
}
// Create and start the repair threads.
repairThreads = new FixedResourcePool.RepairThread[repairThreadsNumber];
for (int i = 0; i < repairThreads.length; i++) {
repairThreads[i] = new RepairThread();
repairThreads[i].setName("REPAIR[" + i + "]:" + getName());
repairThreads[i].start();
}
// Create resource wrappers with null content.
for (int i = 0; i < resourcesNumber; i++) {
if (!repairQueue.offer(new Wrapper<T>(null)))
throw new IllegalStateException(
"What!? not enough space in the repairQueue to offer the element. This shouldn't happen!");
}
// Create resource wrappers with null content.
for (int i = 0; i < resourcesNumber; i++) {
if (!repairQueue.offer(new Wrapper<T>(null)))
throw new IllegalStateException(
"What!? not enough space in the repairQueue to offer the element. This shouldn't happen!");
}
// Schedule a status report every 10 seconds.
t = new Timer();
t.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("**********************************");
System.out.println("* Pool name:[" + name + "]");
System.out.println("* resourcesCreated....:"
+ getResourcesCreated());
System.out.println("* failsReported.......:"
+ getFailsReported());
System.out.println("* fails...............:" + getFails());
System.out.println("* resourcesCreated....:"
+ getResourcesCreated());
System.out.println("* resourcesProvided...:"
+ getResourcesProvided());
System.out.println("* resourcesReturned...:"
+ getResourcesReturned());
System.out.println("* available size......:"
+ availableQueue.size());
System.out.println("* repair size.........:"
+ repairQueue.size());
System.out.println("**********************************");
}
}, 10000, 10000);
// Schedule a status report every 10 seconds.
t = new Timer();
t.schedule(new TimerTask() {
@Override
public void run() {
printStatistics();
}
}, 10000, 10000);
System.out.println("Initialized [" + name + "]");
println("Initialized [" + name + "]");
}
private void printStatistics() {
println("**********************************" +
"\n* Pool name:[" + name + "]" +
"\n* resourcesCreated....:" + getResourcesCreated() +
"\n* failsReported.......:" + getFailsReported() +
"\n* fails...............:" + getFails() +
"\n* resourcesCreated....:" + getResourcesCreated() +
"\n* resourcesProvided...:" + getResourcesProvided() +
"\n* resourcesReturned...:" + getResourcesReturned() +
"\n* available size......:" + availableQueue.size() +
"\n* repair size.........:" + repairQueue.size() +
"\n**********************************");
}
protected void checkInit() {
if (!initializated)
throw new IllegalStateException("Call the init() method first!");
if (!initializated)
throw new IllegalStateException("Call the init() method first!");
}
/**
* Returns true if wrapped resource needs validation
*
*
* @param wrapper
* @return
*/
private boolean isValidationNeeded(Wrapper<T> wrapper) {
// Add noise to the check times to avoid simultaneous resource checking.
long noisyTimeBetweenCheck = (timeBetweenValidation - (long) ((Math
.random() - 0.5) * (timeBetweenValidation / 10)));
// Add noise to the check times to avoid simultaneous resource checking.
long noisyTimeBetweenCheck = (timeBetweenValidation - (long) ((Math
.random() - 0.5) * (timeBetweenValidation / 10)));
// Check if the resource need to be checked.
return wrapper.getLastMark() + noisyTimeBetweenCheck < System
.currentTimeMillis();
// Check if the resource need to be checked.
return wrapper.getLastMark() + noisyTimeBetweenCheck < System
.currentTimeMillis();
}
/**
* Return a resource to the pool. When no longer needed.
*
*
* @param resource
*/
public void returnResource(T resource) {
checkInit();
checkInit();
Wrapper<T> wrapper;
Wrapper<T> wrapper;
if (resource == null)
throw new IllegalArgumentException(
"The resource shouldn't be null.");
if (resource == null)
throw new IllegalArgumentException(
"The resource shouldn't be null.");
// Delete the resource from the inUse list.
synchronized (inUse) {
wrapper = inUse.remove(resource);
}
// Delete the resource from the inUse list.
synchronized (inUse) {
wrapper = inUse.remove(resource);
}
if (wrapper == null)
throw new IllegalArgumentException("The resource [" + resource
+ "] isn't in the busy resources list.");
if (wrapper == null)
throw new IllegalArgumentException("The resource [" + resource
+ "] isn't in the busy resources list.");
if (isValidationNeeded(wrapper)) {
if (!repairQueue.offer(wrapper))
throw new IllegalStateException(
"This shouldn't happen. Offering to repair queue rejected.");
} else {
if (!availableQueue.offer(wrapper))
throw new IllegalStateException(
"This shouldn't happen. Offering to available queue rejected.");
}
resourcesReturned++;
if (isValidationNeeded(wrapper)) {
if (!repairQueue.offer(wrapper))
throw new IllegalStateException(
"This shouldn't happen. Offering to repair queue rejected.");
} else {
if (!availableQueue.offer(wrapper))
throw new IllegalStateException(
"This shouldn't happen. Offering to available queue rejected.");
}
resourcesReturned++;
if (finishing) {
synchronized (this) {
this.notify();
}
}
if (finishing) {
synchronized (this) {
this.notify();
}
}
}
/**
* Return a broken resource to the pool. If the application detects a
* malfunction of the resource. This resources will go directly to the
* repair queue.
*
*
* @param resource
*/
public void returnBrokenResource(T resource) {
checkInit();
Wrapper<T> wrapper;
checkInit();
Wrapper<T> wrapper;
// Delete the resource from the inUse list.
synchronized (inUse) {
wrapper = inUse.remove(resource);
}
// Delete the resource from the inUse list.
synchronized (inUse) {
wrapper = inUse.remove(resource);
}
if (wrapper == null)
throw new IllegalArgumentException("The resource [" + resource
+ "] isn't in the busy resources list.");
if (wrapper == null)
throw new IllegalArgumentException("The resource [" + resource
+ "] isn't in the busy resources list.");
if (!repairQueue.offer(wrapper))
throw new IllegalStateException(
"This shouldn't happen. Offering to repair queue rejected.");
resourcesReturned++;
if (!repairQueue.offer(wrapper))
throw new IllegalStateException(
"This shouldn't happen. Offering to repair queue rejected.");
resourcesReturned++;
if (finishing) {
synchronized (this) {
this.notify();
}
}
if (finishing) {
synchronized (this) {
this.notify();
}
}
}
/**
* Get a resource from the pool waiting the default time.
* {@link #setDefaultPoolWait(long)}
*
*
* @return the resource of type T
* @throws TimeoutException
*/
public T getResource() throws TimeoutException {
return getResource(defaultPoolWait);
return getResource(defaultPoolWait);
}
/**
* Get a resource from the pool.
*
* @param maxTime
* Max time you would like to wait for the resource
*
* @param maxTime Max time you would like to wait for the resource
* @return
* @throws TimeoutException
*/
public T getResource(long maxTime) throws TimeoutException {
if (finishing)
throw new IllegalStateException("Pool [" + getName()
+ "] is currently being destroyed.");
checkInit();
if (finishing)
throw new IllegalStateException("Pool [" + getName()
+ "] is currently being destroyed.");
checkInit();
final long tInit = System.currentTimeMillis();
do {
try {
long timeSpent = System.currentTimeMillis() - tInit;
long timeToSleep = maxTime - timeSpent;
timeToSleep = timeToSleep > 0 ? timeToSleep : 0;
if (timeToSleep == 0)
throw new TimeoutException("" + timeSpent + ">" + maxTime);
Wrapper<T> ret = availableQueue.poll(timeToSleep,
TimeUnit.MILLISECONDS);
if (ret != null) {
synchronized (inUse) {
inUse.put(ret.wrapped, ret);
}
resourcesProvided++;
return ret.wrapped;
}
} catch (InterruptedException e1) {
e1.printStackTrace();
} // If the wait gets interrupted, doesn't matter but print it (just
// in case).
} while (true);
final long tInit = System.currentTimeMillis();
do {
try {
long timeSpent = System.currentTimeMillis() - tInit;
long timeToSleep = maxTime - timeSpent;
timeToSleep = timeToSleep > 0 ? timeToSleep : 0;
if (timeToSleep == 0)
throw new TimeoutException("" + timeSpent + ">" + maxTime);
Wrapper<T> ret = availableQueue.poll(timeToSleep,
TimeUnit.MILLISECONDS);
if (ret != null) {
synchronized (inUse) {
inUse.put(ret.wrapped, ret);
}
resourcesProvided++;
return ret.wrapped;
}
} catch (InterruptedException e1) {
e1.printStackTrace();
} // If the wait gets interrupted, doesn't matter but print it (just
// in case).
} while (true);
}
/*
@@ -553,7 +571,7 @@ public abstract class FixedResourcePool<T> {
/**
* Check if the resource is still valid.
*
*
* @param resource
* @return
*/
@@ -561,14 +579,14 @@ public abstract class FixedResourcePool<T> {
/**
* Destroy a resource.
*
*
* @param resource
*/
protected abstract void destroyResource(T resource);
@Override
public String toString() {
return getName() + "[" + super.toString() + "]";
return getName() + "[" + super.toString() + "]";
}
/**

View File

@@ -0,0 +1,79 @@
package redis.clients.util;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
public abstract class Hashing {
public static final Hashing MURMURE_HASH = new Hashing() {
public long hash(String key) {
// 'm' and 'r' are mixing constants generated offline.
// They're not really 'magic', they just happen to work well.
byte[] data = key.getBytes();
int seed = 0x1234ABCD;
int m = 0x5bd1e995;
int r = 24;
// Initialize the hash to a 'random' value
int len = data.length;
int h = seed ^ len;
int i = 0;
while (len >= 4) {
int k = data[i + 0] & 0xFF;
k |= (data[i + 1] & 0xFF) << 8;
k |= (data[i + 2] & 0xFF) << 16;
k |= (data[i + 3] & 0xFF) << 24;
k *= m;
k ^= k >>> r;
k *= m;
h *= m;
h ^= k;
i += 4;
len -= 4;
}
switch (len) {
case 3:
h ^= (data[i + 2] & 0xFF) << 16;
case 2:
h ^= (data[i + 1] & 0xFF) << 8;
case 1:
h ^= (data[i + 0] & 0xFF);
h *= m;
}
h ^= h >>> 13;
h *= m;
h ^= h >>> 15;
return h;
}
};
public static final Hashing MD5 = new Hashing() {
private MessageDigest md5 = null; // avoid recurring construction
public long hash(String key) {
if (md5 == null) {
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(
"++++ no md5 algorythm found");
}
}
md5.reset();
md5.update(key.getBytes());
byte[] bKey = md5.digest();
long res = ((long) (bKey[3] & 0xFF) << 24)
| ((long) (bKey[2] & 0xFF) << 16)
| ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF);
return res;
}
};
public abstract long hash(String key);
}

View File

@@ -0,0 +1,98 @@
package redis.clients.util;
import redis.clients.jedis.Protocol;
public class ShardInfo {
@Override
public String toString() {
return "ShardInfo [host=" + host + ", port=" + port + ", weight="
+ weight + "]";
}
private String host;
private int port;
private int timeout;
private int weight;
private String password = null;
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public int getTimeout() {
return timeout;
}
public ShardInfo(String host) {
this(host, Protocol.DEFAULT_PORT);
}
public ShardInfo(String host, int port) {
this(host, port, 2000);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + port;
result = prime * result + timeout;
result = prime * result + weight;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ShardInfo other = (ShardInfo) obj;
if (host == null) {
if (other.host != null)
return false;
} else if (!host.equals(other.host))
return false;
if (port != other.port)
return false;
if (timeout != other.timeout)
return false;
if (weight != other.weight)
return false;
return true;
}
public ShardInfo(String host, int port, int timeout) {
this(host, port, timeout, Sharded.DEFAULT_WEIGHT);
}
public ShardInfo(String host, int port, int timeout, int weight) {
this.host = host;
this.port = port;
this.timeout = timeout;
this.weight = weight;
}
public String getPassword() {
return password;
}
public void setPassword(String auth) {
this.password = auth;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getWeight() {
return this.weight;
}
}

View File

@@ -0,0 +1,92 @@
package redis.clients.util;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public abstract class Sharded<T> {
public static final int DEFAULT_WEIGHT = 1;
private TreeMap<Long, ShardInfo> nodes;
private int totalWeight;
private Map<ShardInfo, T> resources;
private Hashing algo = Hashing.MD5;
public Sharded(List<ShardInfo> shards) {
initialize(shards);
}
public Sharded(List<ShardInfo> shards, Hashing algo) {
initialize(shards);
}
private void initialize(List<ShardInfo> shards) {
nodes = new TreeMap<Long, ShardInfo>();
resources = new HashMap<ShardInfo, T>();
totalWeight = 0;
for (ShardInfo shard : shards) {
totalWeight += shard.getWeight();
}
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("++++ no md5 algorythm found");
}
for (ShardInfo shard : shards) {
double factor = Math
.floor(((double) (40 * shards.size() * DEFAULT_WEIGHT))
/ (double) totalWeight);
for (long j = 0; j < factor; j++) {
byte[] d = md5.digest((shard.toString() + "-" + j).getBytes());
for (int h = 0; h < 4; h++) {
Long k = ((long) (d[3 + h * 4] & 0xFF) << 24)
| ((long) (d[2 + h * 4] & 0xFF) << 16)
| ((long) (d[1 + h * 4] & 0xFF) << 8)
| ((long) (d[0 + h * 4] & 0xFF));
nodes.put(k, shard);
}
}
resources.put(shard, create(shard));
}
}
public ShardInfo getShardInfo(String key) {
long hv = calculateHash(key);
return nodes.get(findPointFor(hv));
}
private Long calculateHash(String key) {
return algo.hash(key);
}
private Long findPointFor(Long hashK) {
Long k = nodes.ceilingKey(hashK);
if (k == null) {
k = nodes.firstKey();
}
return k;
}
public T getShard(String key) {
ShardInfo shard = getShardInfo(key);
return resources.get(shard);
}
protected abstract T create(ShardInfo shard);
public Collection<T> getAllShards() {
return resources.values();
}
}

View File

@@ -0,0 +1,58 @@
package redis.clients.jedis.tests;
import java.util.ArrayList;
import java.util.List;
import redis.clients.jedis.Protocol;
public class HostAndPortUtil {
private static List<HostAndPort> hostAndPortList = new ArrayList<HostAndPortUtil.HostAndPort>(2);
static {
final HostAndPort defaulthnp1 = new HostAndPort();
defaulthnp1.host = "localhost";
defaulthnp1.port = Protocol.DEFAULT_PORT;
hostAndPortList.add(defaulthnp1);
final HostAndPort defaulthnp2 = new HostAndPort();
defaulthnp2.host = "localhost";
defaulthnp2.port = Protocol.DEFAULT_PORT + 1;
hostAndPortList.add(defaulthnp2);
final String envHosts = System.getProperty("redis-hosts");
if (null != envHosts && 0 < envHosts.length()) {
final String[] hostDefs = envHosts.split(",");
if (null != hostDefs && 2 <= hostDefs.length) {
hostAndPortList = new ArrayList<HostAndPortUtil.HostAndPort>(hostDefs.length);
for(String hostDef : hostDefs) {
final String[] hostAndPort = hostDef.split(":");
if (null != hostAndPort && 2 == hostAndPort.length) {
final HostAndPort hnp = new HostAndPort();
hnp.host = hostAndPort[0];
try {
hnp.port = Integer.parseInt(hostAndPort[1]);
} catch(final NumberFormatException nfe){
hnp.port = Protocol.DEFAULT_PORT;
}
hostAndPortList.add(hnp);
}
}
}
}
final StringBuilder strb = new StringBuilder("Redis hosts to be used : ");
for(HostAndPort hnp : hostAndPortList){
strb.append('[').append(hnp.host).append(':').append(hnp.port).append(']').append(' ');
}
System.out.println(strb);
}
public static List<HostAndPort> getRedisServers() {
return hostAndPortList;
}
public static class HostAndPort {
public String host;
public int port;
}
}

View File

@@ -0,0 +1,72 @@
package redis.clients.jedis.tests;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Transaction;
public class JedisNewCommandsCheckTest extends Assert {
@Test
@Ignore(value = "Ignored because still missing information for DEBUG and LINSERT commands")
public void checkJedisIsUpdated() throws IOException {
String[] commands = getAvailableCommands();
Set<String> implementedCommands = getImplementedCommands();
Set<String> missingCommands = new HashSet<String>();
for (String command : commands) {
if (!implementedCommands.contains(command.trim())) {
missingCommands.add(command);
}
}
if (!missingCommands.isEmpty()) {
fail("There are missing commands: " + missingCommands.toString());
}
}
private Set<String> getImplementedCommands() {
Method[] methods = Jedis.class.getDeclaredMethods();
Set<String> implementedCommands = new HashSet<String>();
for (Method method : methods) {
implementedCommands.add(method.getName().trim().toLowerCase());
}
methods = JedisPubSub.class.getDeclaredMethods();
for (Method method : methods) {
implementedCommands.add(method.getName().trim().toLowerCase());
}
methods = Transaction.class.getDeclaredMethods();
for (Method method : methods) {
implementedCommands.add(method.getName().trim().toLowerCase());
}
implementedCommands.add("config");
return implementedCommands;
}
private String[] getAvailableCommands() throws MalformedURLException,
IOException {
URL url = new URL("http://dimaion.com/redis/master");
InputStream openStream = url.openStream();
DataInputStream dis = new DataInputStream(new BufferedInputStream(
openStream));
byte[] all = new byte[dis.available()];
dis.readFully(all);
String commandList = new String(all);
String[] commands = commandList.split("\n");
return commands;
}
}

View File

@@ -4,35 +4,18 @@ import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
public class JedisPoolTest extends Assert {
private static String host = "localhost";
private static int port = Protocol.DEFAULT_PORT;
static {
final String envHost = System.getProperty("redis-host");
final String envPort = System.getProperty("redis-port");
if (null != envHost && 0 < envHost.length()) {
host = envHost;
}
if (null != envPort && 0 < envPort.length()) {
try {
port = Integer.parseInt(envPort);
} catch (final NumberFormatException e) {}
}
System.out.println("Redis host to be used : " + host + ":" + port);
}
private static HostAndPort hnp = HostAndPortUtil.getRedisServers().get(0);
@Test
public void checkConnections() throws TimeoutException {
JedisPool pool = new JedisPool(host, port, 2000);
JedisPool pool = new JedisPool(hnp.host, hnp.port, 2000);
pool.setResourcesNumber(10);
pool.init();
@@ -46,7 +29,7 @@ public class JedisPoolTest extends Assert {
@Test
public void checkConnectionWithDefaultPort() throws TimeoutException {
JedisPool pool = new JedisPool(host, port);
JedisPool pool = new JedisPool(hnp.host, hnp.port);
pool.setResourcesNumber(10);
pool.init();
@@ -60,7 +43,7 @@ public class JedisPoolTest extends Assert {
@Test
public void checkJedisIsReusedWhenReturned() throws TimeoutException {
JedisPool pool = new JedisPool(host, port);
JedisPool pool = new JedisPool(hnp.host, hnp.port);
pool.setResourcesNumber(1);
pool.init();
@@ -79,7 +62,7 @@ public class JedisPoolTest extends Assert {
@Test
public void checkPoolRepairedWhenJedisIsBroken() throws TimeoutException,
IOException {
JedisPool pool = new JedisPool(host, port);
JedisPool pool = new JedisPool(hnp.host, hnp.port);
pool.setResourcesNumber(1);
pool.init();
@@ -97,7 +80,7 @@ public class JedisPoolTest extends Assert {
@Test(expected = TimeoutException.class)
public void checkPoolOverflow() throws TimeoutException {
JedisPool pool = new JedisPool(host, port);
JedisPool pool = new JedisPool(hnp.host, hnp.port);
pool.setResourcesNumber(1);
pool.init();

View File

@@ -6,13 +6,16 @@ import java.util.Map;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.tests.commands.JedisCommandTestBase;
import redis.clients.util.RedisOutputStream;
import redis.clients.util.ShardInfo;
public class JedisTest extends JedisCommandTestBase {
@Test
public void useWithoutConnecting() {
Jedis jedis = new Jedis("localhost");
jedis.auth("foobared");
jedis.dbSize();
}
@@ -30,4 +33,11 @@ public class JedisTest extends JedisCommandTestBase {
assertEquals(hash, jedis.hgetAll("foo"));
}
@Test
public void connectWithShardInfo() {
ShardInfo shardInfo = new ShardInfo("localhost", Protocol.DEFAULT_PORT);
shardInfo.setPassword("foobared");
Jedis jedis = new Jedis(shardInfo);
jedis.get("foo");
}
}

View File

@@ -12,32 +12,16 @@ import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPipeline;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
public class PipeliningTest extends Assert {
private static String host = "localhost";
private static int port = Protocol.DEFAULT_PORT;
static {
final String envHost = System.getProperty("redis-host");
final String envPort = System.getProperty("redis-port");
if (null != envHost && 0 < envHost.length()) {
host = envHost;
}
if (null != envPort && 0 < envPort.length()) {
try {
port = Integer.parseInt(envPort);
} catch (final NumberFormatException e) {
}
}
System.out.println("Redis host to be used : " + host + ":" + port);
}
private static HostAndPort hnp = HostAndPortUtil.getRedisServers().get(0);
private Jedis jedis;
@Before
public void setUp() throws Exception {
jedis = new Jedis(host, port, 500);
jedis = new Jedis(hnp.host, hnp.port, 500);
jedis.connect();
jedis.auth("foobared");
jedis.flushAll();

View File

@@ -0,0 +1,85 @@
package redis.clients.jedis.tests;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
import redis.clients.util.Hashing;
import redis.clients.util.ShardInfo;
public class ShardedJedisTest extends Assert {
private static HostAndPort redis1 = HostAndPortUtil.getRedisServers().get(0);
private static HostAndPort redis2 = HostAndPortUtil.getRedisServers().get(1);
@Test
public void checkSharding() throws IOException {
List<ShardInfo> shards = new ArrayList<ShardInfo>();
shards.add(new ShardInfo(redis1.host, redis1.port));
shards.add(new ShardInfo(redis2.host, redis2.port));
ShardedJedis jedis = new ShardedJedis(shards);
ShardInfo s1 = jedis.getShardInfo("a");
ShardInfo s2 = jedis.getShardInfo("b");
assertNotSame(s1, s2);
}
@Test
public void trySharding() throws IOException {
List<ShardInfo> shards = new ArrayList<ShardInfo>();
ShardInfo si = new ShardInfo(redis1.host, redis1.port);
si.setPassword("foobared");
shards.add(si);
si = new ShardInfo(redis2.host, redis2.port);
si.setPassword("foobared");
shards.add(si);
ShardedJedis jedis = new ShardedJedis(shards);
jedis.set("a", "bar");
ShardInfo s1 = jedis.getShardInfo("a");
jedis.set("b", "bar1");
ShardInfo s2 = jedis.getShardInfo("b");
jedis.disconnect();
Jedis j = new Jedis(s1.getHost(), s1.getPort());
j.auth("foobared");
assertEquals("bar", j.get("a"));
j.disconnect();
j = new Jedis(s2.getHost(), s2.getPort());
j.auth("foobared");
assertEquals("bar1", j.get("b"));
j.disconnect();
}
@Test
public void tryShardingWithMurmure() throws IOException {
List<ShardInfo> shards = new ArrayList<ShardInfo>();
ShardInfo si = new ShardInfo(redis1.host, redis1.port);
si.setPassword("foobared");
shards.add(si);
si = new ShardInfo(redis2.host, redis2.port);
si.setPassword("foobared");
shards.add(si);
ShardedJedis jedis = new ShardedJedis(shards, Hashing.MURMURE_HASH);
jedis.set("a", "bar");
ShardInfo s1 = jedis.getShardInfo("a");
jedis.set("b", "bar1");
ShardInfo s2 = jedis.getShardInfo("b");
jedis.disconnect();
Jedis j = new Jedis(s1.getHost(), s1.getPort());
j.auth("foobared");
assertEquals("bar", j.get("a"));
j.disconnect();
j = new Jedis(s2.getHost(), s2.getPort());
j.auth("foobared");
assertEquals("bar1", j.get("b"));
j.disconnect();
}
}

View File

@@ -5,13 +5,16 @@ import java.net.UnknownHostException;
import java.util.Calendar;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.tests.HostAndPortUtil;
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
public class GetSetBenchmark {
private static HostAndPort hnp = HostAndPortUtil.getRedisServers().get(0);
private static final int TOTAL_OPERATIONS = 100000;
public static void main(String[] args) throws UnknownHostException,
IOException {
Jedis jedis = new Jedis("localhost");
Jedis jedis = new Jedis(hnp.host, hnp.port);
jedis.connect();
jedis.auth("foobared");
jedis.flushAll();

View File

@@ -6,13 +6,16 @@ import java.util.Calendar;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPipeline;
import redis.clients.jedis.tests.HostAndPortUtil;
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
public class PipelinedGetSetBenchmark {
private static HostAndPort hnp = HostAndPortUtil.getRedisServers().get(0);
private static final int TOTAL_OPERATIONS = 200000;
public static void main(String[] args) throws UnknownHostException,
IOException {
Jedis jedis = new Jedis("localhost");
Jedis jedis = new Jedis(hnp.host, hnp.port);
jedis.connect();
jedis.auth("foobared");
jedis.flushAll();

View File

@@ -5,16 +5,20 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.tests.HostAndPortUtil;
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
public class PoolBenchmark {
private static HostAndPort hnp = HostAndPortUtil.getRedisServers().get(0);
private static final int TOTAL_OPERATIONS = 100000;
public static void main(String[] args) throws UnknownHostException,
IOException, TimeoutException, InterruptedException {
Jedis j = new Jedis("localhost");
Jedis j = new Jedis(hnp.host, hnp.port);
j.connect();
j.auth("foobared");
j.flushAll();
@@ -24,7 +28,7 @@ public class PoolBenchmark {
// withoutPool();
withPool();
long elapsed = System.currentTimeMillis() - t;
System.out.println(((1000 * 3 * TOTAL_OPERATIONS) / elapsed) + " ops");
System.out.println(((1000 * 2 * TOTAL_OPERATIONS) / elapsed) + " ops");
}
private static void withoutPool() throws InterruptedException {
@@ -35,7 +39,7 @@ public class PoolBenchmark {
Thread hj = new Thread(new Runnable() {
@Override
public void run() {
Jedis j = new Jedis("localhost");
Jedis j = new Jedis(hnp.host, hnp.port);
try {
j.connect();
j.auth("foobared");
@@ -59,31 +63,37 @@ public class PoolBenchmark {
}
private static void withPool() throws InterruptedException {
final JedisPool pool = new JedisPool("localhost");
pool.setResourcesNumber(1000);
pool.setDefaultPoolWait(20);
final JedisPool pool = new JedisPool(hnp.host, hnp.port,
2000, "foobared");
pool.setResourcesNumber(50);
pool.setDefaultPoolWait(1000000);
pool.init();
List<Thread> tds = new ArrayList<Thread>();
for (int i = 0; i < TOTAL_OPERATIONS; i++) {
final String key = "foo" + i;
final AtomicInteger ind = new AtomicInteger();
for (int i = 0; i < 50; i++) {
Thread hj = new Thread(new Runnable() {
@Override
public void run() {
try {
Jedis j = pool.getResource();
j.auth("foobared");
j.set(key, key);
j.get(key);
pool.returnResource(j);
} catch (Exception e) {
e.printStackTrace();
for (int i = 0; (i = ind.getAndIncrement()) < TOTAL_OPERATIONS;) {
try {
Jedis j = pool.getResource();
final String key = "foo" + i;
j.set(key, key);
j.get(key);
pool.returnResource(j);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
tds.add(hj);
hj.start();
}
for (Thread t : tds)
t.join();
pool.destroy();
}
}

View File

@@ -219,4 +219,20 @@ public class AllKindOfValuesCommandsTest extends JedisCommandTestBase {
jedis.select(1);
assertEquals(0, jedis.dbSize());
}
@Test
public void persist() {
jedis.setex("foo", 60 * 60, "bar");
assertTrue(jedis.ttl("foo") > 0);
int status = jedis.persist("foo");
assertEquals(1, status);
assertEquals(-1, jedis.ttl("foo"));
}
@Test
public void echo() {
String result = jedis.echo("hello world");
assertEquals("hello world", result);
}
}

View File

@@ -16,8 +16,13 @@ public class ControlCommandsTest extends JedisCommandTestBase {
@Test
public void bgsave() {
String status = jedis.bgsave();
assertEquals("Background saving started", status);
try {
String status = jedis.bgsave();
assertEquals("Background saving started", status);
} catch (JedisException e) {
assertEquals("ERR background save already in progress", e
.getMessage());
}
}
@Test
@@ -72,4 +77,10 @@ public class ControlCommandsTest extends JedisCommandTestBase {
assertEquals("OK", status);
jedis.configSet("maxmemory", memory);
}
@Test
public void sync() {
jedis.sync();
}
}

View File

@@ -9,26 +9,11 @@ import org.junit.After;
import org.junit.Before;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.tests.HostAndPortUtil;
import redis.clients.jedis.tests.HostAndPortUtil.HostAndPort;
public abstract class JedisCommandTestBase extends Assert {
protected static String host = "localhost";
protected static int port = Protocol.DEFAULT_PORT;
static {
final String envHost = System.getProperty("redis-host");
final String envPort = System.getProperty("redis-port");
if (null != envHost && 0 < envHost.length()) {
host = envHost;
}
if (null != envPort && 0 < envPort.length()) {
try {
port = Integer.parseInt(envPort);
} catch (final NumberFormatException e) {
}
}
System.out.println("Redis host to be used : " + host + ":" + port);
}
protected static HostAndPort hnp = HostAndPortUtil.getRedisServers().get(0);
protected Jedis jedis;
@@ -38,7 +23,7 @@ public abstract class JedisCommandTestBase extends Assert {
@Before
public void setUp() throws Exception {
jedis = new Jedis(host, port, 500);
jedis = new Jedis(hnp.host, hnp.port, 500);
jedis.connect();
jedis.auth("foobared");
jedis.flushAll();
@@ -50,7 +35,7 @@ public abstract class JedisCommandTestBase extends Assert {
}
protected Jedis createJedis() throws UnknownHostException, IOException {
Jedis j = new Jedis(host, port);
Jedis j = new Jedis(hnp.host, hnp.port);
j.connect();
j.auth("foobared");
j.flushAll();

View File

@@ -249,6 +249,25 @@ public class ListCommandsTest extends JedisCommandTestBase {
assertEquals(2, result.size());
assertEquals("foo", result.get(0));
assertEquals("bar", result.get(1));
}
@Test
public void lpushx() {
int status = jedis.lpushx("foo", "bar");
assertEquals(0, status);
jedis.lpush("foo", "a");
status = jedis.lpushx("foo", "b");
assertEquals(2, status);
}
@Test
public void rpushx() {
int status = jedis.rpushx("foo", "bar");
assertEquals(0, status);
jedis.lpush("foo", "a");
status = jedis.rpushx("foo", "b");
assertEquals(2, status);
}
}

View File

@@ -169,4 +169,10 @@ public class StringValuesCommandsTest extends JedisCommandTestBase {
assertEquals("This is a string", jedis.substr("s", 0, -1));
assertEquals(" string", jedis.substr("s", 9, 100000));
}
@Test
public void strlen() {
jedis.set("s", "This is a string");
assertEquals("This is a string".length(), jedis.strlen("s"));
}
}

View File

@@ -19,7 +19,7 @@ public class TransactionCommandsTest extends JedisCommandTestBase {
public void setUp() throws Exception {
super.setUp();
nj = new Jedis(host, port, 500);
nj = new Jedis(hnp.host, hnp.port, 500);
nj.connect();
nj.auth("foobared");
nj.flushAll();