diff --git a/Makefile b/Makefile index 761d033..bb6597d 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ pidfile /tmp/redis1.pid logfile /tmp/redis1.log save "" appendonly no +client-output-buffer-limit pubsub 256k 128k 5 endef define REDIS2_CONF diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index db3d915..6133159 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -101,7 +101,10 @@ public final class Protocol { int offset = 0; try { while (offset < len) { - offset += is.read(read, offset, (len - offset)); + int size = is.read(read, offset, (len - offset)); + if (size == -1) + throw new JedisConnectionException("It seems like server has closed the connection."); + offset += size; } // read 2 more bytes for the command delimiter is.readByte(); diff --git a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java index c7a94f4..9da4cbb 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java @@ -528,4 +528,75 @@ public class PublishSubscribeCommandsTest extends JedisCommandTestBase { }; pubsub.unsubscribe(); } + + //@Test(expected = JedisConnectionException.class) + @Test + public void handleClientOutputBufferLimitForSubscribeTooSlow() throws InterruptedException { + final Thread t = new Thread(new Runnable() { + public void run() { + try { + Jedis j = createJedis(); + + Thread.sleep(1000); + + // we already set jedis1 config to client-output-buffer-limit pubsub 256k 128k 5 + // it means if subscriber delayed to receive over 256k or 128k continuously 5 sec, + // redis disconnects subscriber + + // we publish over 10M data for making situation for exceed client-output-buffer-limit + String veryLargeString = makeLargeString(1024 * 10); + + // 10K * 1024 = 10M + for (int i = 0 ; i < 1024 ; i++) + j.publish("foo", veryLargeString); // 1k + + j.disconnect(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + }); + t.start(); + jedis.subscribe(new JedisPubSub() { + public void onMessage(String channel, String message) { + try { + // wait 0.5 secs to slow down subscribe and client-output-buffer exceed + System.out.println("channel - " + channel + " / message - " + message); + Thread.sleep(500); + } catch (Exception e) { + try { + t.join(); + } catch (InterruptedException e1) { + } + + fail(e.getMessage()); + } + } + + public void onSubscribe(String channel, int subscribedChannels) { + } + + public void onUnsubscribe(String channel, int subscribedChannels) { + } + + public void onPSubscribe(String pattern, int subscribedChannels) { + } + + public void onPUnsubscribe(String pattern, int subscribedChannels) { + } + + public void onPMessage(String pattern, String channel, + String message) { + } + }, "foo"); + t.join(); + } + + private String makeLargeString(int size) { + StringBuffer sb = new StringBuffer(); + for (int i = 0 ; i < size ; i++) + sb.append((char)('a' + i % 26)); + + return sb.toString(); + } }