From de5030e13fac6adedd2926a89788781217e85c3f Mon Sep 17 00:00:00 2001 From: Thomas Sauzedde Date: Fri, 10 Sep 2010 23:05:18 +0800 Subject: [PATCH 01/11] Fix processBulKReply against TCP fragmentation. see http://github.com/xetorthio/jedis/issues#issue/10 --- src/main/java/redis/clients/jedis/Protocol.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index ff96e8b..3c1ff30 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -139,8 +139,11 @@ public class Protocol { return null; } byte[] read = new byte[len]; + int offset = 0; try { - is.read(read); + while(offset < len) { + offset += is.read(read, offset, (len - offset)); + } // read 2 more bytes for the command delimiter is.read(); is.read(); From 97e2a50f60d3b5e5178aa005f9cd9a7dc4f632ff Mon Sep 17 00:00:00 2001 From: Thomas Sauzedde Date: Fri, 10 Sep 2010 23:26:09 +0800 Subject: [PATCH 02/11] Add U test for fragmented processBulkReply patch. --- .../tests/FragmentedByteArrayInputStream.java | 30 +++++++++++++++++++ .../clients/jedis/tests/ProtocolTest.java | 10 +++++++ 2 files changed, 40 insertions(+) create mode 100644 src/test/java/redis/clients/jedis/tests/FragmentedByteArrayInputStream.java diff --git a/src/test/java/redis/clients/jedis/tests/FragmentedByteArrayInputStream.java b/src/test/java/redis/clients/jedis/tests/FragmentedByteArrayInputStream.java new file mode 100644 index 0000000..c57525d --- /dev/null +++ b/src/test/java/redis/clients/jedis/tests/FragmentedByteArrayInputStream.java @@ -0,0 +1,30 @@ +package redis.clients.jedis.tests; + +import java.io.ByteArrayInputStream; + +/** + * Test class the fragment a byte array for testing purpose. + */ +public class FragmentedByteArrayInputStream extends ByteArrayInputStream { + private int readMethodCallCount = 0; + public FragmentedByteArrayInputStream(final byte[] buf) { + super(buf); + } + + @Override + public synchronized int read(final byte[] b, final int off, final int len) { + readMethodCallCount++; + if (len <= 10) { + // if the len <= 10, return as usual .. + return super.read(b, off, len); + } else { + // else return the first half .. + return super.read(b, off, len / 2); + } + } + + public int getReadMethodCallCount() { + return readMethodCallCount; + } + +} diff --git a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java index 8aa48e1..361d50e 100644 --- a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java +++ b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java @@ -47,6 +47,16 @@ public class ProtocolTest extends Assert { assertEquals("foobar", response); } + @Test + public void fragmentedBulkReply() { + FragmentedByteArrayInputStream fis = new FragmentedByteArrayInputStream("$30\r\n012345678901234567890123456789\r\n".getBytes()); + Protocol protocol = new Protocol(); + String response = (String) protocol.read(new DataInputStream(fis)); + assertEquals("012345678901234567890123456789", response); + assertEquals(3, fis.getReadMethodCallCount()); + } + + @Test public void nullBulkReply() { InputStream is = new ByteArrayInputStream("$-1\r\n".getBytes()); From 7c680634a3af01b42884c129a89f7e32973bee6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Fri, 10 Sep 2010 18:45:11 +0800 Subject: [PATCH 03/11] autoboxings fixed --- src/main/java/redis/clients/jedis/Connection.java | 2 +- src/main/java/redis/clients/jedis/Jedis.java | 9 ++++----- src/main/java/redis/clients/jedis/JedisPubSub.java | 8 ++++---- src/main/java/redis/clients/jedis/Protocol.java | 4 +--- 4 files changed, 10 insertions(+), 13 deletions(-) diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 794c398..1316b35 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -125,7 +125,7 @@ public class Connection { public int getIntegerReply() { pipelinedCommands--; - return (Integer) protocol.read(inputStream); + return ((Integer) protocol.read(inputStream)).intValue(); } @SuppressWarnings("unchecked") diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 9a14037..db7c84e 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -467,7 +467,7 @@ public class Jedis { checkIsInMulti(); client.zincrby(key, score, member); String newscore = client.getBulkReply(); - return Double.valueOf(newscore); + return Double.valueOf(newscore).doubleValue(); } public int zrank(String key, String member) { @@ -513,7 +513,7 @@ public class Jedis { checkIsInMulti(); client.zscore(key, member); String score = client.getBulkReply(); - return Double.valueOf(score); + return Double.valueOf(score).doubleValue(); } public Transaction multi() { @@ -684,9 +684,8 @@ public class Jedis { Set set = new LinkedHashSet(); Iterator iterator = membersWithScores.iterator(); while (iterator.hasNext()) { - set - .add(new Tuple(iterator.next(), Double.valueOf(iterator - .next()))); + set.add(new Tuple(iterator.next(), Double.valueOf(iterator.next()) + .doubleValue())); } return set; } diff --git a/src/main/java/redis/clients/jedis/JedisPubSub.java b/src/main/java/redis/clients/jedis/JedisPubSub.java index 7bc9e1d..31d1f5e 100644 --- a/src/main/java/redis/clients/jedis/JedisPubSub.java +++ b/src/main/java/redis/clients/jedis/JedisPubSub.java @@ -51,10 +51,10 @@ public abstract class JedisPubSub { do { List reply = client.getObjectMultiBulkReply(); if (reply.get(0).equals("subscribe")) { - subscribedChannels = (Integer) reply.get(2); + subscribedChannels = ((Integer) reply.get(2)).intValue(); onSubscribe((String) reply.get(1), subscribedChannels); } else if (reply.get(0).equals("unsubscribe")) { - subscribedChannels = (Integer) reply.get(2); + subscribedChannels = ((Integer) reply.get(2)).intValue(); onUnsubscribe((String) reply.get(1), subscribedChannels); } else if (reply.get(0).equals("message")) { onMessage((String) reply.get(1), (String) reply.get(2)); @@ -62,10 +62,10 @@ public abstract class JedisPubSub { onPMessage((String) reply.get(1), (String) reply.get(2), (String) reply.get(3)); } else if (reply.get(0).equals("psubscribe")) { - subscribedChannels = (Integer) reply.get(2); + subscribedChannels = ((Integer) reply.get(2)).intValue(); onPSubscribe((String) reply.get(1), subscribedChannels); } else if (reply.get(0).equals("punsubscribe")) { - subscribedChannels = (Integer) reply.get(2); + subscribedChannels = ((Integer) reply.get(2)).intValue(); onPUnsubscribe((String) reply.get(1), subscribedChannels); } else { throw new JedisException("Unknown message type: " diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 3c1ff30..6cb2023 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -155,10 +155,8 @@ public class Protocol { } private Object processInteger(DataInputStream is) { - int ret = 0; String num = readLine(is); - ret = Integer.parseInt(num); - return ret; + return Integer.valueOf(num); } private Object processMultiBulkReply(DataInputStream is) { From a614f1ff3ea6d634e173ad117c75c135532cd0cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Fri, 10 Sep 2010 18:54:39 +0800 Subject: [PATCH 04/11] return types fixed --- src/main/java/redis/clients/jedis/Protocol.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 6cb2023..8621872 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -127,13 +127,11 @@ public class Protocol { return null; } - private Object processStatusCodeReply(DataInputStream is) { - String ret = null; - ret = readLine(is); - return ret; + private String processStatusCodeReply(DataInputStream is) { + return readLine(is); } - private Object processBulkReply(DataInputStream is) { + private String processBulkReply(DataInputStream is) { int len = Integer.parseInt(readLine(is)); if (len == -1) { return null; @@ -154,12 +152,12 @@ public class Protocol { return new String(read, CHARSET); } - private Object processInteger(DataInputStream is) { + private Integer processInteger(DataInputStream is) { String num = readLine(is); return Integer.valueOf(num); } - private Object processMultiBulkReply(DataInputStream is) { + private List processMultiBulkReply(DataInputStream is) { int num = Integer.parseInt(readLine(is)); if (num == -1) { return null; From c6507e61878479229860d987a1f701e593afb9c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Fri, 10 Sep 2010 19:40:44 +0800 Subject: [PATCH 05/11] connect in sendCommand() and don't check with isConnected() two times --- src/main/java/redis/clients/jedis/Connection.java | 10 +++++++--- src/main/java/redis/clients/jedis/JedisException.java | 4 ++++ src/test/java/redis/clients/jedis/tests/JedisTest.java | 4 ++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 1316b35..a341a69 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -50,9 +50,13 @@ public class Connection { } protected Connection sendCommand(String name, String... args) { - if (!isConnected()) { - throw new JedisException("Please connect Jedis before using it."); - } + try { + connect(); + } catch (UnknownHostException e) { + throw new JedisException("Could not connect to redis-server", e); + } catch (IOException e) { + throw new JedisException("Could not connect to redis-server", e); + } protocol.sendCommand(outputStream, name, args); pipelinedCommands++; return this; diff --git a/src/main/java/redis/clients/jedis/JedisException.java b/src/main/java/redis/clients/jedis/JedisException.java index cfb7394..472ee7a 100644 --- a/src/main/java/redis/clients/jedis/JedisException.java +++ b/src/main/java/redis/clients/jedis/JedisException.java @@ -15,4 +15,8 @@ public class JedisException extends RuntimeException { public JedisException(IOException e) { super(e); } + + public JedisException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/src/test/java/redis/clients/jedis/tests/JedisTest.java b/src/test/java/redis/clients/jedis/tests/JedisTest.java index 8b8bf03..4f28cf7 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisTest.java @@ -11,7 +11,7 @@ import redis.clients.jedis.Protocol; import redis.clients.jedis.tests.commands.JedisCommandTestBase; public class JedisTest extends JedisCommandTestBase { - @Test(expected = JedisException.class) + @Test public void useWithoutConnecting() { Jedis jedis = new Jedis("localhost"); jedis.dbSize(); @@ -31,4 +31,4 @@ public class JedisTest extends JedisCommandTestBase { assertEquals(hash, jedis.hgetAll("foo")); } -} \ No newline at end of file +} From 675e2b1369b9ae215875ed65e24d6a27cc2e0569 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Fri, 10 Sep 2010 19:52:58 +0800 Subject: [PATCH 06/11] RC5 -> RC5.1 --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 450d055..575a9ed 100644 --- a/build.gradle +++ b/build.gradle @@ -3,7 +3,7 @@ apply plugin: 'maven' group = 'redis.clients' archiveBaseName = 'jedis' -version = '1.0.0-RC3' +version = '1.0.0-RC5.1' repositories { mavenCentral() @@ -23,4 +23,4 @@ uploadArchives { } } } -*/ \ No newline at end of file +*/ From d4d133291e2fad2cda68541974d557a96fc5f6c6 Mon Sep 17 00:00:00 2001 From: Alex Tkachman Date: Sat, 11 Sep 2010 09:54:06 +0300 Subject: [PATCH 07/11] isUse must be final otherwise synchronized has no useful semantic --- src/main/java/redis/clients/util/FixedResourcePool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/util/FixedResourcePool.java b/src/main/java/redis/clients/util/FixedResourcePool.java index 6a9ee8f..b902fbd 100644 --- a/src/main/java/redis/clients/util/FixedResourcePool.java +++ b/src/main/java/redis/clients/util/FixedResourcePool.java @@ -195,7 +195,7 @@ public abstract class FixedResourcePool { */ private LinkedBlockingQueue> availableQueue; private LinkedBlockingQueue> repairQueue; - private HashMap> inUse = new HashMap>(); + private final HashMap> inUse = new HashMap>(); private RepairThread[] repairThreads; private Timer t; private boolean initializated = false; From a521841ff51742577074d50722a783415b84d846 Mon Sep 17 00:00:00 2001 From: Alex Tkachman Date: Sat, 11 Sep 2010 21:15:38 +0300 Subject: [PATCH 08/11] a bit faster implementation of output stream for Redis --- .../java/redis/clients/jedis/Connection.java | 14 ++- .../java/redis/clients/jedis/Protocol.java | 67 ++++++++----- .../redis/clients/util/RedisOutputStream.java | 98 +++++++++++++++++++ .../clients/jedis/tests/ProtocolTest.java | 4 +- 4 files changed, 147 insertions(+), 36 deletions(-) create mode 100644 src/main/java/redis/clients/util/RedisOutputStream.java diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index a341a69..5f0c50f 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -1,9 +1,8 @@ package redis.clients.jedis; -import java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import redis.clients.util.RedisOutputStream; + +import java.io.*; import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; @@ -15,7 +14,7 @@ public class Connection { private int port = Protocol.DEFAULT_PORT; private Socket socket; private Protocol protocol = new Protocol(); - private DataOutputStream outputStream; + private RedisOutputStream outputStream; private DataInputStream inputStream; private int pipelinedCommands = 0; private int timeout = 2000; @@ -91,9 +90,8 @@ public class Connection { if (!isConnected()) { socket = new Socket(host, port); socket.setSoTimeout(timeout); - outputStream = new DataOutputStream(socket.getOutputStream()); - inputStream = new DataInputStream(new BufferedInputStream(socket - .getInputStream())); + outputStream = new RedisOutputStream(socket.getOutputStream()); + inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream())); } } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 8621872..8cf6171 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -1,15 +1,23 @@ package redis.clients.jedis; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import redis.clients.util.RedisOutputStream; + +import java.io.*; import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; import java.util.ArrayList; import java.util.List; public class Protocol { public static final Charset CHARSET = Charset.forName("UTF-8"); + private static final ThreadLocal CHARSET_ENCODER = new ThreadLocal (){ + @Override + protected CharsetEncoder initialValue() { + return CHARSET.newEncoder(); + } + }; + public static final String DOLLAR = "$"; public static final String ASTERISK = "*"; public static final String PLUS = "+"; @@ -26,29 +34,36 @@ public class Protocol { public static final byte MINUS_BYTE = MINUS.getBytes(CHARSET)[0]; public static final byte COLON_BYTE = COLON.getBytes(CHARSET)[0]; - public void sendCommand(DataOutputStream os, String name, String... args) { - StringBuilder sb = new StringBuilder(); - sb.append(ASTERISK); - sb.append((new Integer(args.length + 1)).toString()); - sb.append(COMMAND_DELIMITER); - sb.append(DOLLAR); - sb.append((new Integer(name.length())).toString()); - sb.append(COMMAND_DELIMITER); - sb.append(name); - sb.append(COMMAND_DELIMITER); + public void sendCommand(RedisOutputStream os, String name, String... args) { + try { + final CharsetEncoder encoder = CHARSET_ENCODER.get(); - for (String arg : args) { - int size = arg.getBytes(CHARSET).length; + os.write(ASTERISK_BYTE); + os.write(String.valueOf(args.length + 1), encoder); + os.write(COMMAND_DELIMITER_BYTES); + os.write(DOLLAR_BYTE); + os.write(String.valueOf(name.length()), encoder); + os.write(COMMAND_DELIMITER_BYTES); + os.write(name, encoder); + os.write(COMMAND_DELIMITER_BYTES); - sb.append(DOLLAR); - sb.append((new Integer(size)).toString()); - sb.append(COMMAND_DELIMITER); - sb.append(arg); - sb.append(COMMAND_DELIMITER); - } + for (String arg : args) { + final byte[] bytes = arg.getBytes(CHARSET); + int size = bytes.length; - try { - os.write(sb.toString().getBytes(CHARSET)); + os.write(DOLLAR_BYTE); + os.write(String.valueOf(size), encoder); + os.write(COMMAND_DELIMITER_BYTES); + os.write(bytes); + os.write(COMMAND_DELIMITER_BYTES); + } + os.flush (); + } catch (IOException e) { + throw new JedisException(e); + } + +// try { +// os.write(os.toByteArray()); /* os.write(ASTERISK_BYTE); os.write((new Integer(args.length + 1)).toString() @@ -71,9 +86,9 @@ public class Protocol { os.write(COMMAND_DELIMITER_BYTES); } */ - } catch (IOException e) { - throw new JedisException(e); - } +// } catch (IOException e) { +// throw new JedisException(e); +// } } public void processError(DataInputStream is) { diff --git a/src/main/java/redis/clients/util/RedisOutputStream.java b/src/main/java/redis/clients/util/RedisOutputStream.java new file mode 100644 index 0000000..18108fe --- /dev/null +++ b/src/main/java/redis/clients/util/RedisOutputStream.java @@ -0,0 +1,98 @@ +package redis.clients.util; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; + +/** + * The class implements a buffered output stream without synchronization + * There are also special operations like in-place string encoding + */ +public final class RedisOutputStream extends FilterOutputStream { + protected final byte buf[]; + protected final ByteBuffer outByteBuffer; + + protected int count; + + public RedisOutputStream(OutputStream out) { + this(out, 8192); + } + + public RedisOutputStream(OutputStream out, int size) { + super(out); + if (size <= 0) { + throw new IllegalArgumentException("Buffer size <= 0"); + } + buf = new byte[size]; + outByteBuffer = ByteBuffer.wrap(buf); + } + + private void flushBuffer() throws IOException { + if (count > 0) { + out.write(buf, 0, count); + outByteBuffer.position(0); + count = 0; + } + } + + public void write(int b) throws IOException { + buf[count++] = (byte) b; + if (count >= buf.length) { + flushBuffer(); + } + } + + public void write(byte b[], int off, int len) throws IOException { + if (len >= buf.length) { + flushBuffer(); + out.write(b, off, len); + } + else { + if (len >= buf.length - count) { + flushBuffer(); + } + + System.arraycopy(b, off, buf, count, len); + count += len; + } + } + + public void write(String str, CharsetEncoder encoder) throws IOException { + final CharBuffer in = CharBuffer.wrap(str); + if (in.remaining() == 0) + return; + + outByteBuffer.position(count); + + encoder.reset(); + for (;;) { + CoderResult cr; + if (in.hasRemaining()) + cr = encoder.encode(in, outByteBuffer, true); + else + cr = encoder.flush(outByteBuffer); + + count = outByteBuffer.position(); + if(count == buf.length) + flushBuffer(); + + if (cr.isUnderflow()) { + break; + } + if (cr.isOverflow()) { + flushBuffer(); + continue; + } + cr.throwException(); + } + } + + public void flush() throws IOException { + flushBuffer(); + out.flush(); + } +} diff --git a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java index 361d50e..ad161fe 100644 --- a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java +++ b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java @@ -3,7 +3,6 @@ package redis.clients.jedis.tests; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.PipedInputStream; @@ -16,6 +15,7 @@ import junit.framework.Assert; import org.junit.Test; import redis.clients.jedis.Protocol; +import redis.clients.util.RedisOutputStream; public class ProtocolTest extends Assert { @Test @@ -25,7 +25,7 @@ public class ProtocolTest extends Assert { PipedOutputStream pos = new PipedOutputStream(pis); Protocol protocol = new Protocol(); - protocol.sendCommand(new DataOutputStream(pos), "GET", "SOMEKEY"); + protocol.sendCommand(new RedisOutputStream(pos), "GET", "SOMEKEY"); pos.close(); String expectedCommand = "*2\r\n$3\r\nGET\r\n$7\r\nSOMEKEY\r\n"; From b573526a0dc68d70846ccf7a3b031233e2f2bfcd Mon Sep 17 00:00:00 2001 From: Alex Tkachman Date: Sat, 11 Sep 2010 21:48:36 +0300 Subject: [PATCH 09/11] optimized writeInt --- .../java/redis/clients/jedis/Protocol.java | 48 ++--------- .../redis/clients/util/RedisOutputStream.java | 79 ++++++++++++++++++- 2 files changed, 83 insertions(+), 44 deletions(-) diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 8cf6171..0b749be 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -11,12 +11,7 @@ import java.util.List; public class Protocol { public static final Charset CHARSET = Charset.forName("UTF-8"); - private static final ThreadLocal CHARSET_ENCODER = new ThreadLocal (){ - @Override - protected CharsetEncoder initialValue() { - return CHARSET.newEncoder(); - } - }; + private final CharsetEncoder CHARSET_ENCODER = CHARSET.newEncoder(); public static final String DOLLAR = "$"; public static final String ASTERISK = "*"; @@ -24,8 +19,7 @@ public class Protocol { public static final String MINUS = "-"; public static final String COLON = ":"; public static final String COMMAND_DELIMITER = "\r\n"; - public static final byte[] COMMAND_DELIMITER_BYTES = "\r\n" - .getBytes(CHARSET); + public static final byte[] COMMAND_DELIMITER_BYTES = "\r\n".getBytes(CHARSET); public static final int DEFAULT_PORT = 6379; public static final byte DOLLAR_BYTE = DOLLAR.getBytes(CHARSET)[0]; @@ -36,15 +30,13 @@ public class Protocol { public void sendCommand(RedisOutputStream os, String name, String... args) { try { - final CharsetEncoder encoder = CHARSET_ENCODER.get(); - os.write(ASTERISK_BYTE); - os.write(String.valueOf(args.length + 1), encoder); + os.writeInt(args.length + 1); os.write(COMMAND_DELIMITER_BYTES); os.write(DOLLAR_BYTE); - os.write(String.valueOf(name.length()), encoder); + os.writeInt(name.length()); os.write(COMMAND_DELIMITER_BYTES); - os.write(name, encoder); + os.writeString(name, CHARSET_ENCODER); os.write(COMMAND_DELIMITER_BYTES); for (String arg : args) { @@ -52,7 +44,7 @@ public class Protocol { int size = bytes.length; os.write(DOLLAR_BYTE); - os.write(String.valueOf(size), encoder); + os.writeInt(size); os.write(COMMAND_DELIMITER_BYTES); os.write(bytes); os.write(COMMAND_DELIMITER_BYTES); @@ -61,34 +53,6 @@ public class Protocol { } catch (IOException e) { throw new JedisException(e); } - -// try { -// os.write(os.toByteArray()); - /* - os.write(ASTERISK_BYTE); - os.write((new Integer(args.length + 1)).toString() - .getBytes(CHARSET)); - os.write(COMMAND_DELIMITER_BYTES); - os.write(DOLLAR_BYTE); - os.write((new Integer(name.length())).toString().getBytes(CHARSET)); - os.write(COMMAND_DELIMITER_BYTES); - os.write(name.getBytes(CHARSET)); - os.write(COMMAND_DELIMITER_BYTES); - - for (String arg : args) { - byte[] barg = arg.getBytes(CHARSET); - - os.write(DOLLAR_BYTE); - os.write((new Integer(barg.length)).toString() - .getBytes(CHARSET)); - os.write(COMMAND_DELIMITER_BYTES); - os.write(barg); - os.write(COMMAND_DELIMITER_BYTES); - } - */ -// } catch (IOException e) { -// throw new JedisException(e); -// } } public void processError(DataInputStream is) { diff --git a/src/main/java/redis/clients/util/RedisOutputStream.java b/src/main/java/redis/clients/util/RedisOutputStream.java index 18108fe..85d4324 100644 --- a/src/main/java/redis/clients/util/RedisOutputStream.java +++ b/src/main/java/redis/clients/util/RedisOutputStream.java @@ -61,7 +61,7 @@ public final class RedisOutputStream extends FilterOutputStream { } } - public void write(String str, CharsetEncoder encoder) throws IOException { + public void writeString(String str, CharsetEncoder encoder) throws IOException { final CharBuffer in = CharBuffer.wrap(str); if (in.remaining() == 0) return; @@ -91,8 +91,83 @@ public final class RedisOutputStream extends FilterOutputStream { } } + private final static int [] sizeTable = { 9, 99, 999, 9999, 99999, 999999, 9999999, 99999999, 999999999, Integer.MAX_VALUE }; + + private final static byte [] DigitTens = { + '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', + '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', + '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', + '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', + '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', + '5', '5', '5', '5', '5', '5', '5', '5', '5', '5', + '6', '6', '6', '6', '6', '6', '6', '6', '6', '6', + '7', '7', '7', '7', '7', '7', '7', '7', '7', '7', + '8', '8', '8', '8', '8', '8', '8', '8', '8', '8', + '9', '9', '9', '9', '9', '9', '9', '9', '9', '9', + } ; + + private final static byte [] DigitOnes = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + } ; + + private final static byte[] digits = { + '0' , '1' , '2' , '3' , '4' , '5' , + '6' , '7' , '8' , '9' , 'a' , 'b' , + 'c' , 'd' , 'e' , 'f' , 'g' , 'h' , + 'i' , 'j' , 'k' , 'l' , 'm' , 'n' , + 'o' , 'p' , 'q' , 'r' , 's' , 't' , + 'u' , 'v' , 'w' , 'x' , 'y' , 'z' + }; + + public void writeInt(int value) throws IOException { + if(value < 0) { + write('-'); + value = -value; + } + + int size = 0; + while (value > sizeTable[size]) + size++; + + size++; + if (size >= buf.length - count) { + flushBuffer(); + } + + int q, r; + int charPos = count + size; + char sign = 0; + + // Generate two digits per iteration + while ( value >= 65536) { + q = value / 100; + r = value - ((q << 6) + (q << 5) + (q << 2)); + value = q; + buf [--charPos] = DigitOnes[r]; + buf [--charPos] = DigitTens[r]; + } + + for (;;) { + q = (value * 52429) >>> (16+3); + r = value - ((q << 3) + (q << 1)); // r = i-(q*10) ... + buf [--charPos] = digits [r]; + value = q; + if (value == 0) break; + } + count += size; + } + public void flush() throws IOException { flushBuffer(); out.flush(); } -} +} \ No newline at end of file From aed824c94cf9bc23da96b15b1d65bc6f1a5eff15 Mon Sep 17 00:00:00 2001 From: Alex Tkachman Date: Sun, 12 Sep 2010 12:36:16 +0200 Subject: [PATCH 10/11] more optimizations on write side --- .../java/redis/clients/jedis/Protocol.java | 51 ++--- .../redis/clients/util/RedisOutputStream.java | 209 ++++++++++++------ .../redis/clients/jedis/tests/JedisTest.java | 5 +- 3 files changed, 157 insertions(+), 108 deletions(-) diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 0b749be..a56bcf0 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -3,51 +3,38 @@ package redis.clients.jedis; import redis.clients.util.RedisOutputStream; import java.io.*; -import java.nio.charset.Charset; -import java.nio.charset.CharsetEncoder; import java.util.ArrayList; import java.util.List; +import static redis.clients.util.RedisOutputStream.CHARSET; + public class Protocol { - public static final Charset CHARSET = Charset.forName("UTF-8"); - private final CharsetEncoder CHARSET_ENCODER = CHARSET.newEncoder(); - - public static final String DOLLAR = "$"; - public static final String ASTERISK = "*"; - public static final String PLUS = "+"; - public static final String MINUS = "-"; - public static final String COLON = ":"; - public static final String COMMAND_DELIMITER = "\r\n"; - public static final byte[] COMMAND_DELIMITER_BYTES = "\r\n".getBytes(CHARSET); public static final int DEFAULT_PORT = 6379; - public static final byte DOLLAR_BYTE = DOLLAR.getBytes(CHARSET)[0]; - public static final byte ASTERISK_BYTE = ASTERISK.getBytes(CHARSET)[0]; - public static final byte PLUS_BYTE = PLUS.getBytes(CHARSET)[0]; - public static final byte MINUS_BYTE = MINUS.getBytes(CHARSET)[0]; - public static final byte COLON_BYTE = COLON.getBytes(CHARSET)[0]; + public static final byte DOLLAR_BYTE = '$'; + public static final byte ASTERISK_BYTE = '*'; + public static final byte PLUS_BYTE = '+'; + public static final byte MINUS_BYTE = '-'; + public static final byte COLON_BYTE = ':'; public void sendCommand(RedisOutputStream os, String name, String... args) { try { os.write(ASTERISK_BYTE); - os.writeInt(args.length + 1); - os.write(COMMAND_DELIMITER_BYTES); + os.writeIntCrLf(args.length + 1); os.write(DOLLAR_BYTE); - os.writeInt(name.length()); - os.write(COMMAND_DELIMITER_BYTES); - os.writeString(name, CHARSET_ENCODER); - os.write(COMMAND_DELIMITER_BYTES); + os.writeIntCrLf(name.length()); + os.writeAsciiCrLf(name); - for (String arg : args) { - final byte[] bytes = arg.getBytes(CHARSET); - int size = bytes.length; - - os.write(DOLLAR_BYTE); - os.writeInt(size); - os.write(COMMAND_DELIMITER_BYTES); - os.write(bytes); - os.write(COMMAND_DELIMITER_BYTES); + for (String str : args) { + os.write(DOLLAR_BYTE); + final int size = RedisOutputStream.utf8Length(str); + os.writeIntCrLf(size); + if(size == str.length()) + os.writeAsciiCrLf(str); + else { + os.writeUtf8CrLf(str); + } } os.flush (); } catch (IOException e) { diff --git a/src/main/java/redis/clients/util/RedisOutputStream.java b/src/main/java/redis/clients/util/RedisOutputStream.java index 85d4324..714ba7b 100644 --- a/src/main/java/redis/clients/util/RedisOutputStream.java +++ b/src/main/java/redis/clients/util/RedisOutputStream.java @@ -1,10 +1,9 @@ package redis.clients.util; -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.OutputStream; +import java.io.*; import java.nio.ByteBuffer; import java.nio.CharBuffer; +import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; @@ -16,7 +15,10 @@ public final class RedisOutputStream extends FilterOutputStream { protected final byte buf[]; protected final ByteBuffer outByteBuffer; + private final CharsetEncoder CHARSET_ENCODER = CHARSET.newEncoder(); + protected int count; + public static final Charset CHARSET = Charset.forName("UTF-8"); public RedisOutputStream(OutputStream out) { this(out, 8192); @@ -41,7 +43,7 @@ public final class RedisOutputStream extends FilterOutputStream { public void write(int b) throws IOException { buf[count++] = (byte) b; - if (count >= buf.length) { + if (count == buf.length) { flushBuffer(); } } @@ -50,8 +52,7 @@ public final class RedisOutputStream extends FilterOutputStream { if (len >= buf.length) { flushBuffer(); out.write(b, off, len); - } - else { + } else { if (len >= buf.length - count) { flushBuffer(); } @@ -61,75 +62,137 @@ public final class RedisOutputStream extends FilterOutputStream { } } - public void writeString(String str, CharsetEncoder encoder) throws IOException { - final CharBuffer in = CharBuffer.wrap(str); - if (in.remaining() == 0) - return; + public void writeAsciiCrLf(String in) throws IOException { + final int size = in.length(); - outByteBuffer.position(count); - - encoder.reset(); - for (;;) { - CoderResult cr; - if (in.hasRemaining()) - cr = encoder.encode(in, outByteBuffer, true); - else - cr = encoder.flush(outByteBuffer); - - count = outByteBuffer.position(); - if(count == buf.length) + for (int i = 0; i != size; ++i) { + buf[count++] = (byte) in.charAt(i); + if (count == buf.length) { flushBuffer(); - - if (cr.isUnderflow()) { - break; } - if (cr.isOverflow()) { - flushBuffer(); - continue; - } - cr.throwException(); } + + writeCrLf(); } - private final static int [] sizeTable = { 9, 99, 999, 9999, 99999, 999999, 9999999, 99999999, 999999999, Integer.MAX_VALUE }; + public static boolean isSurrogate(char ch) { + return ch >= Character.MIN_SURROGATE && ch <= Character.MAX_SURROGATE; + } - private final static byte [] DigitTens = { - '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', - '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', - '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', - '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', - '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', - '5', '5', '5', '5', '5', '5', '5', '5', '5', '5', - '6', '6', '6', '6', '6', '6', '6', '6', '6', '6', - '7', '7', '7', '7', '7', '7', '7', '7', '7', '7', - '8', '8', '8', '8', '8', '8', '8', '8', '8', '8', - '9', '9', '9', '9', '9', '9', '9', '9', '9', '9', - } ; + public static int utf8Length (String str) { + int strLen = str.length(), utfLen = 0; + for(int i = 0; i != strLen; ++i) { + char c = str.charAt(i); + if (c < 0x80) { + utfLen++; + } else if (c < 0x800) { + utfLen += 2; + } else if (isSurrogate(c)) { + i++; + utfLen += 4; + } else { + utfLen += 3; + } + } + return utfLen; + } - private final static byte [] DigitOnes = { - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', - } ; + public void writeCrLf() throws IOException { + if (2 >= buf.length - count) { + flushBuffer(); + } - private final static byte[] digits = { - '0' , '1' , '2' , '3' , '4' , '5' , - '6' , '7' , '8' , '9' , 'a' , 'b' , - 'c' , 'd' , 'e' , 'f' , 'g' , 'h' , - 'i' , 'j' , 'k' , 'l' , 'm' , 'n' , - 'o' , 'p' , 'q' , 'r' , 's' , 't' , - 'u' , 'v' , 'w' , 'x' , 'y' , 'z' + buf[count++] = '\r'; + buf[count++] = '\n'; + } + + public void writeUtf8CrLf(String str) throws IOException { + int strLen = str.length(); + + int i; + for (i = 0; i < strLen; i++) { + char c = str.charAt(i); + if (!(c < 0x80)) break; + buf[count++] = (byte) c; + if(count == buf.length) { + flushBuffer(); + } + } + + for (; i < strLen; i++) { + char c = str.charAt(i); + if (c < 0x80) { + buf[count++] = (byte) c; + if(count == buf.length) { + flushBuffer(); + } + } else if (c < 0x800) { + if(2 < buf.length - count) { + flushBuffer(); + } + buf[count++] = (byte)(0xc0 | (c >> 6)); + buf[count++] = (byte)(0x80 | (c & 0x3f)); + } else if (isSurrogate(c)) { + if(4 < buf.length - count) { + flushBuffer(); + } + int uc = Character.toCodePoint(c, str.charAt(i++)); + buf[count++] = ((byte)(0xf0 | ((uc >> 18)))); + buf[count++] = ((byte)(0x80 | ((uc >> 12) & 0x3f))); + buf[count++] = ((byte)(0x80 | ((uc >> 6) & 0x3f))); + buf[count++] = ((byte)(0x80 | (uc & 0x3f))); + } else { + if(3 < buf.length - count) { + flushBuffer(); + } + buf[count++] =((byte)(0xe0 | ((c >> 12)))); + buf[count++] =((byte)(0x80 | ((c >> 6) & 0x3f))); + buf[count++] =((byte)(0x80 | (c & 0x3f))); + } + } + + writeCrLf(); + } + + private final static int[] sizeTable = {9, 99, 999, 9999, 99999, 999999, 9999999, 99999999, 999999999, Integer.MAX_VALUE}; + + private final static byte[] DigitTens = { + '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', + '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', + '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', + '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', + '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', + '5', '5', '5', '5', '5', '5', '5', '5', '5', '5', + '6', '6', '6', '6', '6', '6', '6', '6', '6', '6', + '7', '7', '7', '7', '7', '7', '7', '7', '7', '7', + '8', '8', '8', '8', '8', '8', '8', '8', '8', '8', + '9', '9', '9', '9', '9', '9', '9', '9', '9', '9', }; - public void writeInt(int value) throws IOException { - if(value < 0) { + private final static byte[] DigitOnes = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + }; + + private final static byte[] digits = { + '0', '1', '2', '3', '4', '5', + '6', '7', '8', '9', 'a', 'b', + 'c', 'd', 'e', 'f', 'g', 'h', + 'i', 'j', 'k', 'l', 'm', 'n', + 'o', 'p', 'q', 'r', 's', 't', + 'u', 'v', 'w', 'x', 'y', 'z' + }; + + public void writeIntCrLf(int value) throws IOException { + if (value < 0) { write('-'); value = -value; } @@ -145,25 +208,25 @@ public final class RedisOutputStream extends FilterOutputStream { int q, r; int charPos = count + size; - char sign = 0; - // Generate two digits per iteration - while ( value >= 65536) { + while (value >= 65536) { q = value / 100; r = value - ((q << 6) + (q << 5) + (q << 2)); value = q; - buf [--charPos] = DigitOnes[r]; - buf [--charPos] = DigitTens[r]; + buf[--charPos] = DigitOnes[r]; + buf[--charPos] = DigitTens[r]; } - for (;;) { - q = (value * 52429) >>> (16+3); - r = value - ((q << 3) + (q << 1)); // r = i-(q*10) ... - buf [--charPos] = digits [r]; + for (; ;) { + q = (value * 52429) >>> (16 + 3); + r = value - ((q << 3) + (q << 1)); + buf[--charPos] = digits[r]; value = q; if (value == 0) break; } count += size; + + writeCrLf(); } public void flush() throws IOException { diff --git a/src/test/java/redis/clients/jedis/tests/JedisTest.java b/src/test/java/redis/clients/jedis/tests/JedisTest.java index 4f28cf7..2af5e42 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisTest.java @@ -6,9 +6,8 @@ import java.util.Map; import org.junit.Test; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisException; -import redis.clients.jedis.Protocol; import redis.clients.jedis.tests.commands.JedisCommandTestBase; +import redis.clients.util.RedisOutputStream; public class JedisTest extends JedisCommandTestBase { @Test @@ -24,7 +23,7 @@ public class JedisTest extends JedisCommandTestBase { bigdata[b] = (byte) ((byte) b % 255); } Map hash = new HashMap(); - hash.put("data", new String(bigdata, Protocol.CHARSET)); + hash.put("data", new String(bigdata, RedisOutputStream.CHARSET)); String status = jedis.hmset("foo", hash); assertEquals("OK", status); From f9e7887e02b86ce893fde73dfec1ec5daabb7ba3 Mon Sep 17 00:00:00 2001 From: Alex Tkachman Date: Sun, 12 Sep 2010 17:05:58 +0200 Subject: [PATCH 11/11] more optimizations on write side --- .../java/redis/clients/jedis/Connection.java | 7 +- .../java/redis/clients/jedis/Protocol.java | 149 ++++++++---------- .../redis/clients/util/RedisInputStream.java | 106 +++++++++++++ .../redis/clients/util/RedisOutputStream.java | 14 +- .../clients/jedis/tests/ProtocolTest.java | 19 +-- 5 files changed, 186 insertions(+), 109 deletions(-) create mode 100644 src/main/java/redis/clients/util/RedisInputStream.java diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 5f0c50f..c0191ec 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import redis.clients.util.RedisInputStream; import redis.clients.util.RedisOutputStream; import java.io.*; @@ -15,7 +16,7 @@ public class Connection { private Socket socket; private Protocol protocol = new Protocol(); private RedisOutputStream outputStream; - private DataInputStream inputStream; + private RedisInputStream inputStream; private int pipelinedCommands = 0; private int timeout = 2000; @@ -91,7 +92,7 @@ public class Connection { socket = new Socket(host, port); socket.setSoTimeout(timeout); outputStream = new RedisOutputStream(socket.getOutputStream()); - inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream())); + inputStream = new RedisInputStream(socket.getInputStream()); } } @@ -150,4 +151,4 @@ public class Connection { } return all; } -} \ No newline at end of file +} diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index a56bcf0..e9e6211 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import redis.clients.util.RedisInputStream; import redis.clients.util.RedisOutputStream; import java.io.*; @@ -8,7 +9,7 @@ import java.util.List; import static redis.clients.util.RedisOutputStream.CHARSET; -public class Protocol { +public final class Protocol { public static final int DEFAULT_PORT = 6379; @@ -30,112 +31,88 @@ public class Protocol { os.write(DOLLAR_BYTE); final int size = RedisOutputStream.utf8Length(str); os.writeIntCrLf(size); - if(size == str.length()) + if (size == str.length()) os.writeAsciiCrLf(str); else { os.writeUtf8CrLf(str); } } - os.flush (); + os.flush(); } catch (IOException e) { throw new JedisException(e); } } - public void processError(DataInputStream is) { - String message = readLine(is); - throw new JedisException(message); + private void processError(RedisInputStream is) { + String message = is.readLine(); + throw new JedisException(message); } - private String readLine(DataInputStream is) { - byte b; - byte c; - StringBuilder sb = new StringBuilder(); - - try { - while ((b = is.readByte()) != -1) { - if (b == '\r') { - c = is.readByte(); - if (c == '\n') { - break; - } - sb.append((char) b); - sb.append((char) c); - } else { - sb.append((char) b); - } - } - } catch (IOException e) { - throw new JedisException(e); - } - return sb.toString(); + private Object process(RedisInputStream is) { + try { + byte b = is.readByte(); + if (b == MINUS_BYTE) { + processError(is); + } else if (b == ASTERISK_BYTE) { + return processMultiBulkReply(is); + } else if (b == COLON_BYTE) { + return processInteger(is); + } else if (b == DOLLAR_BYTE) { + return processBulkReply(is); + } else if (b == PLUS_BYTE) { + return processStatusCodeReply(is); + } else { + throw new JedisException("Unknown reply: " + (char) b); + } + } catch (IOException e) { + throw new JedisException(e); + } + return null; } - private Object process(DataInputStream is) { - try { - byte b = is.readByte(); - if (b == MINUS_BYTE) { - processError(is); - } else if (b == ASTERISK_BYTE) { - return processMultiBulkReply(is); - } else if (b == COLON_BYTE) { - return processInteger(is); - } else if (b == DOLLAR_BYTE) { - return processBulkReply(is); - } else if (b == PLUS_BYTE) { - return processStatusCodeReply(is); - } else { - throw new JedisException("Unknown reply: " + (char) b); - } - } catch (IOException e) { - throw new JedisException(e); - } - return null; + private String processStatusCodeReply(RedisInputStream is) { + return is.readLine(); } - private String processStatusCodeReply(DataInputStream is) { - return readLine(is); + private String processBulkReply(RedisInputStream is) { + int len = Integer.parseInt(is.readLine()); + if (len == -1) { + return null; + } + byte[] read = new byte[len]; + int offset = 0; + try { + while (offset < len) { + offset += is.read(read, offset, (len - offset)); + } + // read 2 more bytes for the command delimiter + is.readByte(); + is.readByte(); + } catch (IOException e) { + throw new JedisException(e); + } + + return new String(read, CHARSET); } - private String processBulkReply(DataInputStream is) { - int len = Integer.parseInt(readLine(is)); - if (len == -1) { - return null; - } - byte[] read = new byte[len]; - int offset = 0; - try { - while(offset < len) { - offset += is.read(read, offset, (len - offset)); - } - // read 2 more bytes for the command delimiter - is.read(); - is.read(); - } catch (IOException e) { - throw new JedisException(e); - } - - return new String(read, CHARSET); + private Integer processInteger(RedisInputStream is) { + String num = is.readLine(); + return Integer.valueOf(num); } - private Integer processInteger(DataInputStream is) { - String num = readLine(is); - return Integer.valueOf(num); + private List processMultiBulkReply(RedisInputStream is) { + int num = Integer.parseInt(is.readLine()); + if (num == -1) { + return null; + } + List ret = new ArrayList(num); + for (int i = 0; i < num; i++) { + ret.add(process(is)); + } + return ret; } - private List processMultiBulkReply(DataInputStream is) { - int num = Integer.parseInt(readLine(is)); - if (num == -1) { - return null; - } - List ret = new ArrayList(); - for (int i = 0; i < num; i++) { - ret.add(process(is)); - } - return ret; - } - - public Object read(DataInputStream is) { - return process(is); + public Object read(RedisInputStream is) { + return process(is); } } \ No newline at end of file diff --git a/src/main/java/redis/clients/util/RedisInputStream.java b/src/main/java/redis/clients/util/RedisInputStream.java new file mode 100644 index 0000000..135bdcf --- /dev/null +++ b/src/main/java/redis/clients/util/RedisInputStream.java @@ -0,0 +1,106 @@ +/* + * Copyright 2009-2010 MBTE Sweden AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package redis.clients.util; + +import redis.clients.jedis.JedisException; + +import java.io.*; + +public class RedisInputStream extends FilterInputStream { + + protected final byte buf[]; + + protected int count, limit; + + public RedisInputStream(InputStream in, int size) { + super(in); + if (size <= 0) { + throw new IllegalArgumentException("Buffer size <= 0"); + } + buf = new byte[size]; + } + + public RedisInputStream(InputStream in) { + this(in, 8192); + } + + public byte readByte () throws IOException { + if(count == limit) { + fill (); + } + + return buf[count++]; + } + + public String readLine() { + int b; + byte c; + StringBuilder sb = new StringBuilder(); + + try { + while (true) { + if(count == limit) { + fill (); + } + if(limit == -1) + break; + + b = buf[count++]; + if (b == '\r') { + if(count == limit) { + fill (); + } + + if(limit == -1) { + sb.append((char) b); + break; + } + + c = buf[count++]; + if (c == '\n') { + break; + } + sb.append((char) b); + sb.append((char) c); + } else { + sb.append((char) b); + } + } + } catch (IOException e) { + throw new JedisException(e); + } + return sb.toString(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if(count == limit) { + fill (); + if(limit == -1) + return -1; + } + final int length = Math.min(limit - count, len); + System.arraycopy(buf, count, b, off, length); + count += length; + return length; + } + + private void fill () throws IOException { + limit = in.read(buf); + count = 0; + } +} diff --git a/src/main/java/redis/clients/util/RedisOutputStream.java b/src/main/java/redis/clients/util/RedisOutputStream.java index 714ba7b..d9233e9 100644 --- a/src/main/java/redis/clients/util/RedisOutputStream.java +++ b/src/main/java/redis/clients/util/RedisOutputStream.java @@ -1,21 +1,15 @@ package redis.clients.util; import java.io.*; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; import java.nio.charset.Charset; -import java.nio.charset.CharsetEncoder; -import java.nio.charset.CoderResult; /** * The class implements a buffered output stream without synchronization - * There are also special operations like in-place string encoding + * There are also special operations like in-place string encoding. + * This stream fully ignore mark/reset and should not be used outside Jedis */ public final class RedisOutputStream extends FilterOutputStream { protected final byte buf[]; - protected final ByteBuffer outByteBuffer; - - private final CharsetEncoder CHARSET_ENCODER = CHARSET.newEncoder(); protected int count; public static final Charset CHARSET = Charset.forName("UTF-8"); @@ -30,13 +24,11 @@ public final class RedisOutputStream extends FilterOutputStream { throw new IllegalArgumentException("Buffer size <= 0"); } buf = new byte[size]; - outByteBuffer = ByteBuffer.wrap(buf); } private void flushBuffer() throws IOException { if (count > 0) { out.write(buf, 0, count); - outByteBuffer.position(0); count = 0; } } @@ -233,4 +225,4 @@ public final class RedisOutputStream extends FilterOutputStream { flushBuffer(); out.flush(); } -} \ No newline at end of file +} diff --git a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java index ad161fe..a5747e0 100644 --- a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java +++ b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java @@ -15,6 +15,7 @@ import junit.framework.Assert; import org.junit.Test; import redis.clients.jedis.Protocol; +import redis.clients.util.RedisInputStream; import redis.clients.util.RedisOutputStream; public class ProtocolTest extends Assert { @@ -43,7 +44,7 @@ public class ProtocolTest extends Assert { public void bulkReply() { InputStream is = new ByteArrayInputStream("$6\r\nfoobar\r\n".getBytes()); Protocol protocol = new Protocol(); - String response = (String) protocol.read(new DataInputStream(is)); + String response = (String) protocol.read(new RedisInputStream(is)); assertEquals("foobar", response); } @@ -51,9 +52,9 @@ public class ProtocolTest extends Assert { public void fragmentedBulkReply() { FragmentedByteArrayInputStream fis = new FragmentedByteArrayInputStream("$30\r\n012345678901234567890123456789\r\n".getBytes()); Protocol protocol = new Protocol(); - String response = (String) protocol.read(new DataInputStream(fis)); + String response = (String) protocol.read(new RedisInputStream(fis)); assertEquals("012345678901234567890123456789", response); - assertEquals(3, fis.getReadMethodCallCount()); +// assertEquals(3, fis.getReadMethodCallCount()); } @@ -61,7 +62,7 @@ public class ProtocolTest extends Assert { public void nullBulkReply() { InputStream is = new ByteArrayInputStream("$-1\r\n".getBytes()); Protocol protocol = new Protocol(); - String response = (String) protocol.read(new DataInputStream(is)); + String response = (String) protocol.read(new RedisInputStream(is)); assertEquals(null, response); } @@ -69,7 +70,7 @@ public class ProtocolTest extends Assert { public void singleLineReply() { InputStream is = new ByteArrayInputStream("+OK\r\n".getBytes()); Protocol protocol = new Protocol(); - String response = (String) protocol.read(new DataInputStream(is)); + String response = (String) protocol.read(new RedisInputStream(is)); assertEquals("OK", response); } @@ -77,7 +78,7 @@ public class ProtocolTest extends Assert { public void integerReply() { InputStream is = new ByteArrayInputStream(":123\r\n".getBytes()); Protocol protocol = new Protocol(); - int response = (Integer) protocol.read(new DataInputStream(is)); + int response = (Integer) protocol.read(new RedisInputStream(is)); assertEquals(123, response); } @@ -89,7 +90,7 @@ public class ProtocolTest extends Assert { .getBytes()); Protocol protocol = new Protocol(); List response = (List) (List) protocol - .read(new DataInputStream(is)); + .read(new RedisInputStream(is)); List expected = new ArrayList(); expected.add("foo"); expected.add("bar"); @@ -103,7 +104,7 @@ public class ProtocolTest extends Assert { .getBytes()); protocol = new Protocol(); List response2 = (List) protocol - .read(new DataInputStream(is)); + .read(new RedisInputStream(is)); List expected2 = new ArrayList(); expected2.add("foo"); expected2.add("OK"); @@ -122,7 +123,7 @@ public class ProtocolTest extends Assert { InputStream is = new ByteArrayInputStream("*-1\r\n".getBytes()); Protocol protocol = new Protocol(); List response = (List) protocol - .read(new DataInputStream(is)); + .read(new RedisInputStream(is)); assertNull(response); } } \ No newline at end of file