@jpe42
Last active June 2, 2016 18:35
Bulk REPLY OFF Loading w/ Jedipus
private static int runWithJedipus(final IntStream keys) {
  int count = 0;
  try (final RedisClient client =
      RedisClientFactory.startBuilding().create(Node.create("127.0.0.1", 6379))) {
    client.sendCmd(Cmds.FLUSHALL);
    final long start = System.currentTimeMillis();
    // Tell the server not to reply; SETNX results are not needed for a bulk load.
    client.replyOff();
    final byte[] defaultValue = RESP.toBytes("D");
    // Use forEach with an explicit counter instead of peek(...).count(): since Java 9,
    // count() may skip the pipeline (and the peek side effect) for a sized source
    // such as IntStream.range(...).
    final int[] sent = {0};
    keys.forEach(key -> {
      client.sendCmd(Cmds.SETNX, RESP.toBytes(key), defaultValue);
      sent[0]++;
    });
    count = sent[0];
    // Re-enable replies so the DBSIZE check below gets an answer.
    client.replyOn();
    System.out.format("Loading %d key/values took %dms.%n", count,
        System.currentTimeMillis() - start);
    final long dbsize = client.sendCmd(Cmds.DBSIZE.prim());
    if (dbsize != count) {
      throw new IllegalStateException(
          String.format("Expected %d keys, but %d exist.", count, dbsize));
    }
  }
  return count;
}

private static int runWithJedis(final IntStream keys) {
  final int[] cnt = {0};
  try (final Jedis jedis = new Jedis("127.0.0.1", 6379)) {
    jedis.flushAll();
    final long start = System.currentTimeMillis();
    final byte[] defaultValue = RESP.toBytes("D");
    final Pipeline pipeline = jedis.pipelined();
    keys.forEach(key -> {
      // Drain pending replies every 1,000 commands so they don't pile up in the heap.
      if (++cnt[0] % 1_000 == 0) {
        pipeline.sync();
      }
      pipeline.setnx(RESP.toBytes(key), defaultValue);
    });
    pipeline.sync();
    System.out.format("Loading %d key/values took %dms.%n", cnt[0],
        System.currentTimeMillis() - start);
    final long dbsize = jedis.dbSize();
    if (dbsize != cnt[0]) {
      throw new IllegalStateException(
          String.format("Expected %d keys, but %d exist.", cnt[0], dbsize));
    }
  }
  return cnt[0];
}
@dgomesbr

dgomesbr commented May 19, 2016

Hey @jamespedwards42, about the pipeline sync: is it necessary?

if (i % 10_000 == 0) {
        pipeline.sync();
        System.out.format("Synced %s writes.%n", member);
}

With Jedis that's necessary, because the pending acks are only read into the heap when we sync, so syncing periodically keeps them from piling up. Do we need to do the same with yours?
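The reasoning above can be illustrated with a toy model (the `ToyPipeline` class is hypothetical, not Jedis code): each pipelined command leaves one reply pending until `sync()` drains them, so syncing every N commands caps the pending replies at N, at the cost of one blocking round trip per sync. A minimal sketch under those assumptions:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model, not Jedis code: each pipelined command leaves one reply pending
// until sync() reads them all back.
final class ToyPipeline {
  private final Deque<Integer> pendingReplies = new ArrayDeque<>();
  private int peakPending = 0;
  private int syncs = 0;

  void setnx(int key) {
    // Stand-in for a reply the client will have to hold in the heap.
    pendingReplies.addLast(key);
    peakPending = Math.max(peakPending, pendingReplies.size());
  }

  void sync() {
    // One blocking round trip that drains every pending reply.
    pendingReplies.clear();
    syncs++;
  }

  int peakPending() { return peakPending; }

  int syncs() { return syncs; }

  // Loads keys 1..n, syncing every `batchSize` commands (0 = only at the end).
  static ToyPipeline load(int n, int batchSize) {
    ToyPipeline pipeline = new ToyPipeline();
    for (int key = 1; key <= n; key++) {
      pipeline.setnx(key);
      if (batchSize > 0 && key % batchSize == 0) {
        pipeline.sync();
      }
    }
    if (!pipeline.pendingReplies.isEmpty()) {
      pipeline.sync(); // final sync for any leftover replies
    }
    return pipeline;
  }
}
```

`ToyPipeline.load(10_000, 1_000)` peaks at 1,000 pending replies after 10 syncs, while `load(10_000, 0)` holds all 10,000 until its single final sync. For the 10,000,000-line file used later in this thread, syncing every 1,000 commands means 10,000 such round trips.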

@jpe42
Author

jpe42 commented May 19, 2016

@dgomesbr, yeah, you are absolutely right, it is not necessary. I took it out of the gist. My initial thinking was that it would be a good idea to flush the SocketOutputStream every once in a while, but there is no need, as that method does nothing.
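The point about flush() can be checked without Redis: `java.io.OutputStream.flush()` is a no-op unless a subclass overrides it, so flushing an unbuffered stream changes nothing, whereas a `BufferedOutputStream` only hands bytes to the stream beneath it when its buffer fills or `flush()` is called. A minimal sketch, using a hypothetical in-memory `RecordingStream` as a stand-in for the socket stream:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

final class FlushDemo {
  // Unbuffered stand-in for a socket stream: every byte written lands in `received`.
  // flush() is deliberately not overridden, so it is OutputStream's no-op.
  static final class RecordingStream extends OutputStream {
    final ByteArrayOutputStream received = new ByteArrayOutputStream();

    @Override
    public void write(int b) {
      received.write(b);
    }
  }

  // Writes 100 bytes, then returns {bytes visible at the sink before flush(), after flush()}.
  static int[] visibleBeforeAndAfterFlush(boolean buffered) {
    RecordingStream sink = new RecordingStream();
    OutputStream out = buffered ? new BufferedOutputStream(sink, 8192) : sink;
    try {
      out.write(new byte[100]);
      int before = sink.received.size();
      out.flush();
      int after = sink.received.size();
      return new int[] {before, after};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Unbuffered, all 100 bytes are already at the sink before `flush()`; buffered, none arrive until `flush()` is called.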

@dgomesbr

👍 Gonna try this out in a moment

@jpe42
Author

jpe42 commented May 23, 2016

Updated the gist; realized there is no need for a pipeline if you don't care about any replies.
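A sketch of what goes over the wire in that mode, assuming Jedipus's `replyOff()` sends Redis's `CLIENT REPLY OFF` (available since Redis 3.2). `RespSketch` is a hypothetical helper that encodes each command as a RESP array of bulk strings; because the server sends nothing back, the load is a single stream of writes with nothing to read or `sync()`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class RespSketch {
  // Encode a command as a RESP array of bulk strings:
  // *<argc>\r\n, then $<len>\r\n<bytes>\r\n for each argument.
  static byte[] encode(String... args) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeAscii(out, "*" + args.length + "\r\n");
    for (String arg : args) {
      byte[] bytes = arg.getBytes(StandardCharsets.UTF_8);
      writeAscii(out, "$" + bytes.length + "\r\n");
      out.write(bytes, 0, bytes.length);
      writeAscii(out, "\r\n");
    }
    return out.toByteArray();
  }

  private static void writeAscii(ByteArrayOutputStream out, String s) {
    byte[] b = s.getBytes(StandardCharsets.US_ASCII);
    out.write(b, 0, b.length);
  }

  // Bytes for a fire-and-forget bulk load: CLIENT REPLY OFF, then one SETNX per key,
  // with no reads in between.
  static byte[] bulkLoad(String value, String... keys) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] off = encode("CLIENT", "REPLY", "OFF");
    out.write(off, 0, off.length);
    for (String key : keys) {
      byte[] cmd = encode("SETNX", key, value);
      out.write(cmd, 0, cmd.length);
    }
    return out.toByteArray();
  }
}
```

For example, `encode("SETNX", "k1", "D")` yields `*3\r\n$5\r\nSETNX\r\n$2\r\nk1\r\n$1\r\nD\r\n`.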

@dgomesbr

Hey @jamespedwards42,

I must be doing something very wrong:

With Jedis I consistently get 19s:

java -Dloader.input.src=C:\\temp\\redis\\ -Dloader.redis.ip=127.0.0.1 -Dloader.redis.port=6379 -Xms512m -Xmx512m -Djava.net.preferIPv4Stack=true -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -Dloader.input.pattern=(^xaa.*) -Didea.launcher.port=7535 --spring.output.ansi.enabled=always

 _____          _ _       _                     _
|  __ \        | (_)     | |                   | |
| |__) |___  __| |_ ___  | |     ___   __ _  __| | ___ _ __
|  _  // _ \/ _` | / __| | |    / _ \ / _` |/ _` |/ _ \ '__|
| | \ \  __/ (_| | \__ \ | |___| (_) | (_| | (_| |  __/ |
|_|  \_\___|\__,_|_|___/ |______\___/ \__,_|\__,_|\___|_|
:: Version  :: Powered by Spring Boot 1.3.5.RELEASE  ::

2016-05-23 14:55:32.263  INFO 14520 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : The following profiles are active: dev
2016-05-23 14:55:33.723  INFO 14520 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : Redis Loader started
2016-05-23 14:55:38.481  INFO 14520 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : DB flushed!
2016-05-23 14:56:04.577  INFO 14520 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] contains [10000000] lines to process)
2016-05-23 14:56:04.579  INFO 14520 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 30s)
2016-05-23 15:19:06.955  INFO 9008 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 19s)
2016-05-23 15:19:26.124  INFO 9008 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 19s)
2016-05-23 15:19:45.482  INFO 9008 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 19s)
2016-05-23 15:20:04.699  INFO 9008 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 19s)
2016-05-23 15:20:24.407  INFO 9008 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 19s)
2016-05-23 15:20:43.857  INFO 9008 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 19s)
2016-05-23 15:21:03.548  INFO 9008 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 19s)
2016-05-23 15:21:24.200  INFO 9008 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 20s)
2016-05-23 15:21:43.509  INFO 9008 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 19s)
2016-05-23 14:56:04.589  INFO 14520 --- [       Thread-7] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown

Process finished with exit code 1

My first try with Jedipus gave me a whopping 116s:

2016-05-23 15:25:14.421  INFO 9044 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[0] (took 116s)
2016-05-23 15:27:12.025  INFO 9044 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[0] (took 117s)

The code for that is:

    private int runWithJedipus(BufferedReader br){
        try (final RedisClient client =
                     RedisClientFactory.startBuilding().create(Node.create(redisServerHost, redisServerPort))) {

            if (isFlushEnabled && !isFlushed) {
                client.sendCmd(Cmds.FLUSHALL);
                isFlushed = true;
            }

            AtomicInteger counter = new AtomicInteger(0);
            client.replyOff();

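            br.lines().forEach(key -> {
                client.sendCmd(Cmds.SETNX, RESP.toBytes(key), RESP.toBytes(DEFAULT_VALUE));
                // Count each key sent; without this the method returns 0 (likely the commits[0] in the log above).
                counter.incrementAndGet();
            });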

            return counter.get();
        }
    }

@dgomesbr

dgomesbr commented May 23, 2016

Switching the client to a pipeline with periodic sync (as with Jedis) gave me times that resemble the original; without the sync the results are the same:

 try (final RedisClient client =
                     RedisClientFactory.startBuilding().create(Node.create(redisServerHost, redisServerPort))) {

            AtomicInteger counter = new AtomicInteger(0);
            try (final RedisPipeline pipeline = client.pipeline()) {
                if (isFlushEnabled && !isFlushed) {
                    pipeline.sendCmd(Cmds.FLUSHALL);
                    isFlushed = true;
                }

                client.replyOff();
                pipeline.replyOff();

                br.lines().forEach(key -> {
                    pipeline.sendCmd(Cmds.SETNX, RESP.toBytes(key), RESP.toBytes(DEFAULT_VALUE));
                    if (counter.incrementAndGet() % 350_000 == 0) {
                        //pipeline.sync();
                    }
                });
            }

            return counter.get();
        }
2016-05-23 16:01:08.174  INFO 14608 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 23s)
2016-05-23 16:01:32.167  INFO 14608 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 23s)

@jpe42
Author

jpe42 commented May 25, 2016

@dgomesbr, sorry, I only noticed your update tonight; I need to figure out how to enable notifications for gists. And terribly sorry for leading you astray with REPLY OFF; hopefully I can win you back :)

I tested it and updated the gist with 3 methods that compare Jedipus directly to Jedis and try to mimic your setup as closely as possible.

The issue with REPLY OFF in non-pipelined mode was that, by default, the client flushed its output buffer after every call :/ I've changed the default to flush only in REPLY ON mode, and also exposed a flush() method in case you want to flush while in REPLY OFF mode. The latest release has these changes.
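To see why a flush after every call hurts, here is a sketch that uses an in-memory counting stream as a stand-in for the socket (`FlushCost` and `CountingSink` are hypothetical, the 32-byte command size is an arbitrary assumption, and this is not Jedipus's implementation). It counts how many writes reach the sink for the same commands with and without a flush per call:

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

final class FlushCost {
  // Stand-in for a socket: counts how many write calls reach it.
  static final class CountingSink extends OutputStream {
    int writeCalls = 0;

    @Override
    public void write(int b) {
      writeCalls++;
    }

    @Override
    public void write(byte[] b, int off, int len) {
      writeCalls++;
    }
  }

  // Sends `commands` commands of `commandSize` bytes through a buffer of
  // `bufferSize` bytes, optionally flushing after each one; returns the sink's write count.
  static int writesReachingSink(int commands, int commandSize, int bufferSize,
      boolean flushEachCall) {
    CountingSink sink = new CountingSink();
    byte[] cmd = new byte[commandSize];
    try (OutputStream out = new BufferedOutputStream(sink, bufferSize)) {
      for (int i = 0; i < commands; i++) {
        out.write(cmd, 0, cmd.length);
        if (flushEachCall) {
          out.flush(); // pushes this single command to the sink immediately
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } // close() flushes whatever is still buffered
    return sink.writeCalls;
  }
}
```

With an 8,192-byte buffer, 1,000 commands produce 1,000 writes when flushing per call versus 4 when the buffer is left to fill; on a real socket each of those writes is roughly a separate send.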

With this change, the JedipusBulkReplyOffLoading.java test runs 10-12% faster than JedisBulkPipelinedLoading.java for me, primarily because you don't have to sync() every 1,000 calls.

Also, the DB flush could distort the timings of your tests: when I run FLUSHALL after inserting more than 4 million unique keys, I get connection timeouts with both Jedipus and Jedis.

Performance could probably be improved further by tuning the output buffer size of the RedisClient, which you can set through the RedisClientFactory like so:

// default is 8192
final RedisClient client = RedisClientFactory.startBuilding().withOutputBufferSize(16384)
        .create(Node.create("localhost", 6379));
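As a back-of-envelope sketch of what the buffer size changes (`BufferEstimate` is hypothetical; it assumes fixed-size commands, replies off, and no explicit flushes), the number of writes is the command count divided by how many whole commands fit in one buffer:

```java
final class BufferEstimate {
  // Estimated writes needed to send `commands` commands of `commandSize` bytes
  // through a buffer of `bufferSize` bytes.
  static long estimatedWrites(long commands, int commandSize, int bufferSize) {
    if (commands < 0 || commandSize <= 0 || bufferSize <= 0) {
      throw new IllegalArgumentException("counts and sizes must be positive");
    }
    if (commandSize >= bufferSize) {
      // A command at least as large as the buffer is written straight through.
      return commands;
    }
    // The buffer is flushed when the next command no longer fits,
    // so each write carries this many whole commands.
    long perWrite = bufferSize / commandSize;
    return (commands + perWrite - 1) / perWrite; // ceiling division
  }
}
```

For 10,000,000 commands of 32 bytes, that is 39,063 writes with the 8,192-byte default versus 19,532 with 16,384. The model only counts writes; it says nothing about whether fewer, larger writes are faster in practice.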

@dgomesbr

So the problems are related to the larger outputBufferSize:

 private int runWithJedipus(BufferedReader br){
        try (final RedisClient client =
                     RedisClientFactory.startBuilding().withOutputBufferSize(16384).create(Node.create(redisServerHost, redisServerPort))) {

            AtomicInteger counter = new AtomicInteger(0);
            try (final RedisPipeline pipeline = client.pipeline()) {
                if (isFlushEnabled && !isFlushed) {
                    pipeline.sendCmd(Cmds.FLUSHALL);
                    isFlushed = true;
                }

                //client.replyOff();
                //pipeline.replyOff();

                br.lines().forEach(key -> {
                    pipeline.sendCmd(Cmds.SETNX, RESP.toBytes(key), RESP.toBytes(DEFAULT_VALUE));
                    counter.incrementAndGet();
                    //if (counter.incrementAndGet() % 350_000 == 0) {
                        //pipeline.sync();
                    //}
                });
            }

            return counter.get();
        }
    }

This gives me steady results of 28s (slower than the latest runs with replyOff, with or without sync):

2016-05-25 01:22:31.157  INFO 7068 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:23:00.038  INFO 7068 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:23:28.660  INFO 7068 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:23:57.169  INFO 7068 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:24:26.186  INFO 7068 --- [  restartedMain] com.bemobi.redis.RedisLoaderApplication  : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 29s)

@jpe42
Author

jpe42 commented May 25, 2016

Give this a go:

private int runWithJedipus(final BufferedReader br) {

  try (final RedisClient client =
      RedisClientFactory.startBuilding().create(Node.create(redisServerHost, redisServerPort))) {

    if (isFlushEnabled && !isFlushed) {
      client.sendCmd(Cmds.FLUSHALL);
      isFlushed = true;
    }

    final byte[] defaultValue = RESP.toBytes(DEFAULT_VALUE);    

    client.replyOff();

    return (int) br.lines()
        .map(RESP::toBytes)
        .peek(key -> client.sendCmd(Cmds.SETNX, key, defaultValue)).count();
  }
}

Please make sure you are on at least version '2.6.17'. I only mention it because I often use dynamic versions, e.g. '2.6.+', and they won't update until their cache duration has expired.

I suggested changing the output buffer size only as a further experiment; I'm not sure what the optimal buffer size is for this application.

@dgomesbr

Np, since I'm using Maven it's easy to get that updated quickly. I'll try it (and later on create a JMH test in Jedipus so we can benchmark this within the lib).

@jpe42
Author

jpe42 commented May 27, 2016

@dgomesbr, I've tuned the output buffer size in the latest release. For the examples in this gist I'm seeing a 15-20% improvement over Jedis. The improvement will be even greater when accessing a remote Redis server.
