diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 4136fd3..6848ab3 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -53,6 +53,14 @@ public class Connection { this.host = host; } + protected void flush() { + try { + outputStream.flush(); + } catch (IOException e) { + throw new JedisConnectionException(e); + } + } + protected Connection sendCommand(final Command cmd, final String... args) { final byte[][] bargs = new byte[args.length][]; for (int i = 0; i < args.length; i++) { @@ -135,6 +143,7 @@ public class Connection { } protected String getStatusCodeReply() { + flush(); pipelinedCommands--; final byte[] resp = (byte[]) protocol.read(inputStream); if (null == resp) { @@ -154,11 +163,13 @@ public class Connection { } public byte[] getBinaryBulkReply() { + flush(); pipelinedCommands--; return (byte[]) protocol.read(inputStream); } public Long getIntegerReply() { + flush(); pipelinedCommands--; return (Long) protocol.read(inputStream); } @@ -181,18 +192,21 @@ public class Connection { @SuppressWarnings("unchecked") public List getBinaryMultiBulkReply() { + flush(); pipelinedCommands--; return (List) protocol.read(inputStream); } @SuppressWarnings("unchecked") public List getObjectMultiBulkReply() { + flush(); pipelinedCommands--; return (List) protocol.read(inputStream); } public List getAll() { List all = new ArrayList(); + flush(); while (pipelinedCommands > 0) { all.add(protocol.read(inputStream)); pipelinedCommands--; @@ -201,6 +215,7 @@ public class Connection { } public Object getOne() { + flush(); pipelinedCommands--; return protocol.read(inputStream); } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index de0d889..d92b785 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -44,7 +44,6 @@ public final class Protocol { os.write(arg); os.writeCrLf(); } - os.flush(); } catch (IOException e) { throw new JedisConnectionException(e); } diff --git a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java index 02d3cfb..e0b9fb1 100644 --- a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java +++ b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java @@ -22,11 +22,12 @@ public class ProtocolTest extends JedisTestBase { PipedInputStream pis = new PipedInputStream(); BufferedInputStream bis = new BufferedInputStream(pis); PipedOutputStream pos = new PipedOutputStream(pis); + RedisOutputStream ros = new RedisOutputStream(pos); Protocol protocol = new Protocol(); - protocol.sendCommand(new RedisOutputStream(pos), Protocol.Command.GET, + protocol.sendCommand(ros, Protocol.Command.GET, "SOMEKEY".getBytes(Protocol.CHARSET)); - + ros.flush(); pos.close(); String expectedCommand = "*2\r\n$3\r\nGET\r\n$7\r\nSOMEKEY\r\n";