Refactor Pipeline / Transaction to consume responses based on their requests, not rely on pipelinedCommands

* remove pipelinedCommands field at Connection class
** it was a risky state value
*** it was under 0 or over 0(though all commands are executed) while some situation
* remove Connection.getAll(), Connection.getAll(int except)
This commit is contained in:
Jungtaek Lim
2014-01-20 00:28:00 +09:00
parent 37f629765e
commit f7bd9c8313
4 changed files with 46 additions and 40 deletions

View File

@@ -21,7 +21,6 @@ public class Connection {
private Socket socket; private Socket socket;
private RedisOutputStream outputStream; private RedisOutputStream outputStream;
private RedisInputStream inputStream; private RedisInputStream inputStream;
private int pipelinedCommands = 0;
private int timeout = Protocol.DEFAULT_TIMEOUT; private int timeout = Protocol.DEFAULT_TIMEOUT;
public Socket getSocket() { public Socket getSocket() {
@@ -81,14 +80,12 @@ public class Connection {
protected Connection sendCommand(final Command cmd, final byte[]... args) { protected Connection sendCommand(final Command cmd, final byte[]... args) {
connect(); connect();
Protocol.sendCommand(outputStream, cmd, args); Protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this; return this;
} }
protected Connection sendCommand(final Command cmd) { protected Connection sendCommand(final Command cmd) {
connect(); connect();
Protocol.sendCommand(outputStream, cmd, new byte[0][]); Protocol.sendCommand(outputStream, cmd, new byte[0][]);
pipelinedCommands++;
return this; return this;
} }
@@ -161,7 +158,6 @@ public class Connection {
protected String getStatusCodeReply() { protected String getStatusCodeReply() {
flush(); flush();
pipelinedCommands--;
final byte[] resp = (byte[]) Protocol.read(inputStream); final byte[] resp = (byte[]) Protocol.read(inputStream);
if (null == resp) { if (null == resp) {
return null; return null;
@@ -181,13 +177,11 @@ public class Connection {
public byte[] getBinaryBulkReply() { public byte[] getBinaryBulkReply() {
flush(); flush();
pipelinedCommands--;
return (byte[]) Protocol.read(inputStream); return (byte[]) Protocol.read(inputStream);
} }
public Long getIntegerReply() { public Long getIntegerReply() {
flush(); flush();
pipelinedCommands--;
return (Long) Protocol.read(inputStream); return (Long) Protocol.read(inputStream);
} }
@@ -198,45 +192,23 @@ public class Connection {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<byte[]> getBinaryMultiBulkReply() { public List<byte[]> getBinaryMultiBulkReply() {
flush(); flush();
pipelinedCommands--;
return (List<byte[]>) Protocol.read(inputStream); return (List<byte[]>) Protocol.read(inputStream);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<Object> getObjectMultiBulkReply() { public List<Object> getObjectMultiBulkReply() {
flush(); flush();
pipelinedCommands--;
return (List<Object>) Protocol.read(inputStream); return (List<Object>) Protocol.read(inputStream);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<Long> getIntegerMultiBulkReply() { public List<Long> getIntegerMultiBulkReply() {
flush(); flush();
pipelinedCommands--;
return (List<Long>) Protocol.read(inputStream); return (List<Long>) Protocol.read(inputStream);
} }
public List<Object> getAll() {
return getAll(0);
}
public List<Object> getAll(int except) {
List<Object> all = new ArrayList<Object>();
flush();
while (pipelinedCommands > except) {
try{
all.add(Protocol.read(inputStream));
}catch(JedisDataException e){
all.add(e);
}
pipelinedCommands--;
}
return all;
}
public Object getOne() { public Object getOne() {
flush(); flush();
pipelinedCommands--;
return Protocol.read(inputStream); return Protocol.read(inputStream);
} }
} }

View File

@@ -62,17 +62,24 @@ public class Pipeline extends MultiKeyPipelineBase {
protected Client getClient(String key) { protected Client getClient(String key) {
return client; return client;
} }
public Object getOneWithJedisDataException() {
try {
return client.getOne();
} catch (JedisDataException e) {
return e;
}
}
/** /**
* Syncronize pipeline by reading all responses. This operation close the * Syncronize pipeline by reading all responses. This operation close the
* pipeline. In order to get return values from pipelined commands, capture * pipeline. In order to get return values from pipelined commands, capture
* the different Response<?> of the commands you execute. * the different Response<?> of the commands you execute.
*/ */
public void sync() { public void sync() {
List<Object> unformatted = client.getAll(); while (hasPipelinedResponse()) {
for (Object o : unformatted) { generateResponse(getOneWithJedisDataException());
generateResponse(o); }
}
} }
/** /**
@@ -84,12 +91,11 @@ public class Pipeline extends MultiKeyPipelineBase {
* @return A list of all the responses in the order you executed them. * @return A list of all the responses in the order you executed them.
*/ */
public List<Object> syncAndReturnAll() { public List<Object> syncAndReturnAll() {
List<Object> unformatted = client.getAll();
List<Object> formatted = new ArrayList<Object>(); List<Object> formatted = new ArrayList<Object>();
for (Object o : unformatted) { while (hasPipelinedResponse()) {
try { try {
formatted.add(generateResponse(o).get()); formatted.add(generateResponse(getOneWithJedisDataException()).get());
} catch (JedisDataException e) { } catch (JedisDataException e) {
formatted.add(e); formatted.add(e);
} }

View File

@@ -24,4 +24,11 @@ public class Queable {
return lr; return lr;
} }
protected boolean hasPipelinedResponse() {
return pipelinedResponses.size() > 0;
}
protected int getPipelinedResponseLength() {
return pipelinedResponses.size();
}
} }

View File

@@ -30,9 +30,26 @@ public class Transaction extends MultiKeyPipelineBase {
return client; return client;
} }
public Object getOneWithJedisDataException() {
try {
return client.getOne();
} catch (JedisDataException e) {
return e;
}
}
private void consumeResponse(int count) {
for (int i = 0 ; i < count ; i++)
getOneWithJedisDataException();
}
public List<Object> exec() { public List<Object> exec() {
// Discard multi
consumeResponse(1);
// Discard QUEUED or ERROR
consumeResponse(getPipelinedResponseLength());
client.exec(); client.exec();
client.getAll(1); // Discard all but the last reply
List<Object> unformatted = client.getObjectMultiBulkReply(); List<Object> unformatted = client.getObjectMultiBulkReply();
if (unformatted == null) { if (unformatted == null) {
@@ -50,8 +67,12 @@ public class Transaction extends MultiKeyPipelineBase {
} }
public List<Response<?>> execGetResponse() { public List<Response<?>> execGetResponse() {
// Discard multi
consumeResponse(1);
// Discard QUEUED or ERROR
consumeResponse(getPipelinedResponseLength());
client.exec(); client.exec();
client.getAll(1); // Discard all but the last reply
List<Object> unformatted = client.getObjectMultiBulkReply(); List<Object> unformatted = client.getObjectMultiBulkReply();
if (unformatted == null) { if (unformatted == null) {
@@ -65,8 +86,8 @@ public class Transaction extends MultiKeyPipelineBase {
} }
public String discard() { public String discard() {
consumeResponse(getPipelinedResponseLength());
client.discard(); client.discard();
client.getAll(1); // Discard all but the last reply
inTransaction = false; inTransaction = false;
clean(); clean();
return client.getStatusCodeReply(); return client.getStatusCodeReply();