Flush to socket when starting to read

This commit is contained in:
Pieter Noordhuis
2011-02-25 13:00:34 +01:00
parent dad1b8c394
commit 9dd23cec81
3 changed files with 18 additions and 3 deletions

View File

@@ -53,6 +53,14 @@ public class Connection {
this.host = host; this.host = host;
} }
protected void flush() {
try {
outputStream.flush();
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}
protected Connection sendCommand(final Command cmd, final String... args) { protected Connection sendCommand(final Command cmd, final String... args) {
final byte[][] bargs = new byte[args.length][]; final byte[][] bargs = new byte[args.length][];
for (int i = 0; i < args.length; i++) { for (int i = 0; i < args.length; i++) {
@@ -135,6 +143,7 @@ public class Connection {
} }
protected String getStatusCodeReply() { protected String getStatusCodeReply() {
flush();
pipelinedCommands--; pipelinedCommands--;
final byte[] resp = (byte[]) protocol.read(inputStream); final byte[] resp = (byte[]) protocol.read(inputStream);
if (null == resp) { if (null == resp) {
@@ -154,11 +163,13 @@ public class Connection {
} }
public byte[] getBinaryBulkReply() { public byte[] getBinaryBulkReply() {
flush();
pipelinedCommands--; pipelinedCommands--;
return (byte[]) protocol.read(inputStream); return (byte[]) protocol.read(inputStream);
} }
public Long getIntegerReply() { public Long getIntegerReply() {
flush();
pipelinedCommands--; pipelinedCommands--;
return (Long) protocol.read(inputStream); return (Long) protocol.read(inputStream);
} }
@@ -181,18 +192,21 @@ public class Connection {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<byte[]> getBinaryMultiBulkReply() { public List<byte[]> getBinaryMultiBulkReply() {
flush();
pipelinedCommands--; pipelinedCommands--;
return (List<byte[]>) protocol.read(inputStream); return (List<byte[]>) protocol.read(inputStream);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<Object> getObjectMultiBulkReply() { public List<Object> getObjectMultiBulkReply() {
flush();
pipelinedCommands--; pipelinedCommands--;
return (List<Object>) protocol.read(inputStream); return (List<Object>) protocol.read(inputStream);
} }
public List<Object> getAll() { public List<Object> getAll() {
List<Object> all = new ArrayList<Object>(); List<Object> all = new ArrayList<Object>();
flush();
while (pipelinedCommands > 0) { while (pipelinedCommands > 0) {
all.add(protocol.read(inputStream)); all.add(protocol.read(inputStream));
pipelinedCommands--; pipelinedCommands--;
@@ -201,6 +215,7 @@ public class Connection {
} }
public Object getOne() { public Object getOne() {
flush();
pipelinedCommands--; pipelinedCommands--;
return protocol.read(inputStream); return protocol.read(inputStream);
} }

View File

@@ -44,7 +44,6 @@ public final class Protocol {
os.write(arg); os.write(arg);
os.writeCrLf(); os.writeCrLf();
} }
os.flush();
} catch (IOException e) { } catch (IOException e) {
throw new JedisConnectionException(e); throw new JedisConnectionException(e);
} }

View File

@@ -22,11 +22,12 @@ public class ProtocolTest extends JedisTestBase {
PipedInputStream pis = new PipedInputStream(); PipedInputStream pis = new PipedInputStream();
BufferedInputStream bis = new BufferedInputStream(pis); BufferedInputStream bis = new BufferedInputStream(pis);
PipedOutputStream pos = new PipedOutputStream(pis); PipedOutputStream pos = new PipedOutputStream(pis);
RedisOutputStream ros = new RedisOutputStream(pos);
Protocol protocol = new Protocol(); Protocol protocol = new Protocol();
protocol.sendCommand(new RedisOutputStream(pos), Protocol.Command.GET, protocol.sendCommand(ros, Protocol.Command.GET,
"SOMEKEY".getBytes(Protocol.CHARSET)); "SOMEKEY".getBytes(Protocol.CHARSET));
ros.flush();
pos.close(); pos.close();
String expectedCommand = "*2\r\n$3\r\nGET\r\n$7\r\nSOMEKEY\r\n"; String expectedCommand = "*2\r\n$3\r\nGET\r\n$7\r\nSOMEKEY\r\n";