Added DataStreams and BufferedStream to increase performance

This commit is contained in:
Jonathan Leibiusky
2010-08-04 22:11:00 -03:00
parent c4e5d0b89b
commit 3af260f904
4 changed files with 37 additions and 32 deletions

View File

@@ -1,8 +1,9 @@
package redis.clients.jedis; package redis.clients.jedis;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.List; import java.util.List;
@@ -13,8 +14,8 @@ public class Connection {
private Socket socket; private Socket socket;
private boolean connected = false; private boolean connected = false;
private Protocol protocol = new Protocol(); private Protocol protocol = new Protocol();
private OutputStream outputStream; private DataOutputStream outputStream;
private InputStream inputStream; private DataInputStream inputStream;
public Connection(String host) { public Connection(String host) {
super(); super();
@@ -58,8 +59,9 @@ public class Connection {
if (!connected) { if (!connected) {
socket = new Socket(host, port); socket = new Socket(host, port);
connected = socket.isConnected(); connected = socket.isConnected();
outputStream = socket.getOutputStream(); outputStream = new DataOutputStream(socket.getOutputStream());
inputStream = socket.getInputStream(); inputStream = new DataInputStream(new BufferedInputStream(socket
.getInputStream()));
} }
} }

View File

@@ -1,8 +1,8 @@
package redis.clients.jedis; package redis.clients.jedis;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@@ -21,7 +21,7 @@ public class Protocol {
public static final byte MINUS_BYTE = MINUS.getBytes()[0]; public static final byte MINUS_BYTE = MINUS.getBytes()[0];
public static final byte COLON_BYTE = COLON.getBytes()[0]; public static final byte COLON_BYTE = COLON.getBytes()[0];
public void sendCommand(OutputStream os, String name, String... args) { public void sendCommand(DataOutputStream os, String name, String... args) {
StringBuilder builder = new StringBuilder(ASTERISK + (args.length + 1) StringBuilder builder = new StringBuilder(ASTERISK + (args.length + 1)
+ COMMAND_DELIMITER + DOLLAR + name.length() + COMMAND_DELIMITER + DOLLAR + name.length()
+ COMMAND_DELIMITER + name + COMMAND_DELIMITER); + COMMAND_DELIMITER + name + COMMAND_DELIMITER);
@@ -36,20 +36,20 @@ public class Protocol {
} }
} }
public void processError(InputStream is) { public void processError(DataInputStream is) {
String message = readLine(is); String message = readLine(is);
throw new JedisException(message); throw new JedisException(message);
} }
private String readLine(InputStream is) { private String readLine(DataInputStream is) {
byte b; byte b;
byte c; byte c;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
try { try {
while ((b = (byte) is.read()) != -1) { while ((b = is.readByte()) != -1) {
if (b == '\r') { if (b == '\r') {
c = (byte) is.read(); c = is.readByte();
if (c == '\n') { if (c == '\n') {
break; break;
} }
@@ -65,24 +65,24 @@ public class Protocol {
return sb.toString(); return sb.toString();
} }
public String getBulkReply(InputStream is) { public String getBulkReply(DataInputStream is) {
Object reply = process(is); Object reply = process(is);
return (String) reply; return (String) reply;
} }
public String getStatusCodeReply(InputStream is) { public String getStatusCodeReply(DataInputStream is) {
Object reply = process(is); Object reply = process(is);
return (String) reply; return (String) reply;
} }
public int getIntegerReply(InputStream is) { public int getIntegerReply(DataInputStream is) {
Object reply = process(is); Object reply = process(is);
return (Integer) reply; return (Integer) reply;
} }
private Object process(InputStream is) { private Object process(DataInputStream is) {
try { try {
byte b = (byte) is.read(); byte b = is.readByte();
if (b == MINUS_BYTE) { if (b == MINUS_BYTE) {
processError(is); processError(is);
} else if (b == ASTERISK_BYTE) { } else if (b == ASTERISK_BYTE) {
@@ -102,13 +102,13 @@ public class Protocol {
return null; return null;
} }
private Object processStatusCodeReply(InputStream is) { private Object processStatusCodeReply(DataInputStream is) {
String ret = null; String ret = null;
ret = readLine(is); ret = readLine(is);
return ret; return ret;
} }
private Object processBulkReply(InputStream is) { private Object processBulkReply(DataInputStream is) {
int len = Integer.parseInt(readLine(is)); int len = Integer.parseInt(readLine(is));
if (len == -1) { if (len == -1) {
return null; return null;
@@ -126,14 +126,14 @@ public class Protocol {
return new String(read); return new String(read);
} }
private Object processInteger(InputStream is) { private Object processInteger(DataInputStream is) {
int ret = 0; int ret = 0;
String num = readLine(is); String num = readLine(is);
ret = Integer.parseInt(num); ret = Integer.parseInt(num);
return ret; return ret;
} }
private Object processMultiBulkReply(InputStream is) { private Object processMultiBulkReply(DataInputStream is) {
int num = Integer.parseInt(readLine(is)); int num = Integer.parseInt(readLine(is));
if (num == -1) { if (num == -1) {
return null; return null;
@@ -146,7 +146,7 @@ public class Protocol {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<Object> getMultiBulkReply(InputStream is) { public List<Object> getMultiBulkReply(DataInputStream is) {
Object reply = process(is); Object reply = process(is);
List<Object> ret = (List<Object>) reply; List<Object> ret = (List<Object>) reply;
return ret; return ret;

View File

@@ -2,6 +2,8 @@ package redis.clients.jedis.tests;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.PipedInputStream; import java.io.PipedInputStream;
@@ -23,7 +25,7 @@ public class ProtocolTest extends Assert {
PipedOutputStream pos = new PipedOutputStream(pis); PipedOutputStream pos = new PipedOutputStream(pis);
Protocol protocol = new Protocol(); Protocol protocol = new Protocol();
protocol.sendCommand(pos, "GET", "SOMEKEY"); protocol.sendCommand(new DataOutputStream(pos), "GET", "SOMEKEY");
pos.close(); pos.close();
String expectedCommand = "*2\r\n$3\r\nGET\r\n$7\r\nSOMEKEY\r\n"; String expectedCommand = "*2\r\n$3\r\nGET\r\n$7\r\nSOMEKEY\r\n";
@@ -41,7 +43,7 @@ public class ProtocolTest extends Assert {
public void bulkReply() { public void bulkReply() {
InputStream is = new ByteArrayInputStream("$6\r\nfoobar\r\n".getBytes()); InputStream is = new ByteArrayInputStream("$6\r\nfoobar\r\n".getBytes());
Protocol protocol = new Protocol(); Protocol protocol = new Protocol();
String response = protocol.getBulkReply(is); String response = protocol.getBulkReply(new DataInputStream(is));
assertEquals("foobar", response); assertEquals("foobar", response);
} }
@@ -49,7 +51,7 @@ public class ProtocolTest extends Assert {
public void nullBulkReply() { public void nullBulkReply() {
InputStream is = new ByteArrayInputStream("$-1\r\n".getBytes()); InputStream is = new ByteArrayInputStream("$-1\r\n".getBytes());
Protocol protocol = new Protocol(); Protocol protocol = new Protocol();
String response = protocol.getBulkReply(is); String response = protocol.getBulkReply(new DataInputStream(is));
assertEquals(null, response); assertEquals(null, response);
} }
@@ -57,7 +59,7 @@ public class ProtocolTest extends Assert {
public void singleLineReply() { public void singleLineReply() {
InputStream is = new ByteArrayInputStream("+OK\r\n".getBytes()); InputStream is = new ByteArrayInputStream("+OK\r\n".getBytes());
Protocol protocol = new Protocol(); Protocol protocol = new Protocol();
String response = protocol.getStatusCodeReply(is); String response = protocol.getStatusCodeReply(new DataInputStream(is));
assertEquals("OK", response); assertEquals("OK", response);
} }
@@ -65,7 +67,7 @@ public class ProtocolTest extends Assert {
public void integerReply() { public void integerReply() {
InputStream is = new ByteArrayInputStream(":123\r\n".getBytes()); InputStream is = new ByteArrayInputStream(":123\r\n".getBytes());
Protocol protocol = new Protocol(); Protocol protocol = new Protocol();
int response = protocol.getIntegerReply(is); int response = protocol.getIntegerReply(new DataInputStream(is));
assertEquals(123, response); assertEquals(123, response);
} }
@@ -77,7 +79,7 @@ public class ProtocolTest extends Assert {
.getBytes()); .getBytes());
Protocol protocol = new Protocol(); Protocol protocol = new Protocol();
List<String> response = (List<String>) (List<?>) protocol List<String> response = (List<String>) (List<?>) protocol
.getMultiBulkReply(is); .getMultiBulkReply(new DataInputStream(is));
List<String> expected = new ArrayList<String>(); List<String> expected = new ArrayList<String>();
expected.add("foo"); expected.add("foo");
expected.add("bar"); expected.add("bar");
@@ -90,7 +92,8 @@ public class ProtocolTest extends Assert {
"*4\r\n$3\r\nfoo\r\n+OK\r\n:1000\r\n*2\r\n$3\r\nfoo\r\n$3\r\nbar" "*4\r\n$3\r\nfoo\r\n+OK\r\n:1000\r\n*2\r\n$3\r\nfoo\r\n$3\r\nbar"
.getBytes()); .getBytes());
protocol = new Protocol(); protocol = new Protocol();
List<Object> response2 = protocol.getMultiBulkReply(is); List<Object> response2 = protocol
.getMultiBulkReply(new DataInputStream(is));
List<Object> expected2 = new ArrayList<Object>(); List<Object> expected2 = new ArrayList<Object>();
expected2.add("foo"); expected2.add("foo");
expected2.add("OK"); expected2.add("OK");
@@ -109,7 +112,7 @@ public class ProtocolTest extends Assert {
InputStream is = new ByteArrayInputStream("*-1\r\n".getBytes()); InputStream is = new ByteArrayInputStream("*-1\r\n".getBytes());
Protocol protocol = new Protocol(); Protocol protocol = new Protocol();
List<String> response = (List<String>) (List<?>) protocol List<String> response = (List<String>) (List<?>) protocol
.getMultiBulkReply(is); .getMultiBulkReply(new DataInputStream(is));
assertNull(response); assertNull(response);
} }
} }