diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 1c42b94..7c55f9a 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -21,7 +21,6 @@ public class Connection { private Socket socket; private RedisOutputStream outputStream; private RedisInputStream inputStream; - private int pipelinedCommands = 0; private int timeout = Protocol.DEFAULT_TIMEOUT; public Socket getSocket() { @@ -81,14 +80,12 @@ public class Connection { protected Connection sendCommand(final Command cmd, final byte[]... args) { connect(); Protocol.sendCommand(outputStream, cmd, args); - pipelinedCommands++; return this; } protected Connection sendCommand(final Command cmd) { connect(); Protocol.sendCommand(outputStream, cmd, new byte[0][]); - pipelinedCommands++; return this; } @@ -161,7 +158,6 @@ public class Connection { protected String getStatusCodeReply() { flush(); - pipelinedCommands--; final byte[] resp = (byte[]) Protocol.read(inputStream); if (null == resp) { return null; @@ -181,13 +177,11 @@ public class Connection { public byte[] getBinaryBulkReply() { flush(); - pipelinedCommands--; return (byte[]) Protocol.read(inputStream); } public Long getIntegerReply() { flush(); - pipelinedCommands--; return (Long) Protocol.read(inputStream); } @@ -198,45 +192,23 @@ public class Connection { @SuppressWarnings("unchecked") public List getBinaryMultiBulkReply() { flush(); - pipelinedCommands--; return (List) Protocol.read(inputStream); } @SuppressWarnings("unchecked") public List getObjectMultiBulkReply() { flush(); - pipelinedCommands--; return (List) Protocol.read(inputStream); } @SuppressWarnings("unchecked") public List getIntegerMultiBulkReply() { flush(); - pipelinedCommands--; return (List) Protocol.read(inputStream); } - public List getAll() { - return getAll(0); - } - - public List getAll(int except) { - List all = new ArrayList(); - flush(); - while (pipelinedCommands > except) { - try{ - all.add(Protocol.read(inputStream)); - }catch(JedisDataException e){ - all.add(e); - } - pipelinedCommands--; - } - return all; - } - public Object getOne() { flush(); - pipelinedCommands--; return Protocol.read(inputStream); } } diff --git a/src/main/java/redis/clients/jedis/Pipeline.java b/src/main/java/redis/clients/jedis/Pipeline.java index 98cab69..786ef8c 100755 --- a/src/main/java/redis/clients/jedis/Pipeline.java +++ b/src/main/java/redis/clients/jedis/Pipeline.java @@ -62,17 +62,24 @@ public class Pipeline extends MultiKeyPipelineBase { protected Client getClient(String key) { return client; } - + + public Object getOneWithJedisDataException() { + try { + return client.getOne(); + } catch (JedisDataException e) { + return e; + } + } + /** * Syncronize pipeline by reading all responses. This operation close the * pipeline. In order to get return values from pipelined commands, capture * the different Response of the commands you execute. */ public void sync() { - List unformatted = client.getAll(); - for (Object o : unformatted) { - generateResponse(o); - } + while (hasPipelinedResponse()) { + generateResponse(getOneWithJedisDataException()); + } } /** @@ -84,12 +91,11 @@ public class Pipeline extends MultiKeyPipelineBase { * @return A list of all the responses in the order you executed them. */ public List syncAndReturnAll() { - List unformatted = client.getAll(); List formatted = new ArrayList(); - for (Object o : unformatted) { - try { - formatted.add(generateResponse(o).get()); + while (hasPipelinedResponse()) { + try { + formatted.add(generateResponse(getOneWithJedisDataException()).get()); } catch (JedisDataException e) { formatted.add(e); } diff --git a/src/main/java/redis/clients/jedis/Queable.java b/src/main/java/redis/clients/jedis/Queable.java index b8ff0e5..d6ae31f 100644 --- a/src/main/java/redis/clients/jedis/Queable.java +++ b/src/main/java/redis/clients/jedis/Queable.java @@ -24,4 +24,11 @@ public class Queable { return lr; } + protected boolean hasPipelinedResponse() { + return pipelinedResponses.size() > 0; + } + + protected int getPipelinedResponseLength() { + return pipelinedResponses.size(); + } } diff --git a/src/main/java/redis/clients/jedis/Transaction.java b/src/main/java/redis/clients/jedis/Transaction.java index 949f541..4d88932 100644 --- a/src/main/java/redis/clients/jedis/Transaction.java +++ b/src/main/java/redis/clients/jedis/Transaction.java @@ -30,9 +30,26 @@ public class Transaction extends MultiKeyPipelineBase { return client; } + public Object getOneWithJedisDataException() { + try { + return client.getOne(); + } catch (JedisDataException e) { + return e; + } + } + + private void consumeResponse(int count) { + for (int i = 0 ; i < count ; i++) + getOneWithJedisDataException(); + } + public List exec() { + // Discard multi + consumeResponse(1); + // Discard QUEUED or ERROR + consumeResponse(getPipelinedResponseLength()); + client.exec(); - client.getAll(1); // Discard all but the last reply List unformatted = client.getObjectMultiBulkReply(); if (unformatted == null) { @@ -50,8 +67,12 @@ public class Transaction extends MultiKeyPipelineBase { } public List> execGetResponse() { + // Discard multi + consumeResponse(1); + // Discard QUEUED or ERROR + consumeResponse(getPipelinedResponseLength()); + client.exec(); - client.getAll(1); // Discard all but the last reply List unformatted = client.getObjectMultiBulkReply(); if (unformatted == null) { @@ -65,8 +86,8 @@ public class Transaction extends MultiKeyPipelineBase { } public String discard() { + consumeResponse(getPipelinedResponseLength()); client.discard(); - client.getAll(1); // Discard all but the last reply inTransaction = false; clean(); return client.getStatusCodeReply();