Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save pveentjer/8316994aa8e89a75262ab357aae1cbcf to your computer and use it in GitHub Desktop.
Save pveentjer/8316994aa8e89a75262ab357aae1cbcf to your computer and use it in GitHub Desktop.
/*
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hazelcast.file;
import com.hazelcast.internal.tpcengine.Promise;
import com.hazelcast.internal.tpcengine.Reactor;
import com.hazelcast.internal.tpcengine.ReactorBuilder;
import com.hazelcast.internal.tpcengine.ReactorType;
import com.hazelcast.internal.tpcengine.file.AsyncFile;
import com.hazelcast.internal.tpcengine.file.AsyncFileMetrics;
import com.hazelcast.internal.tpcengine.file.StorageDeviceRegistry;
import com.hazelcast.internal.tpcengine.util.BufferUtil;
import com.hazelcast.internal.util.ThreadAffinity;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import static com.hazelcast.internal.tpcengine.ReactorBuilder.newReactorBuilder;
import static com.hazelcast.internal.tpcengine.file.AsyncFile.O_CREAT;
import static com.hazelcast.internal.tpcengine.file.AsyncFile.O_DIRECT;
import static com.hazelcast.internal.tpcengine.file.AsyncFile.O_NOATIME;
import static com.hazelcast.internal.tpcengine.file.AsyncFile.O_RDWR;
import static com.hazelcast.internal.tpcengine.file.AsyncFile.PERMISSIONS_ALL;
import static com.hazelcast.internal.tpcengine.util.BufferUtil.allocateDirect;
import static com.hazelcast.internal.tpcengine.util.OS.pageSize;
/**
* A storage benchmark whose results can be compared with FIO. An equivalent FIO run looks like:
* fio --name=write_throughput --numjobs=1 --size=4M --time_based --runtime=60s --ramp_time=2s --ioengine=io_uring --direct=1 --verify=0 --bs=4k --iodepth=64 --rw=write --group_reporting=1 --cpus_allowed=1
* <p>
* To see IOPS and bandwidth while the benchmark is running:
* iostat -dx 1 /dev/nvme1n1p1 (replace the device with the drive being benchmarked)
*/
public class StorageBenchmark {
// Workload types; one of these is assigned to the 'workload' property.
public static final int WORKLOAD_NOP = 1;
public static final int WORKLOAD_WRITE = 2;
public static final int WORKLOAD_READ = 3;
public static final int WORKLOAD_RANDOM_WRITE = 4;
public static final int WORKLOAD_RANDOM_READ = 5;

// Properties

// The total number of operations; it is divided over the reactors and their I/O tasks.
public long operationCount;
// The number of jobs; one reactor is created per job.
public int numJobs;
// Passed to ThreadAffinity when building the reactors; null means no affinity is configured.
public String affinity;
// Passed to ReactorBuilder.setSpin.
public boolean spin;
// The number of concurrent I/O tasks per reactor.
public int iodepth;
// The size of each benchmark file in bytes.
public long fileSize;
// The block size in bytes of a single read/write.
public int bs;
// The directory in which the benchmark files are created.
public String drive;
// One of the WORKLOAD_* constants.
public int workload;
// If true, a monitor thread periodically prints the metrics of the files.
public boolean enableProgressMonitor;
// If true, the benchmark files are deleted during teardown.
public boolean deleteFilesOnExit;
// If true, the benchmark opens its files with O_DIRECT.
public boolean direct;
// The type of reactor to create.
public ReactorType reactorType;
// If larger than 0, write workloads issue an fsync and then this number of writes before the
// next fsync. Must not be combined with fdatasync.
public int fsync;
// Same as fsync, but an fdatasync is issued instead.
public int fdatasync;

// Paths of the files created during setup.
private final List<String> fileList = new ArrayList<>();
// Files opened by the benchmark; read by the progress monitor.
private final List<AsyncFile> asyncFiles = Collections.synchronizedList(new ArrayList<>());
private final StorageDeviceRegistry storageDeviceRegistry = new StorageDeviceRegistry();
// One reactor per job, in job order.
private final ArrayList<Reactor> reactors = new ArrayList<>();
/**
 * Runs the benchmark with a hard-coded configuration.
 *
 * @param args ignored.
 */
public static void main(String[] args) {
    StorageBenchmark benchmark = new StorageBenchmark();
    // Upper case 'L' suffix: a lower case 'l' is easily mistaken for the digit '1'.
    benchmark.operationCount = 10 * 1000 * 1000L;
    benchmark.affinity = "1,3";
    benchmark.numJobs = 1;
    benchmark.iodepth = 128;
    // Computed in long arithmetic so that larger file sizes can't silently overflow an int.
    benchmark.fileSize = 128L * 1024 * 4096;
    benchmark.bs = 4 * 1024;
    benchmark.drive = "/mnt/benchdrive1/";
    benchmark.workload = WORKLOAD_WRITE;
    benchmark.enableProgressMonitor = true;
    benchmark.deleteFilesOnExit = true;
    benchmark.direct = true;
    benchmark.spin = false;
    benchmark.reactorType = ReactorType.IOURING;
    benchmark.fsync = 0;
    benchmark.fdatasync = 1;
    benchmark.run();
}
/**
 * Runs the benchmark: sets up the reactors and files, schedules {@code iodepth} I/O tasks on
 * every reactor, waits for all of them to complete, prints the results and tears everything down.
 * <p>
 * This method never returns normally: it ends with {@link System#exit(int)}, using status 0 on
 * success and status 1 when the benchmark failed.
 *
 * @throws IllegalArgumentException if both fsync and fdatasync are larger than 0.
 */
public void run() {
    if (fsync > 0 && fdatasync > 0) {
        // An I/O task can only apply one of the two, so reject the configuration up front
        // instead of silently ignoring fdatasync.
        throw new IllegalArgumentException("fsync and fdatasync can't both be larger than 0");
    }

    boolean success = false;
    try {
        setup();

        // Every I/O task counts down once when it has completed its share of the operations.
        CountDownLatch completionLatch = new CountDownLatch(iodepth * numJobs);
        // Holds back the reactors until the benchmark is officially started.
        CountDownLatch startLatch = new CountDownLatch(1);

        for (int jobIndex = 0; jobIndex < numJobs; jobIndex++) {
            Reactor reactor = reactors.get(jobIndex);
            int finalReactorIndex = jobIndex;
            reactor.offer(() -> {
                try {
                    startLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }

                String path = fileList.get(finalReactorIndex % fileList.size());
                AsyncFile file = reactor.eventloop().newAsyncFile(path);
                asyncFiles.add(file);

                int openFlags = O_RDWR | O_NOATIME;
                if (direct) {
                    openFlags |= O_DIRECT;
                }

                file.open(openFlags, PERMISSIONS_ALL).then((integer, throwable) -> {
                    if (throwable != null) {
                        throwable.printStackTrace();
                        System.exit(1);
                    }

                    for (int k = 0; k < iodepth; k++) {
                        IOTask ioTask = new IOTask(k);
                        ioTask.file = file;
                        ioTask.completionLatch = completionLatch;
                        long operationCountPerReactor = operationCount / numJobs;
                        ioTask.operationCount = operationCountPerReactor / iodepth;
                        reactor.offer(ioTask);
                    }
                });
            });
        }

        MonitorThread progressThread = null;
        if (enableProgressMonitor) {
            progressThread = new MonitorThread();
            progressThread.start();
        }

        System.out.println("Benchmark: started");
        startLatch.countDown();
        long startMs = System.currentTimeMillis();

        completionLatch.await();

        if (progressThread != null) {
            progressThread.shutdown();
        }

        long durationMs = System.currentTimeMillis() - startMs;
        System.out.println("Benchmark: completed");
        printResults(durationMs);
        success = true;
    } catch (Throwable t) {
        // The failure has to be reported here: the System.exit in the finally block halts the
        // JVM before a propagating exception could reach the default uncaught exception handler.
        t.printStackTrace();
    } finally {
        teardown();
        System.exit(success ? 0 : 1);
    }
}
/**
 * Builds and starts one reactor per job and then creates the benchmark files.
 *
 * @throws Exception if creating the reactors or the files fails.
 */
private void setup() throws Exception {
    final long setupStartMs = System.currentTimeMillis();
    System.out.println("Setup: starting");

    ThreadAffinity threadAffinity = null;
    if (affinity != null) {
        threadAffinity = new ThreadAffinity(affinity);
    }

    // Disabled option: storageDeviceRegistry.register(dir, 512, 512);
    int reactorsCreated = 0;
    while (reactorsCreated < numJobs) {
        ReactorBuilder reactorBuilder = newReactorBuilder(reactorType);
        // Disabled option: builder.setFlags(IORING_SETUP_IOPOLL);
        reactorBuilder.setThreadAffinity(threadAffinity);
        reactorBuilder.setSpin(spin);
        reactorBuilder.setStorageDeviceRegistry(storageDeviceRegistry);

        Reactor newReactor = reactorBuilder.build();
        reactors.add(newReactor);
        newReactor.start();
        reactorsCreated++;
    }

    setupFiles();

    final long elapsedMs = System.currentTimeMillis() - setupStartMs;
    System.out.println("Setup: done [duration=" + elapsedMs + " ms]");
}
/**
 * Creates one file per reactor and fills it with data by running {@code iodepth} concurrent
 * {@link InitFileTask}s on that reactor. Blocks until every file has been written.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting.
 */
private void setupFiles() throws InterruptedException {
    // The number of init tasks must be the configured iodepth: InitFileTask divides the file
    // into 'iodepth' ranges and the task index selects the range. A different, hard-coded task
    // count would either leave part of the file unwritten or write past the intended file size.
    final int initTaskCount = iodepth;
    CountDownLatch completionLatch = new CountDownLatch(initTaskCount * numJobs);

    for (int jobIndex = 0; jobIndex < numJobs; jobIndex++) {
        Reactor reactor = reactors.get(jobIndex);
        reactor.offer(() -> {
            String path = randomTmpFile(drive);
            System.out.println("Creating file " + path);
            // This lambda is offered to every reactor, so the adds can happen concurrently;
            // guard the list explicitly.
            synchronized (fileList) {
                fileList.add(path);
            }

            AsyncFile file = reactor.eventloop().newAsyncFile(path);
            int openFlags = O_RDWR | O_NOATIME | O_DIRECT | O_CREAT;
            long startMs = System.currentTimeMillis();

            file.open(openFlags, PERMISSIONS_ALL).then((integer, throwable) -> {
                if (throwable != null) {
                    throwable.printStackTrace();
                    System.exit(1);
                }

                // Shared by the init tasks of this file; the task that finishes last closes it.
                AtomicInteger completed = new AtomicInteger(initTaskCount);
                for (int k = 0; k < initTaskCount; k++) {
                    InitFileTask initFileTask = new InitFileTask(k);
                    initFileTask.startMs = startMs;
                    initFileTask.completionLatch = completionLatch;
                    initFileTask.completed = completed;
                    initFileTask.file = file;
                    reactor.offer(initFileTask);
                }
            });
        });
    }

    completionLatch.await();
}
/**
 * Shuts down the reactors, waits up to 5 seconds for each of them to terminate and, when
 * {@code deleteFilesOnExit} is set, deletes the benchmark files. Failures are printed and
 * do not propagate.
 */
private void teardown() {
    final long teardownStartMs = System.currentTimeMillis();
    System.out.println("Teardown: starting");

    try {
        // Request the shutdown of every reactor before waiting on any of them.
        for (int i = 0; i < reactors.size(); i++) {
            reactors.get(i).shutdown();
        }

        for (int i = 0; i < reactors.size(); i++) {
            Reactor current = reactors.get(i);
            boolean terminated = current.awaitTermination(5, TimeUnit.SECONDS);
            if (!terminated) {
                throw new RuntimeException("Reactor " + current + " failed to terminate");
            }
        }
    } catch (Exception failure) {
        failure.printStackTrace();
    }

    if (deleteFilesOnExit) {
        for (String filePath : fileList) {
            System.out.println("Deleting " + filePath);
            boolean deleted = new File(filePath).delete();
            if (deleted) {
                continue;
            }
            System.out.println("Failed to delete " + filePath);
        }
    }

    final long elapsedMs = System.currentTimeMillis() - teardownStartMs;
    System.out.println("Teardown: done [duration=" + elapsedMs + " ms]");
}
/**
 * Prints the benchmark configuration together with the measured throughput.
 * <p>
 * All rates are computed in double precision: the operands (operations * 1000, bytes * 1000)
 * are far larger than 2^24, so single precision float arithmetic would round them visibly.
 *
 * @param durationMs the duration of the benchmark in milliseconds.
 */
private void printResults(long durationMs) {
    DecimalFormat longFormat = new DecimalFormat("#,###");
    DecimalFormat doubleFormat = new DecimalFormat("#,###.00");

    System.out.println("Duration: " + longFormat.format(durationMs) + " ms");
    System.out.println("Reactors: " + numJobs);
    System.out.println("I/O depth: " + iodepth);
    System.out.println("Direct I/O: " + direct);
    System.out.println("Page size: " + pageSize() + " B");
    System.out.println("Operations: " + longFormat.format(operationCount));
    System.out.println("fsync: " + fsync);
    System.out.println("fdatasync: " + fdatasync);

    double operationsPerSecond = operationCount * 1000d / durationMs;
    System.out.println("Speed: " + doubleFormat.format(operationsPerSecond) + " IOPS");
    System.out.println("Workload: " + workloadName());
    System.out.println("File size: " + longFormat.format(fileSize) + " B");
    System.out.println("Block size: " + longFormat.format(bs) + " B");

    // The nop workload doesn't transfer any data, so there is no volume or bandwidth to report.
    if (workload != WORKLOAD_NOP) {
        long dataSize = (long) bs * operationCount;
        double gibTransferred = dataSize / (1024d * 1024 * 1024);
        double mibPerSecond = dataSize * 1000d / (durationMs * 1024d * 1024);
        System.out.println("Read/Written: " + doubleFormat.format(gibTransferred) + " GiB");
        System.out.println("Bandwidth: " + doubleFormat.format(mibPerSecond) + " MiB/s");
    }
}

// Returns the human readable name of the configured workload.
private String workloadName() {
    switch (workload) {
        case WORKLOAD_NOP:
            return "nop";
        case WORKLOAD_WRITE:
            return "sequential write";
        case WORKLOAD_READ:
            return "sequential read";
        case WORKLOAD_RANDOM_WRITE:
            return "random write";
        case WORKLOAD_RANDOM_READ:
            return "random read";
        default:
            return "unknown";
    }
}
/**
 * Prints, roughly once per second and per file, the rates of the file's metrics since the
 * previous print. A metric is only printed once its counter is larger than 0.
 */
private class MonitorThread extends Thread {
    // volatile: written by the thread calling shutdown() and read by the monitor thread.
    private volatile boolean shutdown;

