private static int runWithJedipus(final IntStream keys) {
  int count = 0;
  try (final RedisClient client =
      RedisClientFactory.startBuilding().create(Node.create("127.0.0.1", 6379))) {
    client.sendCmd(Cmds.FLUSHALL);
    final long start = System.currentTimeMillis();
    client.replyOff();
    final byte[] defaultValue = RESP.toBytes("D");
    count = (int) keys
        .peek(key -> client.sendCmd(Cmds.SETNX, RESP.toBytes(key), defaultValue))
        .count();
    client.replyOn();
    System.out.format("Loading %d key/values took %dms.%n", count,
        System.currentTimeMillis() - start);
    final long dbsize = client.sendCmd(Cmds.DBSIZE.prim());
    if (dbsize != count) {
      throw new IllegalStateException(
          String.format("Expected %d keys, but %d exist.", count, dbsize));
    }
  }
  return count;
}
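One caveat about the `peek(...).count()` idiom above: starting with Java 9, `Stream.count()` may compute the count directly from a sized source and skip the pipeline entirely, so the `peek` side effect (the `sendCmd` call) might never run. A defensive variant counts explicitly inside `forEach`. The sketch below is my own self-contained illustration of that pattern; the `send` callback stands in for `client.sendCmd(Cmds.SETNX, ...)`:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

public class CountWithSideEffects {

  // Performs a side effect per element and counts elements explicitly.
  // forEach is guaranteed to visit every element, unlike peek(...).count(),
  // where count() on Java 9+ may skip the peek stage when the stream size
  // is known in advance.
  static long sendAll(final IntStream keys, final IntConsumer send) {
    final AtomicLong count = new AtomicLong();
    keys.forEach(key -> {
      send.accept(key); // e.g. client.sendCmd(Cmds.SETNX, key, defaultValue)
      count.incrementAndGet();
    });
    return count.get();
  }

  public static void main(final String[] args) {
    final long n = sendAll(IntStream.range(0, 1000), key -> {});
    System.out.println(n); // 1000
  }
}
```

On Java 8, where this gist was written, both forms behave the same; the explicit counter is simply the version that stays correct on later JDKs.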
private static int runWithJedis(final IntStream keys) {
  final int[] cnt = {0};
  try (final Jedis jedis = new Jedis("127.0.0.1", 6379)) {
    jedis.flushAll();
    final long start = System.currentTimeMillis();
    final byte[] defaultValue = RESP.toBytes("D");
    final Pipeline pipeline = jedis.pipelined();
    keys.forEach(key -> {
      if (++cnt[0] % 1_000 == 0) {
        pipeline.sync();
      }
      pipeline.setnx(RESP.toBytes(key), defaultValue);
    });
    pipeline.sync();
    System.out.format("Loading %d key/values took %dms.%n", cnt[0],
        System.currentTimeMillis() - start);
    final long dbsize = jedis.dbSize();
    if (dbsize != cnt[0]) {
      throw new IllegalStateException(
          String.format("Expected %d keys, but %d exist.", cnt[0], dbsize));
    }
  }
  return cnt[0];
}
So the problems are related to a larger outputBufferSize.
private int runWithJedipus(final BufferedReader br) {
try (final RedisClient client =
RedisClientFactory.startBuilding().withOutputBufferSize(16384).create(Node.create(redisServerHost, redisServerPort))) {
AtomicInteger counter = new AtomicInteger(0);
try (final RedisPipeline pipeline = client.pipeline()) {
if (isFlushEnabled && !isFlushed) {
pipeline.sendCmd(Cmds.FLUSHALL);
isFlushed = true;
}
//client.replyOff();
//pipeline.replyOff();
br.lines().forEach(key -> {
pipeline.sendCmd(Cmds.SETNX, RESP.toBytes(key), RESP.toBytes(DEFAULT_VALUE));
counter.incrementAndGet();
//if (counter.incrementAndGet() % 350_000 == 0) {
//pipeline.sync();
//}
});
}
return counter.get();
}
}
This gives me steady results of 28s (slower than the latest version with replyOff, both with and without sync):
2016-05-25 01:22:31.157 INFO 7068 --- [ restartedMain] com.bemobi.redis.RedisLoaderApplication : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:23:00.038 INFO 7068 --- [ restartedMain] com.bemobi.redis.RedisLoaderApplication : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:23:28.660 INFO 7068 --- [ restartedMain] com.bemobi.redis.RedisLoaderApplication : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:23:57.169 INFO 7068 --- [ restartedMain] com.bemobi.redis.RedisLoaderApplication : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 28s)
2016-05-25 01:24:26.186 INFO 7068 --- [ restartedMain] com.bemobi.redis.RedisLoaderApplication : File [C:\temp\redis\xaa] successfully processed: lines[10000000] commits[10000000] (took 29s)
Give this a go:
private int runWithJedipus(final BufferedReader br) {
try (final RedisClient client =
RedisClientFactory.startBuilding().create(Node.create(redisServerHost, redisServerPort))) {
if (isFlushEnabled && !isFlushed) {
client.sendCmd(Cmds.FLUSHALL);
isFlushed = true;
}
final byte[] defaultValue = RESP.toBytes(DEFAULT_VALUE);
client.replyOff();
return (int) br.lines()
.map(RESP::toBytes)
.peek(key -> client.sendCmd(Cmds.SETNX, key, defaultValue)).count();
}
}
Please make sure you are on at least version '2.6.17'. I only mention it because I often use dynamic versions, e.g. '2.6.+', and they won't update if their cache duration has not expired.
My intention with changing the output buffer size was for further experimentation. I'm not sure what the optimal buffer size is for this application.
Np, since I'm using Maven it's OK to get that updated quickly. I'll try it (and later on create a test on Jedipus with JMH so we can test that within the lib).
@dgomesbr, I've tuned the output buffer size in the latest release. For the examples in this gist I'm seeing a 15-20% improvement over Jedis. The improvement will be even greater when accessing a remote Redis server.
@dgomesbr, Sorry, I just noticed your update tonight; I need to figure out how to enable notifications for gists. And terribly sorry for leading you astray with the reply off, hopefully I can win you back :)
I tested it out and updated the gist with 3 methods that compare Jedipus to Jedis directly and try to mimic your setup as much as possible.
The issue with REPLY OFF in non-pipelined mode was that the default behavior was flushing the buffer after each call :/. I've changed that default behavior to only flush in REPLY ON mode, and in addition exposed a flush() method in case the user wants to flush while in REPLY OFF mode. The latest release has these changes.
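The buffering semantics described here can be sketched with a toy model. To be clear, this is my own self-contained illustration of the behavior, not Jedipus source; the class and its internals are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the output-buffer flushing behavior: with replies ON every
// command is flushed to the server immediately; with replies OFF commands
// accumulate until flush() is called explicitly or replies are turned back on.
public class ReplyModeBuffer {

  private final List<String> buffer = new ArrayList<>(); // pending commands
  private final List<String> wire = new ArrayList<>();   // "sent to the server"
  private boolean replyOn = true;

  void sendCmd(final String cmd) {
    buffer.add(cmd);
    if (replyOn) {
      flush(); // REPLY ON (and the old REPLY OFF default): flush per call
    }
  }

  void replyOff() {
    replyOn = false; // commands now only accumulate in the buffer
  }

  void replyOn() {
    replyOn = true;
    flush(); // re-enabling replies flushes whatever was buffered
  }

  void flush() {
    wire.addAll(buffer); // optional manual flush while replies are off
    buffer.clear();
  }

  int buffered() {
    return buffer.size();
  }

  int sent() {
    return wire.size();
  }
}
```

The key change is that `sendCmd` no longer forces a flush while replies are off, which is what lets the bulk SETNX loop fill a large output buffer before writing to the socket.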
With this change, the JedipusBulkReplyOffLoading.java test runs 10-12% faster than JedisBulkPipelinedLoading.java for me. The increase in performance is primarily because you don't have to sync() every 1,000 calls.
Also, the db flushing could affect the time recording of your tests... When I flush after inserting more than 4 million unique keys I get connection timeouts for both Jedipus and Jedis.
The performance could probably be further optimized by tweaking the output buffer size for the RedisClient, which you can do with the RedisClientFactory like so:
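Picking up the builder call from the earlier snippet in this thread, the shape is roughly the following. This is a configuration sketch, not a recommendation: 16384 is just the size experimented with above, and `redisServerHost`/`redisServerPort` are assumed to be defined elsewhere.

```java
try (final RedisClient client = RedisClientFactory.startBuilding()
    .withOutputBufferSize(16384) // experiment with different sizes here
    .create(Node.create(redisServerHost, redisServerPort))) {
  // ... issue commands, e.g. client.sendCmd(Cmds.SETNX, key, value) ...
}
```

The optimal size likely depends on the payload size per command and the network path to the Redis server, so it is worth benchmarking a few values rather than fixing one.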