Merge branch 'pipeline-and-transaction-can-handle-responses-on-their-own-status' of https://github.com/HeartSaVioR/jedis into HeartSaVioR-pipeline-and-transaction-can-handle-responses-on-their-own-status

This commit is contained in:
Jungtaek Lim
2014-08-13 07:18:48 +09:00
9 changed files with 83 additions and 54 deletions

View File

@@ -963,9 +963,6 @@ public class BinaryClient extends Connection {
} }
public void resetState() { public void resetState() {
if (isInMulti())
discard();
if (isInWatch()) if (isInWatch())
unwatch(); unwatch();
} }

View File

@@ -23,7 +23,9 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
MultiKeyBinaryCommands, AdvancedBinaryJedisCommands, MultiKeyBinaryCommands, AdvancedBinaryJedisCommands,
BinaryScriptingCommands, Closeable { BinaryScriptingCommands, Closeable {
protected Client client = null; protected Client client = null;
protected Transaction transaction = null;
protected Pipeline pipeline = null;
public BinaryJedis(final String host) { public BinaryJedis(final String host) {
URI uri = URI.create(host); URI uri = URI.create(host);
if (uri.getScheme() != null && uri.getScheme().equals("redis")) { if (uri.getScheme() != null && uri.getScheme().equals("redis")) {
@@ -1753,7 +1755,9 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
public Transaction multi() { public Transaction multi() {
client.multi(); client.multi();
return new Transaction(client); client.getOne(); // expected OK
transaction = new Transaction(client);
return transaction;
} }
@Deprecated @Deprecated
@@ -1767,6 +1771,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
List<Object> results = null; List<Object> results = null;
jedisTransaction.setClient(client); jedisTransaction.setClient(client);
client.multi(); client.multi();
client.getOne(); // expected OK
jedisTransaction.execute(); jedisTransaction.execute();
results = jedisTransaction.exec(); results = jedisTransaction.exec();
return results; return results;
@@ -1789,9 +1794,23 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
public void resetState() { public void resetState() {
if (client.isConnected()) { if (client.isConnected()) {
if (transaction != null) {
transaction.clear();
}
if (pipeline != null) {
pipeline.clear();
}
if (client.isInWatch()) {
unwatch();
}
client.resetState(); client.resetState();
client.getAll();
} }
transaction = null;
pipeline = null;
} }
public String watch(final byte[]... keys) { public String watch(final byte[]... keys) {
@@ -2207,7 +2226,7 @@ public class BinaryJedis implements BasicCommands, BinaryJedisCommands,
} }
public Pipeline pipelined() { public Pipeline pipelined() {
Pipeline pipeline = new Pipeline(); pipeline = new Pipeline();
pipeline.setClient(client); pipeline.setClient(client);
return pipeline; return pipeline;
} }

View File

@@ -21,7 +21,6 @@ public class Connection implements Closeable {
private Socket socket; private Socket socket;
private RedisOutputStream outputStream; private RedisOutputStream outputStream;
private RedisInputStream inputStream; private RedisInputStream inputStream;
private int pipelinedCommands = 0;
private int timeout = Protocol.DEFAULT_TIMEOUT; private int timeout = Protocol.DEFAULT_TIMEOUT;
private boolean broken = false; private boolean broken = false;
@@ -78,7 +77,6 @@ public class Connection implements Closeable {
try { try {
connect(); connect();
Protocol.sendCommand(outputStream, cmd, args); Protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this; return this;
} catch (JedisConnectionException ex) { } catch (JedisConnectionException ex) {
// Any other exceptions related to connection? // Any other exceptions related to connection?
@@ -91,7 +89,6 @@ public class Connection implements Closeable {
try { try {
connect(); connect();
Protocol.sendCommand(outputStream, cmd, new byte[0][]); Protocol.sendCommand(outputStream, cmd, new byte[0][]);
pipelinedCommands++;
return this; return this;
} catch (JedisConnectionException ex) { } catch (JedisConnectionException ex) {
// Any other exceptions related to connection? // Any other exceptions related to connection?
@@ -180,7 +177,6 @@ public class Connection implements Closeable {
protected String getStatusCodeReply() { protected String getStatusCodeReply() {
flush(); flush();
pipelinedCommands--;
final byte[] resp = (byte[]) readProtocolWithCheckingBroken(); final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
if (null == resp) { if (null == resp) {
return null; return null;
@@ -200,13 +196,11 @@ public class Connection implements Closeable {
public byte[] getBinaryBulkReply() { public byte[] getBinaryBulkReply() {
flush(); flush();
pipelinedCommands--;
return (byte[]) readProtocolWithCheckingBroken(); return (byte[]) readProtocolWithCheckingBroken();
} }
public Long getIntegerReply() { public Long getIntegerReply() {
flush(); flush();
pipelinedCommands--;
return (Long) readProtocolWithCheckingBroken(); return (Long) readProtocolWithCheckingBroken();
} }
@@ -217,14 +211,9 @@ public class Connection implements Closeable {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<byte[]> getBinaryMultiBulkReply() { public List<byte[]> getBinaryMultiBulkReply() {
flush(); flush();
pipelinedCommands--;
return (List<byte[]>) readProtocolWithCheckingBroken(); return (List<byte[]>) readProtocolWithCheckingBroken();
} }
public void resetPipelinedCount() {
pipelinedCommands = 0;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<Object> getRawObjectMultiBulkReply() { public List<Object> getRawObjectMultiBulkReply() {
return (List<Object>) readProtocolWithCheckingBroken(); return (List<Object>) readProtocolWithCheckingBroken();
@@ -232,38 +221,17 @@ public class Connection implements Closeable {
public List<Object> getObjectMultiBulkReply() { public List<Object> getObjectMultiBulkReply() {
flush(); flush();
pipelinedCommands--;
return getRawObjectMultiBulkReply(); return getRawObjectMultiBulkReply();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<Long> getIntegerMultiBulkReply() { public List<Long> getIntegerMultiBulkReply() {
flush(); flush();
pipelinedCommands--; return (List<Long>) Protocol.read(inputStream);
return (List<Long>) readProtocolWithCheckingBroken();
}
public List<Object> getAll() {
return getAll(0);
}
public List<Object> getAll(int except) {
List<Object> all = new ArrayList<Object>();
flush();
while (pipelinedCommands > except) {
try {
all.add(readProtocolWithCheckingBroken());
} catch (JedisDataException e) {
all.add(e);
}
pipelinedCommands--;
}
return all;
} }
public Object getOne() { public Object getOne() {
flush(); flush();
pipelinedCommands--;
return readProtocolWithCheckingBroken(); return readProtocolWithCheckingBroken();
} }
@@ -288,4 +256,17 @@ public class Connection implements Closeable {
throw exc; throw exc;
} }
} }
public List<Object> getMany(int count) {
flush();
List<Object> responses = new ArrayList<Object>();
for (int i = 0; i < count; i++) {
try {
responses.add(readProtocolWithCheckingBroken());
} catch (JedisDataException e) {
responses.add(e);
}
}
return responses;
}
} }

View File

@@ -162,12 +162,6 @@ public abstract class JedisPubSub {
/* Invalidate instance since this thread is no longer listening */ /* Invalidate instance since this thread is no longer listening */
this.client = null; this.client = null;
/*
* Reset pipeline count because subscribe() calls would have increased
* it but nothing decremented it.
*/
client.resetPipelinedCount();
} }
public int getSubscribedChannels() { public int getSubscribedChannels() {

View File

@@ -69,13 +69,25 @@ public class Pipeline extends MultiKeyPipelineBase {
return client; return client;
} }
public void clear() {
if (isInMulti()) {
discard();
}
sync();
}
public boolean isInMulti() {
return currentMulti != null;
}
/** /**
* Syncronize pipeline by reading all responses. This operation close the * Syncronize pipeline by reading all responses. This operation close the
* pipeline. In order to get return values from pipelined commands, capture * pipeline. In order to get return values from pipelined commands, capture
* the different Response<?> of the commands you execute. * the different Response<?> of the commands you execute.
*/ */
public void sync() { public void sync() {
List<Object> unformatted = client.getAll(); List<Object> unformatted = client.getMany(getPipelinedResponseLength());
for (Object o : unformatted) { for (Object o : unformatted) {
generateResponse(o); generateResponse(o);
} }
@@ -90,7 +102,7 @@ public class Pipeline extends MultiKeyPipelineBase {
* @return A list of all the responses in the order you executed them. * @return A list of all the responses in the order you executed them.
*/ */
public List<Object> syncAndReturnAll() { public List<Object> syncAndReturnAll() {
List<Object> unformatted = client.getAll(); List<Object> unformatted = client.getMany(getPipelinedResponseLength());
List<Object> formatted = new ArrayList<Object>(); List<Object> formatted = new ArrayList<Object>();
for (Object o : unformatted) { for (Object o : unformatted) {
@@ -106,7 +118,6 @@ public class Pipeline extends MultiKeyPipelineBase {
public Response<String> discard() { public Response<String> discard() {
if (currentMulti == null) if (currentMulti == null)
throw new JedisDataException("DISCARD without MULTI"); throw new JedisDataException("DISCARD without MULTI");
client.discard(); client.discard();
currentMulti = null; currentMulti = null;
return getResponse(BuilderFactory.STRING); return getResponse(BuilderFactory.STRING);

View File

@@ -7,5 +7,9 @@ package redis.clients.jedis;
* @see https://github.com/xetorthio/jedis/pull/498 * @see https://github.com/xetorthio/jedis/pull/498
*/ */
public abstract class PipelineBlock extends Pipeline { public abstract class PipelineBlock extends Pipeline {
// For shadowing
@SuppressWarnings("unused")
private Client client;
public abstract void execute(); public abstract void execute();
} }

View File

@@ -24,4 +24,11 @@ public class Queable {
return lr; return lr;
} }
protected boolean hasPipelinedResponse() {
return pipelinedResponses.size() > 0;
}
protected int getPipelinedResponseLength() {
return pipelinedResponses.size();
}
} }

View File

@@ -31,9 +31,16 @@ public class Transaction extends MultiKeyPipelineBase {
return client; return client;
} }
public void clear() {
if (inTransaction) {
discard();
}
}
public List<Object> exec() { public List<Object> exec() {
// Discard QUEUED or ERROR
client.getMany(getPipelinedResponseLength());
client.exec(); client.exec();
client.getAll(1); // Discard all but the last reply
List<Object> unformatted = client.getObjectMultiBulkReply(); List<Object> unformatted = client.getObjectMultiBulkReply();
if (unformatted == null) { if (unformatted == null) {
@@ -51,8 +58,9 @@ public class Transaction extends MultiKeyPipelineBase {
} }
public List<Response<?>> execGetResponse() { public List<Response<?>> execGetResponse() {
// Discard QUEUED or ERROR
client.getMany(getPipelinedResponseLength());
client.exec(); client.exec();
client.getAll(1); // Discard all but the last reply
List<Object> unformatted = client.getObjectMultiBulkReply(); List<Object> unformatted = client.getObjectMultiBulkReply();
if (unformatted == null) { if (unformatted == null) {
@@ -66,11 +74,15 @@ public class Transaction extends MultiKeyPipelineBase {
} }
public String discard() { public String discard() {
client.getMany(getPipelinedResponseLength());
client.discard(); client.discard();
client.getAll(1); // Discard all but the last reply
inTransaction = false; inTransaction = false;
clean(); clean();
return client.getStatusCodeReply(); return client.getStatusCodeReply();
} }
public void setClient(Client client) {
this.client = client;
}
} }

View File

@@ -9,6 +9,10 @@ import redis.clients.jedis.exceptions.JedisException;
* @see https://github.com/xetorthio/jedis/pull/498 * @see https://github.com/xetorthio/jedis/pull/498
*/ */
public abstract class TransactionBlock extends Transaction { public abstract class TransactionBlock extends Transaction {
// For shadowing
@SuppressWarnings("unused")
private Client client;
public TransactionBlock(Client client) { public TransactionBlock(Client client) {
super(client); super(client);
} }
@@ -19,6 +23,6 @@ public abstract class TransactionBlock extends Transaction {
public abstract void execute() throws JedisException; public abstract void execute() throws JedisException;
public void setClient(Client client) { public void setClient(Client client) {
this.client = client; super.setClient(client);
} }
} }