    // Asks the monitor to stop and wakes it up in case it is sleeping.
    private void shutdown() {
        shutdown = true;
        interrupt();
    }

    @Override
    public void run() {
        long lastTimeMs = System.currentTimeMillis();
        StringBuilder sb = new StringBuilder();
        DecimalFormat doubleFormat = new DecimalFormat("#,###.00");
        DecimalFormat longFormat = new DecimalFormat("#,###");

        // Grows along with asyncFiles. The files are only registered after the benchmark has
        // been started, which is after this thread is started; sizing the bookkeeping once up
        // front would therefore typically track no files at all.
        List<LastAsyncFileMetrics> lastMetricsList = new ArrayList<>();

        while (!shutdown) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                break;
            }

            long nowMs = System.currentTimeMillis();
            long elapsedMs = nowMs - lastTimeMs;

            // Files are only ever appended, so every index below fileCount stays valid.
            int fileCount = asyncFiles.size();
            while (lastMetricsList.size() < fileCount) {
                lastMetricsList.add(new LastAsyncFileMetrics());
            }

            for (int k = 0; k < fileCount; k++) {
                AsyncFileMetrics metrics = asyncFiles.get(k).getMetrics();
                LastAsyncFileMetrics last = lastMetricsList.get(k);

                last.fsyncs = appendRate(sb, " fsyncs/s: ", longFormat,
                        metrics.fsyncs(), last.fsyncs, elapsedMs);
                last.fdatasyncs = appendRate(sb, " fdatasyncs/s: ", longFormat,
                        metrics.fdatasyncs(), last.fdatasyncs, elapsedMs);
                last.writes = appendRate(sb, " writes/s: ", longFormat,
                        metrics.writes(), last.writes, elapsedMs);
                last.bytesWritten = appendRate(sb, " write-bytes/s: ", doubleFormat,
                        metrics.bytesWritten(), last.bytesWritten, elapsedMs);
                last.reads = appendRate(sb, " read/s: ", longFormat,
                        metrics.reads(), last.reads, elapsedMs);
                last.bytesRead = appendRate(sb, " read-bytes/s: ", doubleFormat,
                        metrics.bytesRead(), last.bytesRead, elapsedMs);

                System.out.println(sb);
                sb.setLength(0);
            }

