Pipeline supports Multi. Issue #251 is fixed.
This commit is contained in:
@@ -9,8 +9,51 @@ import redis.clients.jedis.BinaryClient.LIST_POSITION;
|
||||
import redis.clients.jedis.exceptions.JedisDataException;
|
||||
|
||||
public class Pipeline extends Queable {
|
||||
private Client client;
|
||||
|
||||
private MultiResponseBuilder currentMulti;
|
||||
|
||||
private class MultiResponseBuilder extends Builder<List<Object>>{
|
||||
private List<Response<?>> responses = new ArrayList<Response<?>>();
|
||||
|
||||
@Override
|
||||
public List<Object> build(Object data) {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Object> list = (List<Object>)data;
|
||||
List<Object> values = new ArrayList<Object>();
|
||||
|
||||
if(list.size() != responses.size()){
|
||||
throw new JedisDataException("Expected data size " + responses.size() + " but was " + list.size());
|
||||
}
|
||||
|
||||
for(int i=0;i<list.size();i++){
|
||||
Response<?> response = responses.get(i);
|
||||
response.set(list.get(i));
|
||||
values.add(response.get());
|
||||
}
|
||||
return values;
|
||||
}
|
||||
|
||||
public void addResponse(Response<?> response){
|
||||
responses.add(response);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> Response<T> getResponse(Builder<T> builder) {
|
||||
if(currentMulti != null){
|
||||
super.getResponse(BuilderFactory.STRING); //Expected QUEUED
|
||||
|
||||
Response<T> lr = new Response<T>(builder);
|
||||
currentMulti.addResponse(lr);
|
||||
return lr;
|
||||
}
|
||||
else{
|
||||
return super.getResponse(builder);
|
||||
}
|
||||
}
|
||||
|
||||
private Client client;
|
||||
|
||||
public void setClient(Client client) {
|
||||
this.client = client;
|
||||
}
|
||||
@@ -39,9 +82,11 @@ public class Pipeline extends Queable {
|
||||
public List<Object> syncAndReturnAll() {
|
||||
List<Object> unformatted = client.getAll();
|
||||
List<Object> formatted = new ArrayList<Object>();
|
||||
|
||||
for (Object o : unformatted) {
|
||||
try {
|
||||
formatted.add(generateResponse(o).get());
|
||||
Response<?> pr = generateResponse(o);
|
||||
formatted.add(pr.get());
|
||||
} catch (JedisDataException e) {
|
||||
formatted.add(e);
|
||||
}
|
||||
@@ -1176,10 +1221,14 @@ public class Pipeline extends Queable {
|
||||
|
||||
public void exec() {
|
||||
client.exec();
|
||||
super.getResponse(currentMulti);
|
||||
currentMulti = null;
|
||||
}
|
||||
|
||||
public void multi() {
|
||||
client.multi();
|
||||
getResponse(BuilderFactory.STRING); //Expecting OK
|
||||
currentMulti = new MultiResponseBuilder();
|
||||
}
|
||||
|
||||
public Response<Long> publish(String channel, String message) {
|
||||
@@ -1230,5 +1279,6 @@ public class Pipeline extends Queable {
|
||||
public Response<String> select(int index){
|
||||
client.select(index);
|
||||
return getResponse(BuilderFactory.STRING);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -164,4 +164,29 @@ public class PipeliningTest extends Assert {
|
||||
}
|
||||
assertEquals(r.get(), "bar");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void multi(){
|
||||
Pipeline p = jedis.pipelined();
|
||||
p.multi();
|
||||
Response<Long> r1 = p.hincrBy("a", "f1", -1);
|
||||
Response<Long> r2 = p.hincrBy("a", "f1", -2);
|
||||
p.exec();
|
||||
List<Object> result = p.syncAndReturnAll();
|
||||
|
||||
assertEquals(new Long(-1), r1.get());
|
||||
assertEquals(new Long(-3), r2.get());
|
||||
|
||||
assertEquals(4, result.size());
|
||||
|
||||
assertEquals("OK", result.get(0));
|
||||
assertEquals("QUEUED", result.get(1));
|
||||
assertEquals("QUEUED", result.get(2));
|
||||
|
||||
//4th result is a list with the results from the multi
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Object> multiResult = (List<Object>) result.get(3);
|
||||
assertEquals(new Long(-1), multiResult.get(0));
|
||||
assertEquals(new Long(-3), multiResult.get(1));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user