diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index 1661524..16a9da8 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -963,9 +963,6 @@ public class BinaryClient extends Connection { } public void resetState() { - if (isInMulti()) - discard(); - if (isInWatch()) unwatch(); } diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index 7f81897..49c2603 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -23,7 +23,9 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands, AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable { protected Client client = null; - + protected Transaction transaction = null; + protected Pipeline pipeline = null; + public BinaryJedis(final String host) { URI uri = URI.create(host); if (uri.getScheme() != null && uri.getScheme().equals("redis")) { @@ -1753,7 +1755,9 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, public Transaction multi() { client.multi(); - return new Transaction(client); + client.getOne(); // expected OK + transaction = new Transaction(client); + return transaction; } @Deprecated @@ -1767,6 +1771,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, List results = null; jedisTransaction.setClient(client); client.multi(); + client.getOne(); // expected OK jedisTransaction.execute(); results = jedisTransaction.exec(); return results; @@ -1789,9 +1794,23 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, public void resetState() { if (client.isConnected()) { + if (transaction != null) { + transaction.clear(); + } + + if (pipeline != null) { + pipeline.clear(); + } + + if (client.isInWatch()) { + unwatch(); + } + client.resetState(); - client.getAll(); } + + transaction = null; + pipeline = null; } public String watch(final byte[]... keys) { @@ -2207,7 +2226,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, } public Pipeline pipelined() { - Pipeline pipeline = new Pipeline(); + pipeline = new Pipeline(); pipeline.setClient(client); return pipeline; } diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index c50c706..c57b170 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -21,7 +21,6 @@ public class Connection implements Closeable { private Socket socket; private RedisOutputStream outputStream; private RedisInputStream inputStream; - private int pipelinedCommands = 0; private int timeout = Protocol.DEFAULT_TIMEOUT; private boolean broken = false; @@ -78,7 +77,6 @@ public class Connection implements Closeable { try { connect(); Protocol.sendCommand(outputStream, cmd, args); - pipelinedCommands++; return this; } catch (JedisConnectionException ex) { // Any other exceptions related to connection? @@ -91,7 +89,6 @@ public class Connection implements Closeable { try { connect(); Protocol.sendCommand(outputStream, cmd, new byte[0][]); - pipelinedCommands++; return this; } catch (JedisConnectionException ex) { // Any other exceptions related to connection? @@ -180,7 +177,6 @@ public class Connection implements Closeable { protected String getStatusCodeReply() { flush(); - pipelinedCommands--; final byte[] resp = (byte[]) readProtocolWithCheckingBroken(); if (null == resp) { return null; @@ -200,13 +196,11 @@ public class Connection implements Closeable { public byte[] getBinaryBulkReply() { flush(); - pipelinedCommands--; return (byte[]) readProtocolWithCheckingBroken(); } public Long getIntegerReply() { flush(); - pipelinedCommands--; return (Long) readProtocolWithCheckingBroken(); } @@ -217,14 +211,9 @@ public class Connection implements Closeable { @SuppressWarnings("unchecked") public List getBinaryMultiBulkReply() { flush(); - pipelinedCommands--; return (List) readProtocolWithCheckingBroken(); } - public void resetPipelinedCount() { - pipelinedCommands = 0; - } - @SuppressWarnings("unchecked") public List getRawObjectMultiBulkReply() { return (List) readProtocolWithCheckingBroken(); @@ -232,38 +221,17 @@ public class Connection implements Closeable { public List getObjectMultiBulkReply() { flush(); - pipelinedCommands--; return getRawObjectMultiBulkReply(); } @SuppressWarnings("unchecked") public List getIntegerMultiBulkReply() { - flush(); - pipelinedCommands--; - return (List) readProtocolWithCheckingBroken(); - } - - public List getAll() { - return getAll(0); - } - - public List getAll(int except) { - List all = new ArrayList(); - flush(); - while (pipelinedCommands > except) { - try { - all.add(readProtocolWithCheckingBroken()); - } catch (JedisDataException e) { - all.add(e); - } - pipelinedCommands--; - } - return all; + flush(); + return (List) Protocol.read(inputStream); } public Object getOne() { flush(); - pipelinedCommands--; return readProtocolWithCheckingBroken(); } @@ -288,4 +256,17 @@ public class Connection implements Closeable { throw exc; } } + + public List getMany(int count) { + flush(); + List responses = new ArrayList(); + for (int i = 0; i < count; i++) { + try { + responses.add(readProtocolWithCheckingBroken()); + } catch (JedisDataException e) { + responses.add(e); + } + } + return responses; + } } diff --git a/src/main/java/redis/clients/jedis/JedisPubSub.java b/src/main/java/redis/clients/jedis/JedisPubSub.java index 7dfcf07..2ded002 100644 --- a/src/main/java/redis/clients/jedis/JedisPubSub.java +++ b/src/main/java/redis/clients/jedis/JedisPubSub.java @@ -162,12 +162,6 @@ public abstract class JedisPubSub { /* Invalidate instance since this thread is no longer listening */ this.client = null; - - /* - * Reset pipeline count because subscribe() calls would have increased - * it but nothing decremented it. - */ - client.resetPipelinedCount(); } public int getSubscribedChannels() { diff --git a/src/main/java/redis/clients/jedis/Pipeline.java b/src/main/java/redis/clients/jedis/Pipeline.java index 4b19b5b..ba17387 100755 --- a/src/main/java/redis/clients/jedis/Pipeline.java +++ b/src/main/java/redis/clients/jedis/Pipeline.java @@ -69,13 +69,25 @@ public class Pipeline extends MultiKeyPipelineBase { return client; } + public void clear() { + if (isInMulti()) { + discard(); + } + + sync(); + } + + public boolean isInMulti() { + return currentMulti != null; + } + /** * Syncronize pipeline by reading all responses. This operation close the * pipeline. In order to get return values from pipelined commands, capture * the different Response of the commands you execute. */ public void sync() { - List unformatted = client.getAll(); + List unformatted = client.getMany(getPipelinedResponseLength()); for (Object o : unformatted) { generateResponse(o); } @@ -90,7 +102,7 @@ public class Pipeline extends MultiKeyPipelineBase { * @return A list of all the responses in the order you executed them. */ public List syncAndReturnAll() { - List unformatted = client.getAll(); + List unformatted = client.getMany(getPipelinedResponseLength()); List formatted = new ArrayList(); for (Object o : unformatted) { @@ -106,7 +118,6 @@ public class Pipeline extends MultiKeyPipelineBase { public Response discard() { if (currentMulti == null) throw new JedisDataException("DISCARD without MULTI"); - client.discard(); currentMulti = null; return getResponse(BuilderFactory.STRING); diff --git a/src/main/java/redis/clients/jedis/PipelineBlock.java b/src/main/java/redis/clients/jedis/PipelineBlock.java index 50181ba..24bae84 100644 --- a/src/main/java/redis/clients/jedis/PipelineBlock.java +++ b/src/main/java/redis/clients/jedis/PipelineBlock.java @@ -7,5 +7,9 @@ package redis.clients.jedis; * @see https://github.com/xetorthio/jedis/pull/498 */ public abstract class PipelineBlock extends Pipeline { + // For shadowing + @SuppressWarnings("unused") + private Client client; + public abstract void execute(); } diff --git a/src/main/java/redis/clients/jedis/Queable.java b/src/main/java/redis/clients/jedis/Queable.java index 769bf16..2cd5265 100644 --- a/src/main/java/redis/clients/jedis/Queable.java +++ b/src/main/java/redis/clients/jedis/Queable.java @@ -24,4 +24,11 @@ public class Queable { return lr; } + protected boolean hasPipelinedResponse() { + return pipelinedResponses.size() > 0; + } + + protected int getPipelinedResponseLength() { + return pipelinedResponses.size(); + } } diff --git a/src/main/java/redis/clients/jedis/Transaction.java b/src/main/java/redis/clients/jedis/Transaction.java index 69d50d2..e6088ff 100644 --- a/src/main/java/redis/clients/jedis/Transaction.java +++ b/src/main/java/redis/clients/jedis/Transaction.java @@ -31,9 +31,16 @@ public class Transaction extends MultiKeyPipelineBase { return client; } + public void clear() { + if (inTransaction) { + discard(); + } + } + public List exec() { + // Discard QUEUED or ERROR + client.getMany(getPipelinedResponseLength()); client.exec(); - client.getAll(1); // Discard all but the last reply List unformatted = client.getObjectMultiBulkReply(); if (unformatted == null) { @@ -51,8 +58,9 @@ public class Transaction extends MultiKeyPipelineBase { } public List> execGetResponse() { + // Discard QUEUED or ERROR + client.getMany(getPipelinedResponseLength()); client.exec(); - client.getAll(1); // Discard all but the last reply List unformatted = client.getObjectMultiBulkReply(); if (unformatted == null) { @@ -66,11 +74,15 @@ public class Transaction extends MultiKeyPipelineBase { } public String discard() { + client.getMany(getPipelinedResponseLength()); client.discard(); - client.getAll(1); // Discard all but the last reply inTransaction = false; clean(); return client.getStatusCodeReply(); } + public void setClient(Client client) { + this.client = client; + } + } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/TransactionBlock.java b/src/main/java/redis/clients/jedis/TransactionBlock.java index 86f7c44..c038ac2 100644 --- a/src/main/java/redis/clients/jedis/TransactionBlock.java +++ b/src/main/java/redis/clients/jedis/TransactionBlock.java @@ -9,6 +9,10 @@ import redis.clients.jedis.exceptions.JedisException; * @see https://github.com/xetorthio/jedis/pull/498 */ public abstract class TransactionBlock extends Transaction { + // For shadowing + @SuppressWarnings("unused") + private Client client; + public TransactionBlock(Client client) { super(client); } @@ -19,6 +23,6 @@ public abstract class TransactionBlock extends Transaction { public abstract void execute() throws JedisException; public void setClient(Client client) { - this.client = client; + super.setClient(client); } }