            lastTimeMs = nowMs;
        }
    }

    // Appends the per-second rate of a counter (when the counter is larger than 0) and returns
    // the value to remember as 'previous' for the next round.
    private long appendRate(StringBuilder sb, String label, DecimalFormat format,
                            long current, long previous, long elapsedMs) {
        if (current <= 0) {
            return previous;
        }
        double perSecond = (current - previous) * 1000d / elapsedMs;
        sb.append(label).append(format.format(perSecond));
        return current;
    }
}
/**
 * The counter values of a single file as they were at the previous print of the progress
 * monitor; used to turn the absolute counters into rates.
 */
private static class LastAsyncFileMetrics {
    // Number of completed operations per type.
    private long fsyncs;
    private long fdatasyncs;
    private long writes;
    private long reads;

    // Number of bytes transferred.
    private long bytesWritten;
    private long bytesRead;
}
/**
 * A single stream of benchmark operations on a file. Every task owns the block range
 * [startBlock, endBlock) of the file and issues one operation at a time: {@link #run()} issues
 * an operation and {@link #accept(Integer, Throwable)} is registered as its completion callback
 * and issues the next one, until {@code operationCount} operations have been completed.
 * <p>
 * {@code file}, {@code operationCount} and {@code completionLatch} must be assigned before the
 * task is offered to a reactor.
 */
