Addresses issues #779 and #775.

Conflicts:
	src/main/java/redis/clients/jedis/Protocol.java
This commit is contained in:
rdifalco
2014-10-21 18:45:50 -07:00
committed by Jungtaek Lim
parent 5d0f75a32f
commit 59a13121ab
4 changed files with 299 additions and 98 deletions

View File

@@ -1,5 +1,12 @@
package redis.clients.jedis; package redis.clients.jedis;
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.util.RedisInputStream;
import redis.clients.util.RedisOutputStream;
import redis.clients.util.SafeEncoder;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@@ -8,13 +15,6 @@ import java.net.SocketException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.util.RedisInputStream;
import redis.clients.util.RedisOutputStream;
import redis.clients.util.SafeEncoder;
public class Connection implements Closeable { public class Connection implements Closeable {
private String host = Protocol.DEFAULT_HOST; private String host = Protocol.DEFAULT_HOST;
@@ -170,7 +170,7 @@ public class Connection implements Closeable {
&& !socket.isOutputShutdown(); && !socket.isOutputShutdown();
} }
protected String getStatusCodeReply() { public String getStatusCodeReply() {
flush(); flush();
final byte[] resp = (byte[]) readProtocolWithCheckingBroken(); final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
if (null == resp) { if (null == resp) {
@@ -252,9 +252,9 @@ public class Connection implements Closeable {
} }
} }
public List<Object> getMany(int count) { public List<Object> getMany(final int count) {
flush(); flush();
List<Object> responses = new ArrayList<Object>(); final List<Object> responses = new ArrayList<Object>(count);
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
try { try {
responses.add(readProtocolWithCheckingBroken()); responses.add(readProtocolWithCheckingBroken());

View File

@@ -128,67 +128,61 @@ public final class Protocol {
} }
private static Object process(final RedisInputStream is) { private static Object process(final RedisInputStream is) {
try {
byte b = is.readByte(); final byte b = is.readByte();
if (b == MINUS_BYTE) { if (b == PLUS_BYTE) {
processError(is); return processStatusCodeReply(is);
} else if (b == ASTERISK_BYTE) { } else if (b == DOLLAR_BYTE) {
return processMultiBulkReply(is); return processBulkReply(is);
} else if (b == COLON_BYTE) { } else if (b == ASTERISK_BYTE) {
return processInteger(is); return processMultiBulkReply(is);
} else if (b == DOLLAR_BYTE) { } else if (b == COLON_BYTE) {
return processBulkReply(is); return processInteger(is);
} else if (b == PLUS_BYTE) { } else if (b == MINUS_BYTE) {
return processStatusCodeReply(is); processError(is);
} else { return null;
throw new JedisConnectionException("Unknown reply: " + (char) b); } else {
} throw new JedisConnectionException("Unknown reply: " + (char) b);
} catch (IOException e) {
throw new JedisConnectionException(e);
} }
return null;
} }
private static byte[] processStatusCodeReply(final RedisInputStream is) { private static byte[] processStatusCodeReply(final RedisInputStream is) {
return SafeEncoder.encode(is.readLine()); return is.readLineBytes();
} }
private static byte[] processBulkReply(final RedisInputStream is) { private static byte[] processBulkReply(final RedisInputStream is) {
int len = Integer.parseInt(is.readLine()); final int len = is.readIntCrLf();
if (len == -1) { if (len == -1) {
return null; return null;
} }
byte[] read = new byte[len];
final byte[] read = new byte[len];
int offset = 0; int offset = 0;
try { while (offset < len) {
while (offset < len) { final int size = is.read(read, offset, (len - offset));
int size = is.read(read, offset, (len - offset)); if (size == -1)
if (size == -1) throw new JedisConnectionException(
throw new JedisConnectionException( "It seems like server has closed the connection.");
"It seems like server has closed the connection."); offset += size;
offset += size;
}
// read 2 more bytes for the command delimiter
is.readByte();
is.readByte();
} catch (IOException e) {
throw new JedisConnectionException(e);
} }
// read 2 more bytes for the command delimiter
is.readByte();
is.readByte();
return read; return read;
} }
private static Long processInteger(final RedisInputStream is) { private static Long processInteger(final RedisInputStream is) {
String num = is.readLine(); return is.readLongCrLf();
return Long.valueOf(num);
} }
private static List<Object> processMultiBulkReply(final RedisInputStream is) { private static List<Object> processMultiBulkReply(final RedisInputStream is) {
int num = Integer.parseInt(is.readLine()); final int num = is.readIntCrLf();
if (num == -1) { if (num == -1) {
return null; return null;
} }
List<Object> ret = new ArrayList<Object>(num); final List<Object> ret = new ArrayList<Object>(num);
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
try { try {
ret.add(process(is)); ret.add(process(is));

View File

@@ -16,12 +16,15 @@
package redis.clients.util; package redis.clients.util;
import java.io.FilterInputStream; import java.io.*;
import java.io.IOException;
import java.io.InputStream;
import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisConnectionException;
/**
* This class assumes (to some degree) that we are reading a RESP stream. As such it assumes
* certain conventions regarding CRLF line termination. It also assumes that if the Protocol
* layer requires a byte that if that byte is not there it is a stream error.
*/
public class RedisInputStream extends FilterInputStream { public class RedisInputStream extends FilterInputStream {
protected final byte buf[]; protected final byte buf[];
@@ -40,73 +43,172 @@ public class RedisInputStream extends FilterInputStream {
this(in, 8192); this(in, 8192);
} }
public byte readByte() throws IOException { public byte readByte() throws JedisConnectionException {
if (count == limit) { ensureFill();
fill();
}
return buf[count++]; return buf[count++];
} }
public String readLine() { public String readLine() {
int b; final StringBuilder sb = new StringBuilder();
byte c; while (true) {
StringBuilder sb = new StringBuilder(); ensureFill();
try { byte b = buf[count++];
while (true) { if (b == '\r') {
if (count == limit) { ensureFill(); // Must be one more byte
fill();
} byte c = buf[count++];
if (limit == -1) if (c == '\n') {
break; break;
b = buf[count++];
if (b == '\r') {
if (count == limit) {
fill();
}
if (limit == -1) {
sb.append((char) b);
break;
}
c = buf[count++];
if (c == '\n') {
break;
}
sb.append((char) b);
sb.append((char) c);
} else {
sb.append((char) b);
} }
sb.append((char) b);
sb.append((char) c);
} else {
sb.append((char) b);
} }
} catch (IOException e) {
throw new JedisConnectionException(e);
} }
String reply = sb.toString();
final String reply = sb.toString();
if (reply.length() == 0) { if (reply.length() == 0) {
throw new JedisConnectionException( throw new JedisConnectionException("It seems like server has closed the connection.");
"It seems like server has closed the connection.");
} }
return reply; return reply;
} }
public int read(byte[] b, int off, int len) throws IOException { public byte[] readLineBytes() {
if (count == limit) {
fill(); /* This operation should only require one fill. In that typical
if (limit == -1) case we optimize allocation and copy of the byte array. In the
return -1; edge case where more than one fill is required then we take a
slower path and expand a byte array output stream as is
necessary. */
ensureFill();
int pos = count;
final byte[] buf = this.buf;
while (true) {
if (pos == limit) {
return readLineBytesSlowly();
}
if (buf[pos++] == '\r') {
if (pos == limit) {
return readLineBytesSlowly();
}
if (buf[pos++] == '\n') {
break;
}
}
} }
final int N = (pos - count) - 2;
final byte[] line = new byte[N];
System.arraycopy(buf, count, line, 0, N);
count = pos;
return line;
}
/**
* Slow path in case a line of bytes cannot be read in one #fill() operation. This is still faster
* than creating the StrinbBuilder, String, then encoding as byte[] in Protocol, then decoding back
* into a String.
*/
private byte[] readLineBytesSlowly() {
ByteArrayOutputStream bout = null;
while (true) {
ensureFill();
byte b = buf[count++];
if (b == '\r') {
ensureFill(); // Must be one more byte
byte c = buf[count++];
if (c == '\n') {
break;
}
if (bout == null) {
bout = new ByteArrayOutputStream(16);
}
bout.write(b);
bout.write(c);
} else {
if (bout == null) {
bout = new ByteArrayOutputStream(16);
}
bout.write(b);
}
}
return bout == null ? new byte[0] : bout.toByteArray();
}
public int readIntCrLf() {
return (int)readLongCrLf();
}
public long readLongCrLf() {
final byte[] buf = this.buf;
ensureFill();
final boolean isNeg = buf[count] == '-';
if (isNeg) {
++count;
}
long value = 0;
while (true) {
ensureFill();
final int b = buf[count++];
if (b == '\r') {
ensureFill();
if (buf[count++] != '\n') {
throw new JedisConnectionException("Unexpected character!");
}
break;
}
else {
value = value * 10 + b - '0';
}
}
return (isNeg ? -value : value);
}
public int read(byte[] b, int off, int len) throws JedisConnectionException {
ensureFill();
final int length = Math.min(limit - count, len); final int length = Math.min(limit - count, len);
System.arraycopy(buf, count, b, off, length); System.arraycopy(buf, count, b, off, length);
count += length; count += length;
return length; return length;
} }
private void fill() throws IOException { /**
limit = in.read(buf); * This methods assumes there are required bytes to be read. If we cannot read
count = 0; * anymore bytes an exception is thrown to quickly ascertain that the stream
* was smaller than expected.
*/
private void ensureFill() throws JedisConnectionException {
if (count >= limit) {
try {
limit = in.read(buf);
count = 0;
if (limit == -1) {
throw new JedisConnectionException("Unexpected end of stream.");
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}
} }
} }

View File

@@ -0,0 +1,105 @@
package redis.clients.jedis.tests.benchmark;
import redis.clients.jedis.Protocol;
import redis.clients.util.RedisInputStream;
import redis.clients.util.RedisOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
/**
* Copyright (c) 2014
*/
public class ProtocolBenchmark {
private static final int TOTAL_OPERATIONS = 500000;
public static void main(String[] args) throws Exception,
IOException {
long total = 0;
for (int at = 0; at != 10; ++at) {
long elapsed = measureInputMulti();
long ops = ((1000 * 2 * TOTAL_OPERATIONS) / TimeUnit.NANOSECONDS.toMillis(elapsed));
if (at >= 5) {
total += ops;
}
}
System.out.println((total / 5) + " avg");
total = 0;
for (int at = 0; at != 10; ++at) {
long elapsed = measureInputStatus();
long ops = ((1000 * 2 * TOTAL_OPERATIONS) / TimeUnit.NANOSECONDS.toMillis(elapsed));
if (at >= 5) {
total += ops;
}
}
System.out.println((total / 5) + " avg");
total = 0;
for (int at = 0; at != 10; ++at) {
long elapsed = measureCommand();
long ops = ((1000 * 2 * TOTAL_OPERATIONS) / TimeUnit.NANOSECONDS.toMillis(elapsed));
if (at >= 5) {
total += ops;
}
}
System.out.println((total / 5) + " avg");
}
private static long measureInputMulti() throws Exception {
long duration = 0;
InputStream is = new ByteArrayInputStream(
"*4\r\n$3\r\nfoo\r\n$13\r\nbarbarbarfooz\r\n$5\r\nHello\r\n$5\r\nWorld\r\n"
.getBytes());
RedisInputStream in = new RedisInputStream(is);
for (int n = 0; n <= TOTAL_OPERATIONS; n++) {
long start = System.nanoTime();
Protocol.read(in);
duration += (System.nanoTime() - start);
in.reset();
}
return duration;
}
private static long measureInputStatus() throws Exception {
long duration = 0;
InputStream is = new ByteArrayInputStream(
"+OK\r\n"
.getBytes());
RedisInputStream in = new RedisInputStream(is);
for (int n = 0; n <= TOTAL_OPERATIONS; n++) {
long start = System.nanoTime();
Protocol.read(in);
duration += (System.nanoTime() - start);
in.reset();
}
return duration;
}
private static long measureCommand() throws Exception {
long duration = 0;
byte[] KEY = "123456789".getBytes();
byte[] VAL = "FooBar".getBytes();
for (int n = 0; n <= TOTAL_OPERATIONS; n++) {
RedisOutputStream out = new RedisOutputStream(new ByteArrayOutputStream(8192));
long start = System.nanoTime();
Protocol.sendCommand(out, Protocol.Command.SET, KEY, VAL);
duration += (System.nanoTime() - start);
}
return duration;
}
}