Merge branch 'master' into pipeline-and-transaction-can-handle-responses-on-their-own-status
* it's broken with later features, resetState() * fixed resetState() to make it work with this PR Conflicts: src/main/java/redis/clients/jedis/BinaryJedis.java src/main/java/redis/clients/jedis/Connection.java src/main/java/redis/clients/jedis/Pipeline.java src/main/java/redis/clients/jedis/Transaction.java src/main/java/redis/clients/jedis/TransactionBlock.java
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package redis.clients.jedis;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
@@ -10,12 +11,11 @@ import java.util.List;
|
||||
import redis.clients.jedis.Protocol.Command;
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
import redis.clients.jedis.exceptions.JedisDataException;
|
||||
import redis.clients.jedis.exceptions.JedisException;
|
||||
import redis.clients.util.RedisInputStream;
|
||||
import redis.clients.util.RedisOutputStream;
|
||||
import redis.clients.util.SafeEncoder;
|
||||
|
||||
public class Connection {
|
||||
public class Connection implements Closeable {
|
||||
private String host;
|
||||
private int port = Protocol.DEFAULT_PORT;
|
||||
private Socket socket;
|
||||
@@ -23,184 +23,207 @@ public class Connection {
|
||||
private RedisInputStream inputStream;
|
||||
private int timeout = Protocol.DEFAULT_TIMEOUT;
|
||||
|
||||
private boolean broken = false;
|
||||
|
||||
public Socket getSocket() {
|
||||
return socket;
|
||||
return socket;
|
||||
}
|
||||
|
||||
public int getTimeout() {
|
||||
return timeout;
|
||||
return timeout;
|
||||
}
|
||||
|
||||
public void setTimeout(final int timeout) {
|
||||
this.timeout = timeout;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public void setTimeoutInfinite() {
|
||||
try {
|
||||
if(!isConnected()) {
|
||||
connect();
|
||||
}
|
||||
socket.setKeepAlive(true);
|
||||
socket.setSoTimeout(0);
|
||||
} catch (SocketException ex) {
|
||||
throw new JedisException(ex);
|
||||
}
|
||||
try {
|
||||
if (!isConnected()) {
|
||||
connect();
|
||||
}
|
||||
socket.setKeepAlive(true);
|
||||
socket.setSoTimeout(0);
|
||||
} catch (SocketException ex) {
|
||||
broken = true;
|
||||
throw new JedisConnectionException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public void rollbackTimeout() {
|
||||
try {
|
||||
socket.setSoTimeout(timeout);
|
||||
socket.setKeepAlive(false);
|
||||
} catch (SocketException ex) {
|
||||
throw new JedisException(ex);
|
||||
}
|
||||
try {
|
||||
socket.setSoTimeout(timeout);
|
||||
socket.setKeepAlive(false);
|
||||
} catch (SocketException ex) {
|
||||
broken = true;
|
||||
throw new JedisConnectionException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public Connection(final String host) {
|
||||
super();
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
protected void flush() {
|
||||
try {
|
||||
outputStream.flush();
|
||||
} catch (IOException e) {
|
||||
throw new JedisConnectionException(e);
|
||||
}
|
||||
super();
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
protected Connection sendCommand(final Command cmd, final String... args) {
|
||||
final byte[][] bargs = new byte[args.length][];
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
bargs[i] = SafeEncoder.encode(args[i]);
|
||||
}
|
||||
return sendCommand(cmd, bargs);
|
||||
final byte[][] bargs = new byte[args.length][];
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
bargs[i] = SafeEncoder.encode(args[i]);
|
||||
}
|
||||
return sendCommand(cmd, bargs);
|
||||
}
|
||||
|
||||
protected Connection sendCommand(final Command cmd, final byte[]... args) {
|
||||
connect();
|
||||
Protocol.sendCommand(outputStream, cmd, args);
|
||||
return this;
|
||||
try {
|
||||
connect();
|
||||
Protocol.sendCommand(outputStream, cmd, args);
|
||||
return this;
|
||||
} catch (JedisConnectionException ex) {
|
||||
// Any other exceptions related to connection?
|
||||
broken = true;
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected Connection sendCommand(final Command cmd) {
|
||||
connect();
|
||||
Protocol.sendCommand(outputStream, cmd, new byte[0][]);
|
||||
return this;
|
||||
try {
|
||||
connect();
|
||||
Protocol.sendCommand(outputStream, cmd, new byte[0][]);
|
||||
return this;
|
||||
} catch (JedisConnectionException ex) {
|
||||
// Any other exceptions related to connection?
|
||||
broken = true;
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
public Connection(final String host, final int port) {
|
||||
super();
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
super();
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
return host;
|
||||
}
|
||||
|
||||
public void setHost(final String host) {
|
||||
this.host = host;
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return port;
|
||||
return port;
|
||||
}
|
||||
|
||||
public void setPort(final int port) {
|
||||
this.port = port;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
public Connection() {
|
||||
|
||||
|
||||
}
|
||||
|
||||
public void connect() {
|
||||
if (!isConnected()) {
|
||||
try {
|
||||
socket = new Socket();
|
||||
//->@wjw_add
|
||||
socket.setReuseAddress(true);
|
||||
socket.setKeepAlive(true); //Will monitor the TCP connection is valid
|
||||
socket.setTcpNoDelay(true); //Socket buffer Whetherclosed, to ensure timely delivery of data
|
||||
socket.setSoLinger(true,0); //Control calls close () method, the underlying socket is closed immediately
|
||||
//<-@wjw_add
|
||||
if (!isConnected()) {
|
||||
try {
|
||||
socket = new Socket();
|
||||
// ->@wjw_add
|
||||
socket.setReuseAddress(true);
|
||||
socket.setKeepAlive(true); // Will monitor the TCP connection is
|
||||
// valid
|
||||
socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
|
||||
// ensure timely delivery of data
|
||||
socket.setSoLinger(true, 0); // Control calls close () method,
|
||||
// the underlying socket is closed
|
||||
// immediately
|
||||
// <-@wjw_add
|
||||
|
||||
socket.connect(new InetSocketAddress(host, port), timeout);
|
||||
socket.setSoTimeout(timeout);
|
||||
outputStream = new RedisOutputStream(socket.getOutputStream());
|
||||
inputStream = new RedisInputStream(socket.getInputStream());
|
||||
} catch (IOException ex) {
|
||||
throw new JedisConnectionException(ex);
|
||||
}
|
||||
}
|
||||
socket.connect(new InetSocketAddress(host, port), timeout);
|
||||
socket.setSoTimeout(timeout);
|
||||
outputStream = new RedisOutputStream(socket.getOutputStream());
|
||||
inputStream = new RedisInputStream(socket.getInputStream());
|
||||
} catch (IOException ex) {
|
||||
broken = true;
|
||||
throw new JedisConnectionException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
disconnect();
|
||||
}
|
||||
|
||||
public void disconnect() {
|
||||
if (isConnected()) {
|
||||
try {
|
||||
inputStream.close();
|
||||
outputStream.close();
|
||||
if (!socket.isClosed()) {
|
||||
socket.close();
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
throw new JedisConnectionException(ex);
|
||||
}
|
||||
}
|
||||
if (isConnected()) {
|
||||
try {
|
||||
inputStream.close();
|
||||
outputStream.close();
|
||||
if (!socket.isClosed()) {
|
||||
socket.close();
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
broken = true;
|
||||
throw new JedisConnectionException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isConnected() {
|
||||
return socket != null && socket.isBound() && !socket.isClosed()
|
||||
&& socket.isConnected() && !socket.isInputShutdown()
|
||||
&& !socket.isOutputShutdown();
|
||||
return socket != null && socket.isBound() && !socket.isClosed()
|
||||
&& socket.isConnected() && !socket.isInputShutdown()
|
||||
&& !socket.isOutputShutdown();
|
||||
}
|
||||
|
||||
protected String getStatusCodeReply() {
|
||||
flush();
|
||||
final byte[] resp = (byte[]) Protocol.read(inputStream);
|
||||
if (null == resp) {
|
||||
return null;
|
||||
} else {
|
||||
return SafeEncoder.encode(resp);
|
||||
}
|
||||
flush();
|
||||
final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
|
||||
if (null == resp) {
|
||||
return null;
|
||||
} else {
|
||||
return SafeEncoder.encode(resp);
|
||||
}
|
||||
}
|
||||
|
||||
public String getBulkReply() {
|
||||
final byte[] result = getBinaryBulkReply();
|
||||
if (null != result) {
|
||||
return SafeEncoder.encode(result);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
final byte[] result = getBinaryBulkReply();
|
||||
if (null != result) {
|
||||
return SafeEncoder.encode(result);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public byte[] getBinaryBulkReply() {
|
||||
flush();
|
||||
return (byte[]) Protocol.read(inputStream);
|
||||
flush();
|
||||
return (byte[]) readProtocolWithCheckingBroken();
|
||||
}
|
||||
|
||||
public Long getIntegerReply() {
|
||||
flush();
|
||||
return (Long) Protocol.read(inputStream);
|
||||
flush();
|
||||
return (Long) readProtocolWithCheckingBroken();
|
||||
}
|
||||
|
||||
public List<String> getMultiBulkReply() {
|
||||
return BuilderFactory.STRING_LIST.build(getBinaryMultiBulkReply());
|
||||
return BuilderFactory.STRING_LIST.build(getBinaryMultiBulkReply());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<byte[]> getBinaryMultiBulkReply() {
|
||||
flush();
|
||||
return (List<byte[]>) Protocol.read(inputStream);
|
||||
flush();
|
||||
return (List<byte[]>) readProtocolWithCheckingBroken();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<Object> getObjectMultiBulkReply() {
|
||||
flush();
|
||||
return (List<Object>) Protocol.read(inputStream);
|
||||
public List<Object> getRawObjectMultiBulkReply() {
|
||||
return (List<Object>) readProtocolWithCheckingBroken();
|
||||
}
|
||||
|
||||
|
||||
public List<Object> getObjectMultiBulkReply() {
|
||||
flush();
|
||||
return getRawObjectMultiBulkReply();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<Long> getIntegerMultiBulkReply() {
|
||||
flush();
|
||||
@@ -209,15 +232,37 @@ public class Connection {
|
||||
|
||||
public Object getOne() {
|
||||
flush();
|
||||
return Protocol.read(inputStream);
|
||||
return readProtocolWithCheckingBroken();
|
||||
}
|
||||
|
||||
|
||||
public boolean isBroken() {
|
||||
return broken;
|
||||
}
|
||||
|
||||
protected void flush() {
|
||||
try {
|
||||
outputStream.flush();
|
||||
} catch (IOException ex) {
|
||||
broken = true;
|
||||
throw new JedisConnectionException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
protected Object readProtocolWithCheckingBroken() {
|
||||
try {
|
||||
return Protocol.read(inputStream);
|
||||
} catch (JedisConnectionException exc) {
|
||||
broken = true;
|
||||
throw exc;
|
||||
}
|
||||
}
|
||||
|
||||
public List<Object> getMany(int count) {
|
||||
flush();
|
||||
List<Object> responses = new ArrayList<Object>();
|
||||
for (int i = 0 ; i < count ; i++) {
|
||||
try {
|
||||
responses.add(Protocol.read(inputStream));
|
||||
responses.add(readProtocolWithCheckingBroken());
|
||||
} catch (JedisDataException e) {
|
||||
responses.add(e);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user