private class IOTask implements Runnable, BiConsumer<Integer, Throwable> {
    private final long startBlock;
    // exclusive
    private final long endBlock;
    // the number of completed operations.
    private long operation;
    private AsyncFile file;
    private long operationCount;
    private CountDownLatch completionLatch;
    // the next block of the sequential workloads.
    private long block;
    private final long bufferAddress;
    // Only the raw address is passed to the file operations. NOTE(review): presumably this
    // reference also keeps the direct buffer reachable while the address is in use - confirm.
    private final ByteBuffer buffer;
    private final Random random = new Random();
    private final int blockSize;
    private final long blockCount;
    private final int workload;
    // the number of writes that still need to be issued before the next sync.
    private int syncCounter;
    // the number of writes between 2 syncs; 0 or less disables syncing.
    private int syncInterval;
    // true for fdatasync, false for fsync.
    private boolean fdatasync;

    /**
     * @param ioTaskIndex the index of this task within its reactor, in [0, iodepth); it selects
     *                    the block range of the file this task operates on.
     */
    public IOTask(int ioTaskIndex) {
        this.workload = StorageBenchmark.this.workload;

        // fsync takes precedence over fdatasync.
        if (StorageBenchmark.this.fsync > 0) {
            this.syncInterval = StorageBenchmark.this.fsync;
            this.fdatasync = false;
        } else if (StorageBenchmark.this.fdatasync > 0) {
            this.syncInterval = StorageBenchmark.this.fdatasync;
            this.fdatasync = true;
        } else {
            this.syncInterval = 0;
        }

        // Setting up the buffer
        this.buffer = allocateDirect(bs, pageSize());
        this.bufferAddress = BufferUtil.addressOf(buffer);
        for (int c = 0; c < buffer.capacity() / 2; c++) {
            buffer.putChar('c');
        }

        this.blockSize = StorageBenchmark.this.bs;
        long fileBlockCount = StorageBenchmark.this.fileSize / StorageBenchmark.this.bs;
        this.blockCount = fileBlockCount / StorageBenchmark.this.iodepth;
        this.startBlock = (fileBlockCount * ioTaskIndex) / StorageBenchmark.this.iodepth;
        this.block = startBlock;
        this.endBlock = startBlock + blockCount;
    }

