diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index a341a69..5f0c50f 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -1,9 +1,8 @@ package redis.clients.jedis; -import java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import redis.clients.util.RedisOutputStream; + +import java.io.*; import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; @@ -15,7 +14,7 @@ public class Connection { private int port = Protocol.DEFAULT_PORT; private Socket socket; private Protocol protocol = new Protocol(); - private DataOutputStream outputStream; + private RedisOutputStream outputStream; private DataInputStream inputStream; private int pipelinedCommands = 0; private int timeout = 2000; @@ -91,9 +90,8 @@ public class Connection { if (!isConnected()) { socket = new Socket(host, port); socket.setSoTimeout(timeout); - outputStream = new DataOutputStream(socket.getOutputStream()); - inputStream = new DataInputStream(new BufferedInputStream(socket - .getInputStream())); + outputStream = new RedisOutputStream(socket.getOutputStream()); + inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream())); } } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 8621872..8cf6171 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -1,15 +1,23 @@ package redis.clients.jedis; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import redis.clients.util.RedisOutputStream; + +import java.io.*; import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; import java.util.ArrayList; import java.util.List; public class Protocol { public static final Charset CHARSET = Charset.forName("UTF-8"); + private static final ThreadLocal CHARSET_ENCODER = new ThreadLocal (){ + @Override + protected CharsetEncoder initialValue() { + return CHARSET.newEncoder(); + } + }; + public static final String DOLLAR = "$"; public static final String ASTERISK = "*"; public static final String PLUS = "+"; @@ -26,29 +34,36 @@ public class Protocol { public static final byte MINUS_BYTE = MINUS.getBytes(CHARSET)[0]; public static final byte COLON_BYTE = COLON.getBytes(CHARSET)[0]; - public void sendCommand(DataOutputStream os, String name, String... args) { - StringBuilder sb = new StringBuilder(); - sb.append(ASTERISK); - sb.append((new Integer(args.length + 1)).toString()); - sb.append(COMMAND_DELIMITER); - sb.append(DOLLAR); - sb.append((new Integer(name.length())).toString()); - sb.append(COMMAND_DELIMITER); - sb.append(name); - sb.append(COMMAND_DELIMITER); + public void sendCommand(RedisOutputStream os, String name, String... args) { + try { + final CharsetEncoder encoder = CHARSET_ENCODER.get(); - for (String arg : args) { - int size = arg.getBytes(CHARSET).length; + os.write(ASTERISK_BYTE); + os.write(String.valueOf(args.length + 1), encoder); + os.write(COMMAND_DELIMITER_BYTES); + os.write(DOLLAR_BYTE); + os.write(String.valueOf(name.length()), encoder); + os.write(COMMAND_DELIMITER_BYTES); + os.write(name, encoder); + os.write(COMMAND_DELIMITER_BYTES); - sb.append(DOLLAR); - sb.append((new Integer(size)).toString()); - sb.append(COMMAND_DELIMITER); - sb.append(arg); - sb.append(COMMAND_DELIMITER); - } + for (String arg : args) { + final byte[] bytes = arg.getBytes(CHARSET); + int size = bytes.length; - try { - os.write(sb.toString().getBytes(CHARSET)); + os.write(DOLLAR_BYTE); + os.write(String.valueOf(size), encoder); + os.write(COMMAND_DELIMITER_BYTES); + os.write(bytes); + os.write(COMMAND_DELIMITER_BYTES); + } + os.flush (); + } catch (IOException e) { + throw new JedisException(e); + } + +// try { +// os.write(os.toByteArray()); /* os.write(ASTERISK_BYTE); os.write((new Integer(args.length + 1)).toString() @@ -71,9 +86,9 @@ public class Protocol { os.write(COMMAND_DELIMITER_BYTES); } */ - } catch (IOException e) { - throw new JedisException(e); - } +// } catch (IOException e) { +// throw new JedisException(e); +// } } public void processError(DataInputStream is) { diff --git a/src/main/java/redis/clients/util/RedisOutputStream.java b/src/main/java/redis/clients/util/RedisOutputStream.java new file mode 100644 index 0000000..18108fe --- /dev/null +++ b/src/main/java/redis/clients/util/RedisOutputStream.java @@ -0,0 +1,98 @@ +package redis.clients.util; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; + +/** + * The class implements a buffered output stream without synchronization + * There are also special operations like in-place string encoding + */ +public final class RedisOutputStream extends FilterOutputStream { + protected final byte buf[]; + protected final ByteBuffer outByteBuffer; + + protected int count; + + public RedisOutputStream(OutputStream out) { + this(out, 8192); + } + + public RedisOutputStream(OutputStream out, int size) { + super(out); + if (size <= 0) { + throw new IllegalArgumentException("Buffer size <= 0"); + } + buf = new byte[size]; + outByteBuffer = ByteBuffer.wrap(buf); + } + + private void flushBuffer() throws IOException { + if (count > 0) { + out.write(buf, 0, count); + outByteBuffer.position(0); + count = 0; + } + } + + public void write(int b) throws IOException { + buf[count++] = (byte) b; + if (count >= buf.length) { + flushBuffer(); + } + } + + public void write(byte b[], int off, int len) throws IOException { + if (len >= buf.length) { + flushBuffer(); + out.write(b, off, len); + } + else { + if (len >= buf.length - count) { + flushBuffer(); + } + + System.arraycopy(b, off, buf, count, len); + count += len; + } + } + + public void write(String str, CharsetEncoder encoder) throws IOException { + final CharBuffer in = CharBuffer.wrap(str); + if (in.remaining() == 0) + return; + + outByteBuffer.position(count); + + encoder.reset(); + for (;;) { + CoderResult cr; + if (in.hasRemaining()) + cr = encoder.encode(in, outByteBuffer, true); + else + cr = encoder.flush(outByteBuffer); + + count = outByteBuffer.position(); + if(count == buf.length) + flushBuffer(); + + if (cr.isUnderflow()) { + break; + } + if (cr.isOverflow()) { + flushBuffer(); + continue; + } + cr.throwException(); + } + } + + public void flush() throws IOException { + flushBuffer(); + out.flush(); + } +} diff --git a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java index 361d50e..ad161fe 100644 --- a/src/test/java/redis/clients/jedis/tests/ProtocolTest.java +++ b/src/test/java/redis/clients/jedis/tests/ProtocolTest.java @@ -3,7 +3,6 @@ package redis.clients.jedis.tests; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.PipedInputStream; @@ -16,6 +15,7 @@ import junit.framework.Assert; import org.junit.Test; import redis.clients.jedis.Protocol; +import redis.clients.util.RedisOutputStream; public class ProtocolTest extends Assert { @Test @@ -25,7 +25,7 @@ public class ProtocolTest extends Assert { PipedOutputStream pos = new PipedOutputStream(pis); Protocol protocol = new Protocol(); - protocol.sendCommand(new DataOutputStream(pos), "GET", "SOMEKEY"); + protocol.sendCommand(new RedisOutputStream(pos), "GET", "SOMEKEY"); pos.close(); String expectedCommand = "*2\r\n$3\r\nGET\r\n$7\r\nSOMEKEY\r\n";