@jpe42
Last active June 2, 2016 18:35
Bulk REPLY OFF Loading w/ Jedipus
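private static int runWithJedipus(final IntStream keys) {
  // Single-element array so the lambda below can update the counter.
  final int[] count = {0};
  try (final RedisClient client =
      RedisClientFactory.startBuilding().create(Node.create("127.0.0.1", 6379))) {
    client.sendCmd(Cmds.FLUSHALL);
    final long start = System.currentTimeMillis();
    // REPLY OFF: the server sends no responses, so there is nothing to read back.
    client.replyOff();
    final byte[] defaultValue = RESP.toBytes("D");
    // forEach instead of peek(...).count(): since Java 9, count() may skip the
    // pipeline (and the peek side effect) when the stream size is known up front.
    keys.forEach(key -> {
      client.sendCmd(Cmds.SETNX, RESP.toBytes(key), defaultValue);
      count[0]++;
    });
    client.replyOn();
    System.out.format("Loading %d key/values took %dms.%n", count[0],
        System.currentTimeMillis() - start);
    // Verify that every key actually reached the server.
    final long dbsize = client.sendCmd(Cmds.DBSIZE.prim());
    if (dbsize != count[0]) {
      throw new IllegalStateException(
          String.format("Expected %d keys, but %d exist.", count[0], dbsize));
    }
  }
  return count[0];
}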
private static int runWithJedis(final IntStream keys) {
  // Single-element array so the lambda below can update the counter.
  final int[] cnt = {0};
  try (final Jedis jedis = new Jedis("127.0.0.1", 6379)) {
    jedis.flushAll();
    final long start = System.currentTimeMillis();
    final byte[] defaultValue = RESP.toBytes("D");
    final Pipeline pipeline = jedis.pipelined();
    keys.forEach(key -> {
      // Drain the queued replies every 1,000 commands.
      if (++cnt[0] % 1_000 == 0) {
        pipeline.sync();
      }
      pipeline.setnx(RESP.toBytes(key), defaultValue);
    });
    // Drain whatever is still queued.
    pipeline.sync();
    System.out.format("Loading %d key/values took %dms.%n", cnt[0],
        System.currentTimeMillis() - start);
    // Verify that every key actually reached the server.
    final long dbsize = jedis.dbSize();
    if (dbsize != cnt[0]) {
      throw new IllegalStateException(
          String.format("Expected %d keys, but %d exist.", cnt[0], dbsize));
    }
  }
  return cnt[0];
}
@jpe42
Author

jpe42 commented May 25, 2016

@dgomesbr, sorry, I only noticed your update tonight; I need to figure out how to enable notifications for gists. And terribly sorry for leading you astray with the reply off, hopefully I can win you back :)

I tested and updated the gist with 3 methods that compare Jedipus to Jedis directly and try to mimic your setup as closely as possible.

The issue with REPLY OFF in non-pipelined mode was that the default behavior was flushing the buffer after each call :/ I've changed that default behavior to only flush in REPLY ON mode and in addition exposed a flush() method in case the user wants to flush while in REPLY OFF mode. The latest release has these changes.
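
To illustrate why flushing after every call is costly, here is a minimal sketch using only `java.io`, not Jedipus itself. `FlushDemo`, `CountingStream`, and `countWrites` are hypothetical names, the payload is a placeholder rather than real RESP encoding, and the 8192-byte buffer is assumed only because it matches the default mentioned in this thread. It counts how many writes reach the underlying stream, which stands in for the socket:

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class FlushDemo {

  /** Hypothetical stand-in for a socket: counts how many writes reach it. */
  static final class CountingStream extends OutputStream {
    int writes = 0;

    @Override
    public void write(final int b) {
      writes++;
    }

    @Override
    public void write(final byte[] buf, final int off, final int len) {
      writes++;
    }
  }

  /** Writes a small payload `commands` times, optionally flushing after each one. */
  static int countWrites(final int commands, final boolean flushEach) {
    final CountingStream sink = new CountingStream();
    // Placeholder payload, not actual RESP encoding.
    final byte[] cmd = "SETNX key D\r\n".getBytes(StandardCharsets.US_ASCII);
    try (final BufferedOutputStream out = new BufferedOutputStream(sink, 8192)) {
      for (int i = 0; i < commands; i++) {
        out.write(cmd);
        if (flushEach) {
          out.flush(); // one underlying write per command
        }
      }
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    } // close() flushes whatever is still buffered
    return sink.writes;
  }

  public static void main(final String[] args) {
    System.out.println("flush per command: " + countWrites(10_000, true));
    System.out.println("flush when full:   " + countWrites(10_000, false));
  }
}
```

With a flush per command, every command costs one write to the underlying stream; when the buffer is only flushed once it fills, hundreds of small commands share a single write.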

With this change, the JedipusBulkReplyOffLoading.java test runs 10-12% faster than the JedisBulkPipelinedLoading.java for me. The increase in performance is primarily because you don't have to sync() every 1000 calls.

Also, the db flushing could affect the timings recorded by your tests... When I flush after inserting more than 4 million unique keys, I get connection timeouts for both Jedipus and Jedis.

The performance could probably be further optimized by tweaking the OutputBuffer size for the RedisClient which you can do with the RedisClientFactory like so:

// default is 8192
final RedisClient client = RedisClientFactory.startBuilding().withOutputBufferSize(16384)
        .create(Node.create("localhost", 6379));

@dgomesbr

So the problems are related to a larger outputBufferSize.

 private int runWithJedipus(BufferedReader br){
        try (final RedisClient client =
                     RedisClientFactory.startBuilding().withOutputBufferSize(16384).create(Node.create(redisServerHost, redisServerPort))) {

            AtomicInteger counter = new AtomicInteger(0);
            try (final RedisPipeline pipeline = client.pipeline()) {
                if (isFlushEnabled && !isFlushed) {
                    pipeline.sendCmd(Cmds.FLUSHALL);
                    isFlushed = true;
                }

                //client.replyOff();
                //pipeline.replyOff();

                br.lines().forEach(key -> {
                    pipeline.sendCmd(Cmds.SETNX, RESP.toBytes(key), RESP.toBytes(DEFAULT_VALUE));
                    counter.incrementAndGet();
                    //if (counter.incrementAndGet() % 350_000 == 0) {
                        //pipeline.sync();
                    //}
                });
            }

            return counter.get();
        }
    }

This gives me steady results of 28s (slower than the latest with replyOff, with or without sync):

2016-05-25 01:22:31.157  INFO 7068 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:23:00.038  INFO 7068 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:23:28.660  INFO 7068 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:23:57.169  INFO 7068 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:24:26.186  INFO 7068 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 29s)

@jpe42
Author

jpe42 commented May 25, 2016

Give this a go:

private int runWithJedipus(final BufferedReader br) {

  try (final RedisClient client =
      RedisClientFactory.startBuilding().create(Node.create(redisServerHost, redisServerPort))) {

    if (isFlushEnabled && !isFlushed) {
      client.sendCmd(Cmds.FLUSHALL);
      isFlushed = true;
    }

    final byte[] defaultValue = RESP.toBytes(DEFAULT_VALUE);    

    client.replyOff();

    return (int) br.lines()
        .map(RESP::toBytes)
        .peek(key -> client.sendCmd(Cmds.SETNX, key, defaultValue)).count();
  }
}
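
One caveat on the `peek(...).count()` idiom: it relies on `count()` traversing every element. Since Java 9, `count()` may skip the pipeline entirely when the size can be computed directly from the source (for example `IntStream.range`), in which case the `peek` side effect never runs. `br.lines()` has no known size, so the snippet above should be unaffected, but a `forEach` with an explicit counter avoids the question. A minimal sketch, where `CountingLoad` and `sendAll` are hypothetical names and `send` is a stand-in for `client.sendCmd`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

class CountingLoad {

  /**
   * Applies `send` to every key and returns how many keys were sent.
   * Assumes a sequential stream; the plain counter is not thread-safe.
   */
  static int sendAll(final IntStream keys, final IntConsumer send) {
    final int[] count = {0}; // array so the lambda can update it
    keys.forEach(key -> {
      send.accept(key);
      count[0]++;
    });
    return count[0];
  }

  public static void main(final String[] args) {
    // Record the keys in a list instead of sending them to a server.
    final List<Integer> sent = new ArrayList<>();
    final int count = sendAll(IntStream.range(0, 5), sent::add);
    System.out.println(count + " sent: " + sent);
  }
}
```

Because `forEach` is a terminal operation that must visit every element, the side effect runs once per key regardless of whether the stream's size is known.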

Please make sure you are on at least version '2.6.17'. I only mention it because I often use dynamic versions, e.g. '2.6.+', and they won't update until their cache duration has expired.

My intention with changing the output buffer size was for further experimentation. I'm not sure what the optimal buffer size is for this application.

@dgomesbr

Np, since I'm using Maven it's easy to get that updated quickly. I'll try it (and later on create a test in Jedipus with JMH so we can test that within the lib).

@jpe42
Author

jpe42 commented May 27, 2016

@dgomesbr, I've tuned the output buffer size in the latest release. For the examples in this gist I'm seeing a 15-20% improvement over Jedis. The improvement will be even greater when accessing a remote Redis server.