    /**
     * Issues the next operation of the workload and registers this task as its callback.
     */
    @Override
    public void run() {
        Promise<Integer> p;
        switch (workload) {
            case WORKLOAD_NOP:
                p = file.nop();
                break;
            case WORKLOAD_WRITE:
                if (isSyncDue()) {
                    p = sync();
                } else {
                    p = file.pwrite(nextSequentialOffset(), blockSize, bufferAddress);
                }
                break;
            case WORKLOAD_READ:
                p = file.pread(nextSequentialOffset(), blockSize, bufferAddress);
                break;
            case WORKLOAD_RANDOM_WRITE:
                if (isSyncDue()) {
                    p = sync();
                } else {
                    p = file.pwrite(nextRandomOffset(), blockSize, bufferAddress);
                }
                break;
            case WORKLOAD_RANDOM_READ:
                p = file.pread(nextRandomOffset(), blockSize, bufferAddress);
                break;
            default:
                throw new RuntimeException("Unknown workload");
        }

        p.then(this).releaseOnComplete();
    }

    /**
     * Completion callback of an operation: terminates the JVM on failure, otherwise counts the
     * operation and either issues the next one or signals completion of this task.
     */
    @Override
    public void accept(Integer result, Throwable throwable) {
        if (throwable != null) {
            throwable.printStackTrace();
            System.exit(1);
        }

        // Count the operation that just completed before checking the limit; checking first
        // would issue one operation more than requested.
        operation++;
        if (operation >= operationCount) {
            completionLatch.countDown();
            return;
        }

        run();
    }

