Merge pull request #145 from TioBorracho/Cleaning

When there is a connection error, disconnect to leave everything clean.
This commit is contained in:
Jonathan Leibiusky
2011-05-15 17:44:12 -07:00
2 changed files with 31 additions and 10 deletions

3
.gitignore vendored
View File

@@ -4,3 +4,6 @@
.gradle/ .gradle/
target/ target/
build/ build/
/appendonly.aof
/dump.rdb
/nohup.out

View File

@@ -61,6 +61,24 @@ public class Connection {
} }
} }
protected Object read() {
try {
return protocol.read(inputStream);
} catch (JedisConnectionException e) {
disconnect();
throw new JedisConnectionException(e);
}
}
protected void sendProtocolCommand(final Command cmd, final byte[]... args) {
try {
protocol.sendCommand(outputStream, cmd, args);
} catch (JedisConnectionException e) {
disconnect();
throw new JedisConnectionException(e);
}
}
protected Connection sendCommand(final Command cmd, final String... args) { protected Connection sendCommand(final Command cmd, final String... args) {
final byte[][] bargs = new byte[args.length][]; final byte[][] bargs = new byte[args.length][];
for (int i = 0; i < args.length; i++) { for (int i = 0; i < args.length; i++) {
@@ -71,14 +89,14 @@ public class Connection {
protected Connection sendCommand(final Command cmd, final byte[]... args) { protected Connection sendCommand(final Command cmd, final byte[]... args) {
connect(); connect();
protocol.sendCommand(outputStream, cmd, args); sendProtocolCommand(cmd, args);
pipelinedCommands++; pipelinedCommands++;
return this; return this;
} }
protected Connection sendCommand(final Command cmd) { protected Connection sendCommand(final Command cmd) {
connect(); connect();
protocol.sendCommand(outputStream, cmd, new byte[0][]); sendProtocolCommand(cmd, new byte[0][]);
pipelinedCommands++; pipelinedCommands++;
return this; return this;
} }
@@ -145,7 +163,7 @@ public class Connection {
protected String getStatusCodeReply() { protected String getStatusCodeReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
final byte[] resp = (byte[]) protocol.read(inputStream); final byte[] resp = (byte[]) read();
if (null == resp) { if (null == resp) {
return null; return null;
} else { } else {
@@ -165,13 +183,13 @@ public class Connection {
public byte[] getBinaryBulkReply() { public byte[] getBinaryBulkReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return (byte[]) protocol.read(inputStream); return (byte[]) read();
} }
public Long getIntegerReply() { public Long getIntegerReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return (Long) protocol.read(inputStream); return (Long) read();
} }
public List<String> getMultiBulkReply() { public List<String> getMultiBulkReply() {
@@ -182,14 +200,14 @@ public class Connection {
public List<byte[]> getBinaryMultiBulkReply() { public List<byte[]> getBinaryMultiBulkReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return (List<byte[]>) protocol.read(inputStream); return (List<byte[]>) read();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<Object> getObjectMultiBulkReply() { public List<Object> getObjectMultiBulkReply() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return (List<Object>) protocol.read(inputStream); return (List<Object>) read();
} }
public List<Object> getAll() { public List<Object> getAll() {
@@ -200,7 +218,7 @@ public class Connection {
List<Object> all = new ArrayList<Object>(); List<Object> all = new ArrayList<Object>();
flush(); flush();
while (pipelinedCommands > except) { while (pipelinedCommands > except) {
all.add(protocol.read(inputStream)); all.add(read());
pipelinedCommands--; pipelinedCommands--;
} }
return all; return all;
@@ -209,6 +227,6 @@ public class Connection {
public Object getOne() { public Object getOne() {
flush(); flush();
pipelinedCommands--; pipelinedCommands--;
return protocol.read(inputStream); return read();
} }
} }