From 56795974959e479d3174059c25082de39006604d Mon Sep 17 00:00:00 2001 From: Jonathan Leibiusky Date: Thu, 5 Aug 2010 21:45:21 -0300 Subject: [PATCH] Added pipeline support --- .../java/redis/clients/jedis/Connection.java | 28 +++++++++++--- src/main/java/redis/clients/jedis/Jedis.java | 6 +++ .../redis/clients/jedis/JedisPipeline.java | 11 ++++++ .../java/redis/clients/jedis/Protocol.java | 24 ++---------- .../clients/jedis/tests/PipeliningTest.java | 33 +++++++++++++++++ .../clients/jedis/tests/ProtocolTest.java | 18 ++++----- .../benchmark/PipelinedGetSetBenchmark.java | 37 +++++++++++++++++++ 7 files changed, 121 insertions(+), 36 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/JedisPipeline.java create mode 100644 src/test/java/redis/clients/jedis/tests/PipeliningTest.java create mode 100644 src/test/java/redis/clients/jedis/tests/benchmark/PipelinedGetSetBenchmark.java diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index fa87683..3e433c0 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -6,6 +6,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.List; public class Connection { @@ -16,6 +17,7 @@ public class Connection { private Protocol protocol = new Protocol(); private DataOutputStream outputStream; private DataInputStream inputStream; + private int pipelinedCommands = 0; public Connection(String host) { super(); @@ -27,6 +29,7 @@ public class Connection { throw new JedisException("Please connect Jedis before using it."); } protocol.sendCommand(outputStream, name, args); + pipelinedCommands++; return this; } @@ -58,7 +61,6 @@ public class Connection { public void connect() throws UnknownHostException, IOException { if (!connected) { socket = new Socket(host, port); - socket.setReceiveBufferSize(256); connected = socket.isConnected(); outputStream = new DataOutputStream(socket.getOutputStream()); inputStream = new DataInputStream(new BufferedInputStream(socket @@ -82,24 +84,38 @@ public class Connection { } protected String getStatusCodeReply() { - return protocol.getStatusCodeReply(inputStream); + pipelinedCommands--; + return (String) protocol.read(inputStream); } public String getBulkReply() { - return protocol.getBulkReply(inputStream); + pipelinedCommands--; + return (String) protocol.read(inputStream); } public int getIntegerReply() { - return protocol.getIntegerReply(inputStream); + pipelinedCommands--; + return (Integer) protocol.read(inputStream); } @SuppressWarnings("unchecked") public List getMultiBulkReply() { - return (List) (List) protocol.getMultiBulkReply(inputStream); + pipelinedCommands--; + return (List) protocol.read(inputStream); } + @SuppressWarnings("unchecked") public List getObjectMultiBulkReply() { - return protocol.getMultiBulkReply(inputStream); + pipelinedCommands--; + return (List) protocol.read(inputStream); } + public List getAll() { + List all = new ArrayList(); + while (pipelinedCommands > 0) { + all.add(protocol.read(inputStream)); + pipelinedCommands--; + } + return all; + } } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index a5283b0..8ede713 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -527,4 +527,10 @@ public class Jedis { client.auth(password); return client.getStatusCodeReply(); } + + public List pipelined(JedisPipeline jedisPipeline) { + jedisPipeline.setClient(client); + jedisPipeline.execute(); + return client.getAll(); + } } \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/JedisPipeline.java b/src/main/java/redis/clients/jedis/JedisPipeline.java new file mode 100644 index 0000000..5e78d0b --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisPipeline.java @@ -0,0 +1,11 @@ +package redis.clients.jedis; + +public abstract class JedisPipeline { + protected Client client; + + public void setClient(Client client) { + this.client = client; + } + + public abstract void execute(); +} diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 4c2167b..40d5868 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -65,21 +65,6 @@ public class Protocol { return sb.toString(); } - public String getBulkReply(DataInputStream is) { - Object reply = process(is); - return (String) reply; - } - - public String getStatusCodeReply(DataInputStream is) { - Object reply = process(is); - return (String) reply; - } - - public int getIntegerReply(DataInputStream is) { - Object reply = process(is); - return (Integer) reply; - } - private Object process(DataInputStream is) { try { byte b = is.readByte(); @@ -94,7 +79,7 @@ public class Protocol { } else if (b == PLUS_BYTE) { return processStatusCodeReply(is); } else { - throw new JedisException("Unknown reply"); + throw new JedisException("Unknown reply: " + (char) b); } } catch (IOException e) { throw new JedisException(e); @@ -145,10 +130,7 @@ public class Protocol { return ret; } - @SuppressWarnings("unchecked") - public List getMultiBulkReply(DataInputStream is) { - Object reply = process(is); - List ret = (List) reply; - return ret; + public Object read(DataInputStream is) { + return process(is); } } \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/PipeliningTest.java b/src/test/java/redis/clients/jedis/tests/PipeliningTest.java new file mode 100644 index 0000000..8816555 --- /dev/null +++ b/src/test/java/redis/clients/jedis/tests/PipeliningTest.java @@ -0,0 +1,33 @@ +package redis.clients.jedis.tests; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.List; + +import junit.framework.Assert; + +import org.junit.Test; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPipeline; + +public class PipeliningTest extends Assert { + @Test + public void pipeline() throws UnknownHostException, IOException { + Jedis jedis = new Jedis("localhost"); + jedis.connect(); + jedis.auth("foobared"); + jedis.flushAll(); + + List results = jedis.pipelined(new JedisPipeline() { + public void execute() { + client.set("foo", "bar"); + client.get("foo"); + } + }); + + assertEquals(2, results.size()); + assertEquals("OK", results.get(0)); + assertEquals("bar", results.get(1)); + } +} diff --git a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java index adf2a52..8aa48e1 100644 --- a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java +++ b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java @@ -43,7 +43,7 @@ public class ProtocolTest extends Assert { public void bulkReply() { InputStream is = new ByteArrayInputStream("$6\r\nfoobar\r\n".getBytes()); Protocol protocol = new Protocol(); - String response = protocol.getBulkReply(new DataInputStream(is)); + String response = (String) protocol.read(new DataInputStream(is)); assertEquals("foobar", response); } @@ -51,7 +51,7 @@ public class ProtocolTest extends Assert { public void nullBulkReply() { InputStream is = new ByteArrayInputStream("$-1\r\n".getBytes()); Protocol protocol = new Protocol(); - String response = protocol.getBulkReply(new DataInputStream(is)); + String response = (String) protocol.read(new DataInputStream(is)); assertEquals(null, response); } @@ -59,7 +59,7 @@ public class ProtocolTest extends Assert { public void singleLineReply() { InputStream is = new ByteArrayInputStream("+OK\r\n".getBytes()); Protocol protocol = new Protocol(); - String response = protocol.getStatusCodeReply(new DataInputStream(is)); + String response = (String) protocol.read(new DataInputStream(is)); assertEquals("OK", response); } @@ -67,7 +67,7 @@ public class ProtocolTest extends Assert { public void integerReply() { InputStream is = new ByteArrayInputStream(":123\r\n".getBytes()); Protocol protocol = new Protocol(); - int response = protocol.getIntegerReply(new DataInputStream(is)); + int response = (Integer) protocol.read(new DataInputStream(is)); assertEquals(123, response); } @@ -79,7 +79,7 @@ public class ProtocolTest extends Assert { .getBytes()); Protocol protocol = new Protocol(); List response = (List) (List) protocol - .getMultiBulkReply(new DataInputStream(is)); + .read(new DataInputStream(is)); List expected = new ArrayList(); expected.add("foo"); expected.add("bar"); @@ -92,8 +92,8 @@ public class ProtocolTest extends Assert { "*4\r\n$3\r\nfoo\r\n+OK\r\n:1000\r\n*2\r\n$3\r\nfoo\r\n$3\r\nbar" .getBytes()); protocol = new Protocol(); - List response2 = protocol - .getMultiBulkReply(new DataInputStream(is)); + List response2 = (List) protocol + .read(new DataInputStream(is)); List expected2 = new ArrayList(); expected2.add("foo"); expected2.add("OK"); @@ -111,8 +111,8 @@ public class ProtocolTest extends Assert { public void nullMultiBulkReply() { InputStream is = new ByteArrayInputStream("*-1\r\n".getBytes()); Protocol protocol = new Protocol(); - List response = (List) (List) protocol - .getMultiBulkReply(new DataInputStream(is)); + List response = (List) protocol + .read(new DataInputStream(is)); assertNull(response); } } \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/tests/benchmark/PipelinedGetSetBenchmark.java b/src/test/java/redis/clients/jedis/tests/benchmark/PipelinedGetSetBenchmark.java new file mode 100644 index 0000000..af81751 --- /dev/null +++ b/src/test/java/redis/clients/jedis/tests/benchmark/PipelinedGetSetBenchmark.java @@ -0,0 +1,37 @@ +package redis.clients.jedis.tests.benchmark; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Calendar; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPipeline; + +public class PipelinedGetSetBenchmark { + private static final int TOTAL_OPERATIONS = 100000; + + public static void main(String[] args) throws UnknownHostException, + IOException { + Jedis jedis = new Jedis("localhost"); + jedis.connect(); + jedis.auth("foobared"); + + long begin = Calendar.getInstance().getTimeInMillis(); + + jedis.pipelined(new JedisPipeline() { + public void execute() { + for (int n = 0; n <= TOTAL_OPERATIONS; n++) { + String key = "foo" + n; + client.set(key, "bar" + n); + client.get(key); + } + } + }); + + long elapsed = Calendar.getInstance().getTimeInMillis() - begin; + + jedis.disconnect(); + + System.out.println(((1000 * 2 * TOTAL_OPERATIONS) / elapsed) + " ops"); + } +} \ No newline at end of file