    // Returns true when a sync instead of a write must be issued now. A sync counts as an
    // operation; after it, 'syncInterval' writes are issued before the next sync.
    private boolean isSyncDue() {
        if (syncInterval <= 0) {
            return false;
        }
        if (syncCounter == 0) {
            syncCounter = syncInterval;
            return true;
        }
        syncCounter--;
        return false;
    }

    // Issues the configured type of sync.
    private Promise<Integer> sync() {
        return fdatasync ? file.fdatasync() : file.fsync();
    }

    // Returns the offset of the next sequential block and advances, wrapping around to the
    // start of the range. endBlock is exclusive, hence '>='; with '>' the task would touch one
    // block past its range, which for the last task lies beyond the end of the file.
    private long nextSequentialOffset() {
        long offset = block * blockSize;
        block++;
        if (block >= endBlock) {
            block = startBlock;
        }
        return offset;
    }

    // Returns the offset of a random block within the range of this task.
    private long nextRandomOffset() {
        long nextBlock = startBlock + nextLong(random, blockCount);
        return nextBlock * blockSize;
    }
}
/**
 * Writes one contiguous block range of a benchmark file, one block at a time. The file is
 * divided into {@code iodepth} ranges and the task index selects the range; the ranges of task
 * 0 up to and including task iodepth-1 are adjacent and together cover every block of the file.
 * <p>
 * {@code file}, {@code completionLatch}, {@code completed} and {@code startMs} must be assigned
 * before the task is offered to a reactor. The task that finishes last (the one that brings
 * {@code completed} to 0) closes the file.
 */
