From f7bd9c8313096d2402afbeb8d3478f2b81a0027b Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 20 Jan 2014 00:28:00 +0900 Subject: [PATCH 1/6] Refactor Pipeline / Transaction to consume responses based on their requests, not rely on pipelinedCommands * remove pipelinedCommands field at Connection class ** it was a risky state value *** it was under 0 or over 0(though all commands are executed) while some situation * remove Connection.getAll(), Connection.getAll(int except) --- .../java/redis/clients/jedis/Connection.java | 28 ------------------- .../java/redis/clients/jedis/Pipeline.java | 24 ++++++++++------ .../java/redis/clients/jedis/Queable.java | 7 +++++ .../java/redis/clients/jedis/Transaction.java | 27 ++++++++++++++++-- 4 files changed, 46 insertions(+), 40 deletions(-) diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 1c42b94..7c55f9a 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -21,7 +21,6 @@ public class Connection { private Socket socket; private RedisOutputStream outputStream; private RedisInputStream inputStream; - private int pipelinedCommands = 0; private int timeout = Protocol.DEFAULT_TIMEOUT; public Socket getSocket() { @@ -81,14 +80,12 @@ public class Connection { protected Connection sendCommand(final Command cmd, final byte[]... args) { connect(); Protocol.sendCommand(outputStream, cmd, args); - pipelinedCommands++; return this; } protected Connection sendCommand(final Command cmd) { connect(); Protocol.sendCommand(outputStream, cmd, new byte[0][]); - pipelinedCommands++; return this; } @@ -161,7 +158,6 @@ public class Connection { protected String getStatusCodeReply() { flush(); - pipelinedCommands--; final byte[] resp = (byte[]) Protocol.read(inputStream); if (null == resp) { return null; @@ -181,13 +177,11 @@ public class Connection { public byte[] getBinaryBulkReply() { flush(); - pipelinedCommands--; return (byte[]) Protocol.read(inputStream); } public Long getIntegerReply() { flush(); - pipelinedCommands--; return (Long) Protocol.read(inputStream); } @@ -198,45 +192,23 @@ public class Connection { @SuppressWarnings("unchecked") public List getBinaryMultiBulkReply() { flush(); - pipelinedCommands--; return (List) Protocol.read(inputStream); } @SuppressWarnings("unchecked") public List getObjectMultiBulkReply() { flush(); - pipelinedCommands--; return (List) Protocol.read(inputStream); } @SuppressWarnings("unchecked") public List getIntegerMultiBulkReply() { flush(); - pipelinedCommands--; return (List) Protocol.read(inputStream); } - public List getAll() { - return getAll(0); - } - - public List getAll(int except) { - List all = new ArrayList(); - flush(); - while (pipelinedCommands > except) { - try{ - all.add(Protocol.read(inputStream)); - }catch(JedisDataException e){ - all.add(e); - } - pipelinedCommands--; - } - return all; - } - public Object getOne() { flush(); - pipelinedCommands--; return Protocol.read(inputStream); } } diff --git a/src/main/java/redis/clients/jedis/Pipeline.java b/src/main/java/redis/clients/jedis/Pipeline.java index 98cab69..786ef8c 100755 --- a/src/main/java/redis/clients/jedis/Pipeline.java +++ b/src/main/java/redis/clients/jedis/Pipeline.java @@ -62,17 +62,24 @@ public class Pipeline extends MultiKeyPipelineBase { protected Client getClient(String key) { return client; } - + + public Object getOneWithJedisDataException() { + try { + return client.getOne(); + } catch (JedisDataException e) { + return e; + } + } + /** * Syncronize pipeline by reading all responses. This operation close the * pipeline. In order to get return values from pipelined commands, capture * the different Response of the commands you execute. */ public void sync() { - List unformatted = client.getAll(); - for (Object o : unformatted) { - generateResponse(o); - } + while (hasPipelinedResponse()) { + generateResponse(getOneWithJedisDataException()); + } } /** @@ -84,12 +91,11 @@ public class Pipeline extends MultiKeyPipelineBase { * @return A list of all the responses in the order you executed them. */ public List syncAndReturnAll() { - List unformatted = client.getAll(); List formatted = new ArrayList(); - for (Object o : unformatted) { - try { - formatted.add(generateResponse(o).get()); + while (hasPipelinedResponse()) { + try { + formatted.add(generateResponse(getOneWithJedisDataException()).get()); } catch (JedisDataException e) { formatted.add(e); } diff --git a/src/main/java/redis/clients/jedis/Queable.java b/src/main/java/redis/clients/jedis/Queable.java index b8ff0e5..d6ae31f 100644 --- a/src/main/java/redis/clients/jedis/Queable.java +++ b/src/main/java/redis/clients/jedis/Queable.java @@ -24,4 +24,11 @@ public class Queable { return lr; } + protected boolean hasPipelinedResponse() { + return pipelinedResponses.size() > 0; + } + + protected int getPipelinedResponseLength() { + return pipelinedResponses.size(); + } } diff --git a/src/main/java/redis/clients/jedis/Transaction.java b/src/main/java/redis/clients/jedis/Transaction.java index 949f541..4d88932 100644 --- a/src/main/java/redis/clients/jedis/Transaction.java +++ b/src/main/java/redis/clients/jedis/Transaction.java @@ -30,9 +30,26 @@ public class Transaction extends MultiKeyPipelineBase { return client; } + public Object getOneWithJedisDataException() { + try { + return client.getOne(); + } catch (JedisDataException e) { + return e; + } + } + + private void consumeResponse(int count) { + for (int i = 0 ; i < count ; i++) + getOneWithJedisDataException(); + } + public List exec() { + // Discard multi + consumeResponse(1); + // Discard QUEUED or ERROR + consumeResponse(getPipelinedResponseLength()); + client.exec(); - client.getAll(1); // Discard all but the last reply List unformatted = client.getObjectMultiBulkReply(); if (unformatted == null) { @@ -50,8 +67,12 @@ public class Transaction extends MultiKeyPipelineBase { } public List> execGetResponse() { + // Discard multi + consumeResponse(1); + // Discard QUEUED or ERROR + consumeResponse(getPipelinedResponseLength()); + client.exec(); - client.getAll(1); // Discard all but the last reply List unformatted = client.getObjectMultiBulkReply(); if (unformatted == null) { @@ -65,8 +86,8 @@ public class Transaction extends MultiKeyPipelineBase { } public String discard() { + consumeResponse(getPipelinedResponseLength()); client.discard(); - client.getAll(1); // Discard all but the last reply inTransaction = false; clean(); return client.getStatusCodeReply(); From 5bf29b43ee520309faf8bc4695f03337e4b88799 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 20 Jan 2014 07:31:13 +0900 Subject: [PATCH 2/6] Let BinaryJedis.multi() consumes "OK" message * Transaction doesn't have to consume "multi"'s response --- src/main/java/redis/clients/jedis/BinaryJedis.java | 2 ++ src/main/java/redis/clients/jedis/Transaction.java | 4 ---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index f08633f..af5a395 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -1679,6 +1679,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKey public Transaction multi() { client.multi(); + client.getOne(); // expected OK return new Transaction(client); } @@ -1687,6 +1688,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKey jedisTransaction.setClient(client); try { client.multi(); + client.getOne(); // expected OK jedisTransaction.execute(); results = jedisTransaction.exec(); } catch (Exception ex) { diff --git a/src/main/java/redis/clients/jedis/Transaction.java b/src/main/java/redis/clients/jedis/Transaction.java index 4d88932..11bb14a 100644 --- a/src/main/java/redis/clients/jedis/Transaction.java +++ b/src/main/java/redis/clients/jedis/Transaction.java @@ -44,8 +44,6 @@ public class Transaction extends MultiKeyPipelineBase { } public List exec() { - // Discard multi - consumeResponse(1); // Discard QUEUED or ERROR consumeResponse(getPipelinedResponseLength()); @@ -67,8 +65,6 @@ public class Transaction extends MultiKeyPipelineBase { } public List> execGetResponse() { - // Discard multi - consumeResponse(1); // Discard QUEUED or ERROR consumeResponse(getPipelinedResponseLength()); From 2267c3318ce6a157e98d43b7c53011868420c09b Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 20 Jan 2014 11:36:24 +0900 Subject: [PATCH 3/6] Implemented Client.getMany(int count) to remove multiple flush while getting responses at once --- .../java/redis/clients/jedis/Connection.java | 13 ++++++++++ .../java/redis/clients/jedis/Pipeline.java | 26 ++++++------------- .../java/redis/clients/jedis/Transaction.java | 19 +++----------- 3 files changed, 24 insertions(+), 34 deletions(-) diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 7c55f9a..bb12079 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -211,4 +211,17 @@ public class Connection { flush(); return Protocol.read(inputStream); } + + public List getMany(int count) { + flush(); + List responses = new ArrayList(); + for (int i = 0 ; i < count ; i++) { + try { + responses.add(Protocol.read(inputStream)); + } catch (JedisDataException e) { + responses.add(e); + } + } + return responses; + } } diff --git a/src/main/java/redis/clients/jedis/Pipeline.java b/src/main/java/redis/clients/jedis/Pipeline.java index 786ef8c..7df2a63 100755 --- a/src/main/java/redis/clients/jedis/Pipeline.java +++ b/src/main/java/redis/clients/jedis/Pipeline.java @@ -63,23 +63,16 @@ public class Pipeline extends MultiKeyPipelineBase { return client; } - public Object getOneWithJedisDataException() { - try { - return client.getOne(); - } catch (JedisDataException e) { - return e; - } - } - /** * Syncronize pipeline by reading all responses. This operation close the * pipeline. In order to get return values from pipelined commands, capture * the different Response of the commands you execute. */ public void sync() { - while (hasPipelinedResponse()) { - generateResponse(getOneWithJedisDataException()); - } + List unformatted = client.getMany(getPipelinedResponseLength()); + + for (Object resp : unformatted) + generateResponse(resp); } /** @@ -91,15 +84,12 @@ public class Pipeline extends MultiKeyPipelineBase { * @return A list of all the responses in the order you executed them. */ public List syncAndReturnAll() { + List unformatted = client.getMany(getPipelinedResponseLength()); List formatted = new ArrayList(); - while (hasPipelinedResponse()) { - try { - formatted.add(generateResponse(getOneWithJedisDataException()).get()); - } catch (JedisDataException e) { - formatted.add(e); - } - } + for (Object resp : unformatted) + formatted.add(generateResponse(resp).get()); + return formatted; } diff --git a/src/main/java/redis/clients/jedis/Transaction.java b/src/main/java/redis/clients/jedis/Transaction.java index 11bb14a..d8cf9de 100644 --- a/src/main/java/redis/clients/jedis/Transaction.java +++ b/src/main/java/redis/clients/jedis/Transaction.java @@ -30,22 +30,9 @@ public class Transaction extends MultiKeyPipelineBase { return client; } - public Object getOneWithJedisDataException() { - try { - return client.getOne(); - } catch (JedisDataException e) { - return e; - } - } - - private void consumeResponse(int count) { - for (int i = 0 ; i < count ; i++) - getOneWithJedisDataException(); - } - public List exec() { // Discard QUEUED or ERROR - consumeResponse(getPipelinedResponseLength()); + client.getMany(getPipelinedResponseLength()); client.exec(); @@ -66,7 +53,7 @@ public class Transaction extends MultiKeyPipelineBase { public List> execGetResponse() { // Discard QUEUED or ERROR - consumeResponse(getPipelinedResponseLength()); + client.getMany(getPipelinedResponseLength()); client.exec(); @@ -82,7 +69,7 @@ public class Transaction extends MultiKeyPipelineBase { } public String discard() { - consumeResponse(getPipelinedResponseLength()); + client.getMany(getPipelinedResponseLength()); client.discard(); inTransaction = false; clean(); From bc7cc5eddba1f49ae513f468fbd22c4e6a6e12a2 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 20 Jan 2014 14:35:42 +0900 Subject: [PATCH 4/6] Shadow client field from PipelineBlock and TransactionBlock * it prevent from user accessing BinaryClient, while it is not treated to pipeline command ** it could make troubles when reading responses --- src/main/java/redis/clients/jedis/PipelineBlock.java | 4 ++++ src/main/java/redis/clients/jedis/Transaction.java | 4 ++++ .../java/redis/clients/jedis/TransactionBlock.java | 10 +++++++++- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/PipelineBlock.java b/src/main/java/redis/clients/jedis/PipelineBlock.java index 9afc391..fbf9c7f 100644 --- a/src/main/java/redis/clients/jedis/PipelineBlock.java +++ b/src/main/java/redis/clients/jedis/PipelineBlock.java @@ -2,5 +2,9 @@ package redis.clients.jedis; public abstract class PipelineBlock extends Pipeline { + // For shadowing + @SuppressWarnings("unused") + private Client client; + public abstract void execute(); } diff --git a/src/main/java/redis/clients/jedis/Transaction.java b/src/main/java/redis/clients/jedis/Transaction.java index d8cf9de..8e14fad 100644 --- a/src/main/java/redis/clients/jedis/Transaction.java +++ b/src/main/java/redis/clients/jedis/Transaction.java @@ -76,4 +76,8 @@ public class Transaction extends MultiKeyPipelineBase { return client.getStatusCodeReply(); } + public void setClient(Client client) { + this.client = client; + } + } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/TransactionBlock.java b/src/main/java/redis/clients/jedis/TransactionBlock.java index e784e19..792eae3 100644 --- a/src/main/java/redis/clients/jedis/TransactionBlock.java +++ b/src/main/java/redis/clients/jedis/TransactionBlock.java @@ -3,6 +3,10 @@ package redis.clients.jedis; import redis.clients.jedis.exceptions.JedisException; public abstract class TransactionBlock extends Transaction { + // For shadowing + @SuppressWarnings("unused") + private Client client; + public TransactionBlock(Client client) { super(client); } @@ -13,6 +17,10 @@ public abstract class TransactionBlock extends Transaction { public abstract void execute() throws JedisException; public void setClient(Client client) { - this.client = client; + super.setClient(client); + } + + public String discard() { + return super.discard(); } } From 360e25e07128771b4e11dbc3cca58370bb49c816 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 20 Jan 2014 14:50:02 +0900 Subject: [PATCH 5/6] Remove unused discard() method --- src/main/java/redis/clients/jedis/TransactionBlock.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/redis/clients/jedis/TransactionBlock.java b/src/main/java/redis/clients/jedis/TransactionBlock.java index 792eae3..36c7015 100644 --- a/src/main/java/redis/clients/jedis/TransactionBlock.java +++ b/src/main/java/redis/clients/jedis/TransactionBlock.java @@ -19,8 +19,4 @@ public abstract class TransactionBlock extends Transaction { public void setClient(Client client) { super.setClient(client); } - - public String discard() { - return super.discard(); - } } From 514144d472135dbf5ece2677be2d28f4fa58ca31 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 8 Aug 2014 10:56:39 +0900 Subject: [PATCH 6/6] Fix indentation (code formatting...) --- .../java/redis/clients/jedis/BinaryJedis.java | 39 +++++++-------- .../java/redis/clients/jedis/Connection.java | 50 +++++++++---------- .../java/redis/clients/jedis/Pipeline.java | 10 ++-- .../redis/clients/jedis/PipelineBlock.java | 8 +-- .../java/redis/clients/jedis/Queable.java | 6 +-- .../java/redis/clients/jedis/Transaction.java | 12 ++--- .../redis/clients/jedis/TransactionBlock.java | 10 ++-- 7 files changed, 67 insertions(+), 68 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index f76a391..f2ca337 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -22,11 +22,10 @@ import redis.clients.util.SafeEncoder; public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands, AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable { - protected Client client = null; protected Transaction transaction = null; protected Pipeline pipeline = null; - + public BinaryJedis(final String host) { URI uri = URI.create(host); if (uri.getScheme() != null && uri.getScheme().equals("redis")) { @@ -1771,10 +1770,10 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, public List multi(final TransactionBlock jedisTransaction) { List results = null; jedisTransaction.setClient(client); - client.multi(); - client.getOne(); // expected OK - jedisTransaction.execute(); - results = jedisTransaction.exec(); + client.multi(); + client.getOne(); // expected OK + jedisTransaction.execute(); + results = jedisTransaction.exec(); return results; } @@ -1794,24 +1793,24 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands, } public void resetState() { - if (client.isConnected()) { - if (transaction != null) { - transaction.clear(); - } + if (client.isConnected()) { + if (transaction != null) { + transaction.clear(); + } - if (pipeline != null) { - pipeline.clear(); - } + if (pipeline != null) { + pipeline.clear(); + } - if (client.isInWatch()) { - unwatch(); - } + if (client.isInWatch()) { + unwatch(); + } - client.resetState(); - } + client.resetState(); + } - transaction = null; - pipeline = null; + transaction = null; + pipeline = null; } public String watch(final byte[]... keys) { diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index cee0e3a..c57b170 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -231,42 +231,42 @@ public class Connection implements Closeable { } public Object getOne() { - flush(); - return readProtocolWithCheckingBroken(); + flush(); + return readProtocolWithCheckingBroken(); } public boolean isBroken() { - return broken; + return broken; } protected void flush() { - try { - outputStream.flush(); - } catch (IOException ex) { - broken = true; - throw new JedisConnectionException(ex); - } + try { + outputStream.flush(); + } catch (IOException ex) { + broken = true; + throw new JedisConnectionException(ex); + } } protected Object readProtocolWithCheckingBroken() { - try { - return Protocol.read(inputStream); - } catch (JedisConnectionException exc) { - broken = true; - throw exc; - } + try { + return Protocol.read(inputStream); + } catch (JedisConnectionException exc) { + broken = true; + throw exc; + } } public List getMany(int count) { - flush(); - List responses = new ArrayList(); - for (int i = 0 ; i < count ; i++) { - try { - responses.add(readProtocolWithCheckingBroken()); - } catch (JedisDataException e) { - responses.add(e); - } - } - return responses; + flush(); + List responses = new ArrayList(); + for (int i = 0; i < count; i++) { + try { + responses.add(readProtocolWithCheckingBroken()); + } catch (JedisDataException e) { + responses.add(e); + } + } + return responses; } } diff --git a/src/main/java/redis/clients/jedis/Pipeline.java b/src/main/java/redis/clients/jedis/Pipeline.java index f837894..ba17387 100755 --- a/src/main/java/redis/clients/jedis/Pipeline.java +++ b/src/main/java/redis/clients/jedis/Pipeline.java @@ -70,15 +70,15 @@ public class Pipeline extends MultiKeyPipelineBase { } public void clear() { - if (isInMulti()) { - discard(); - } + if (isInMulti()) { + discard(); + } - sync(); + sync(); } public boolean isInMulti() { - return currentMulti != null; + return currentMulti != null; } /** diff --git a/src/main/java/redis/clients/jedis/PipelineBlock.java b/src/main/java/redis/clients/jedis/PipelineBlock.java index 76d84eb..24bae84 100644 --- a/src/main/java/redis/clients/jedis/PipelineBlock.java +++ b/src/main/java/redis/clients/jedis/PipelineBlock.java @@ -7,9 +7,9 @@ package redis.clients.jedis; * @see https://github.com/xetorthio/jedis/pull/498 */ public abstract class PipelineBlock extends Pipeline { - // For shadowing - @SuppressWarnings("unused") - private Client client; - + // For shadowing + @SuppressWarnings("unused") + private Client client; + public abstract void execute(); } diff --git a/src/main/java/redis/clients/jedis/Queable.java b/src/main/java/redis/clients/jedis/Queable.java index 643abf9..2cd5265 100644 --- a/src/main/java/redis/clients/jedis/Queable.java +++ b/src/main/java/redis/clients/jedis/Queable.java @@ -25,10 +25,10 @@ public class Queable { } protected boolean hasPipelinedResponse() { - return pipelinedResponses.size() > 0; + return pipelinedResponses.size() > 0; } - + protected int getPipelinedResponseLength() { - return pipelinedResponses.size(); + return pipelinedResponses.size(); } } diff --git a/src/main/java/redis/clients/jedis/Transaction.java b/src/main/java/redis/clients/jedis/Transaction.java index 57238d6..e6088ff 100644 --- a/src/main/java/redis/clients/jedis/Transaction.java +++ b/src/main/java/redis/clients/jedis/Transaction.java @@ -32,9 +32,9 @@ public class Transaction extends MultiKeyPipelineBase { } public void clear() { - if (inTransaction) { - discard(); - } + if (inTransaction) { + discard(); + } } public List exec() { @@ -60,7 +60,7 @@ public class Transaction extends MultiKeyPipelineBase { public List> execGetResponse() { // Discard QUEUED or ERROR client.getMany(getPipelinedResponseLength()); - client.exec(); + client.exec(); List unformatted = client.getObjectMultiBulkReply(); if (unformatted == null) { @@ -81,8 +81,8 @@ public class Transaction extends MultiKeyPipelineBase { return client.getStatusCodeReply(); } - public void setClient(Client client) { + public void setClient(Client client) { this.client = client; - } + } } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/TransactionBlock.java b/src/main/java/redis/clients/jedis/TransactionBlock.java index 3dfc8ab..c038ac2 100644 --- a/src/main/java/redis/clients/jedis/TransactionBlock.java +++ b/src/main/java/redis/clients/jedis/TransactionBlock.java @@ -9,10 +9,10 @@ import redis.clients.jedis.exceptions.JedisException; * @see https://github.com/xetorthio/jedis/pull/498 */ public abstract class TransactionBlock extends Transaction { - // For shadowing - @SuppressWarnings("unused") - private Client client; - + // For shadowing + @SuppressWarnings("unused") + private Client client; + public TransactionBlock(Client client) { super(client); } @@ -23,6 +23,6 @@ public abstract class TransactionBlock extends Transaction { public abstract void execute() throws JedisException; public void setClient(Client client) { - super.setClient(client); + super.setClient(client); } }