private class InitFileTask implements Runnable, BiConsumer<Integer, Throwable> {
    private final long startBlock;
    // exclusive
    private final long endBlock;
    // The number of init tasks of the same file that still need to finish.
    public AtomicInteger completed;
    // The time the creation of the file started; only used for reporting.
    public long startMs;
    private AsyncFile file;
    private CountDownLatch completionLatch;
    // the next block to write.
    private long block;
    private final long bufferAddress;
    // Only the raw address is passed to the file operations. NOTE(review): presumably this
    // reference also keeps the direct buffer reachable while the address is in use - confirm.
    private final ByteBuffer buffer;
    private final int blockSize;
    private final long blockCount;

    /**
     * @param ioTaskIndex the index of this task, in [0, iodepth); it selects the block range.
     */
    public InitFileTask(int ioTaskIndex) {
        // Setting up the buffer
        this.buffer = allocateDirect(bs, pageSize());
        this.bufferAddress = BufferUtil.addressOf(buffer);
        for (int c = 0; c < buffer.capacity() / 2; c++) {
            buffer.putChar('c');
        }

        this.blockSize = StorageBenchmark.this.bs;
        long fileBlockCount = StorageBenchmark.this.fileSize / StorageBenchmark.this.bs;
        int taskCount = StorageBenchmark.this.iodepth;
        // The end of this range is computed the same way as the start of the next range, so
        // no blocks are skipped when the block count isn't a multiple of the task count.
        this.startBlock = (fileBlockCount * ioTaskIndex) / taskCount;
        this.endBlock = (fileBlockCount * (ioTaskIndex + 1)) / taskCount;
        this.blockCount = endBlock - startBlock;
        this.block = startBlock;
    }

    /**
     * Writes the next block of the range, or finishes the task when the range is exhausted.
     * Checking before writing also covers an empty range, for which no write may be issued.
     */
    @Override
    public void run() {
        if (block >= endBlock) {
            finish();
            return;
        }

        long offset = block * blockSize;
        block++;
        Promise<Integer> p = file.pwrite(offset, blockSize, bufferAddress);
        p.then(this).releaseOnComplete();
    }

    /**
     * Completion callback of a write: terminates the JVM on failure, otherwise continues with
     * the next block.
     */
    @Override
    public void accept(Integer result, Throwable throwable) {
        if (throwable != null) {
            throwable.printStackTrace();
            System.exit(1);
        }

        run();
    }

    // Signals completion of this task; the last task of the file closes the file first.
    private void finish() {
        if (completed.decrementAndGet() != 0) {
            completionLatch.countDown();
            return;
        }

        file.close().then((integer, throwable1) -> {
            if (throwable1 != null) {
                throwable1.printStackTrace();
                System.exit(1);
            }

            long durationMs = System.currentTimeMillis() - startMs;
            System.out.println("Creating file " + file.path() + " completed in " + durationMs + " ms");
            completionLatch.countDown();
        });
    }
}
/**
 * Returns a pseudo random value between 0 (inclusive) and bound (exclusive).
 *
 * @param random the source of randomness.
 * @param bound  the exclusive upper bound; must be positive.
 * @return the random value.
 * @throws IllegalArgumentException if bound is not positive.
 */
private static long nextLong(Random random, long bound) {
    if (bound <= 0) {
        throw new IllegalArgumentException("bound must be positive");
    }

    final long raw = random.nextLong();
    // Long.MIN_VALUE has no positive counterpart, so it can't be negated.
    if (raw == Long.MIN_VALUE) {
        return 0;
    }

    final long magnitude = raw < 0 ? -raw : raw;
    return magnitude % bound;
}
/**
 * Builds the path of a file with a random name (a UUID without dashes) in the given directory.
 *
 * @param dir the directory.
 * @return the directory, the file system separator and the random name, concatenated.
 */
private static String randomTmpFile(String dir) {
    final String fileName = UUID.randomUUID().toString().replace("-", "");
    return new StringBuilder()
            .append(dir)
            .append(FileSystems.getDefault().getSeparator())
            .append(fileName)
            .toString();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment