Created
May 11, 2023 14:03
-
-
Save pveentjer/8316994aa8e89a75262ab357aae1cbcf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.hazelcast.file; | |
import com.hazelcast.internal.tpcengine.Promise; | |
import com.hazelcast.internal.tpcengine.Reactor; | |
import com.hazelcast.internal.tpcengine.ReactorBuilder; | |
import com.hazelcast.internal.tpcengine.ReactorType; | |
import com.hazelcast.internal.tpcengine.file.AsyncFile; | |
import com.hazelcast.internal.tpcengine.file.AsyncFileMetrics; | |
import com.hazelcast.internal.tpcengine.file.StorageDeviceRegistry; | |
import com.hazelcast.internal.tpcengine.util.BufferUtil; | |
import com.hazelcast.internal.util.ThreadAffinity; | |
import java.io.File; | |
import java.nio.ByteBuffer; | |
import java.nio.file.FileSystems; | |
import java.text.DecimalFormat; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.UUID; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.BiConsumer; | |
import static com.hazelcast.internal.tpcengine.ReactorBuilder.newReactorBuilder; | |
import static com.hazelcast.internal.tpcengine.file.AsyncFile.O_CREAT; | |
import static com.hazelcast.internal.tpcengine.file.AsyncFile.O_DIRECT; | |
import static com.hazelcast.internal.tpcengine.file.AsyncFile.O_NOATIME; | |
import static com.hazelcast.internal.tpcengine.file.AsyncFile.O_RDWR; | |
import static com.hazelcast.internal.tpcengine.file.AsyncFile.PERMISSIONS_ALL; | |
import static com.hazelcast.internal.tpcengine.util.BufferUtil.allocateDirect; | |
import static com.hazelcast.internal.tpcengine.util.OS.pageSize; | |
/** | |
* For a FIO based benchmark so you can compare | |
* fio --name=write_throughput --numjobs=1 --size=4M --time_based --runtime=60s --ramp_time=2s --ioengine=io_uring --direct=1 --verify=0 --bs=4k --iodepth=64 --rw=write --group_reporting=1 --cpus_allowed=1 | |
* <p> | |
* To see IOPS and bandwidth: | |
* iostat -dx 1 /dev/nvme1n1p1 (and then use the appropriate drive) | |
*/ | |
public class StorageBenchmark { | |
public static final int WORKLOAD_NOP = 1; | |
public static final int WORKLOAD_WRITE = 2; | |
public static final int WORKLOAD_READ = 3; | |
public static final int WORKLOAD_RANDOM_WRITE = 4; | |
public static final int WORKLOAD_RANDOM_READ = 5; | |
// Properties | |
public long operationCount; | |
// the number of threads. | |
public int numJobs; | |
public String affinity; | |
public boolean spin; | |
// the number of concurrent tasks in 1 thread. | |
public int iodepth; | |
public long fileSize; | |
public int bs; | |
public String drive; | |
public int workload; | |
public boolean enableProgressMonitor; | |
public boolean deleteFilesOnExit; | |
public boolean direct; | |
public ReactorType reactorType; | |
public int fsync; | |
public int fdatasync; | |
private final List<String> fileList = new ArrayList<>(); | |
private final List<AsyncFile> asyncFiles = Collections.synchronizedList(new ArrayList<>()); | |
private final StorageDeviceRegistry storageDeviceRegistry = new StorageDeviceRegistry(); | |
private final ArrayList<Reactor> reactors = new ArrayList(); | |
public static void main(String[] args) { | |
StorageBenchmark benchmark = new StorageBenchmark(); | |
benchmark.operationCount = 10 * 1000 * 1000l; | |
benchmark.affinity = "1,3"; | |
benchmark.numJobs = 1; | |
benchmark.iodepth = 128; | |
benchmark.fileSize = 128 * 1024 * 4096; | |
benchmark.bs = 4 * 1024; | |
benchmark.drive = "/mnt/benchdrive1/"; | |
benchmark.workload = WORKLOAD_WRITE; | |
benchmark.enableProgressMonitor = true; | |
benchmark.deleteFilesOnExit = true; | |
benchmark.direct = true; | |
benchmark.spin = false; | |
benchmark.reactorType = ReactorType.IOURING; | |
benchmark.fsync = 0; | |
benchmark.fdatasync = 1; | |
benchmark.run(); | |
} | |
public void run() { | |
if (fsync > 0 && fdatasync > 0) { | |
System.out.println("fsync and fdatasync can't both be larger than 0"); | |
} | |
try { | |
setup(); | |
CountDownLatch completionLatch = new CountDownLatch(iodepth * numJobs); | |
CountDownLatch startLatch = new CountDownLatch(1); | |
for (int jobIndex = 0; jobIndex < numJobs; jobIndex++) { | |
Reactor reactor = reactors.get(jobIndex); | |
int finalReactorIndex = jobIndex; | |
reactor.offer(() -> { | |
try { | |
startLatch.await(); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
String path = fileList.get(finalReactorIndex % fileList.size()); | |
AsyncFile file = reactor.eventloop().newAsyncFile(path); | |
asyncFiles.add(file); | |
int openFlags = O_RDWR | O_NOATIME; | |
if (direct) { | |
openFlags |= O_DIRECT; | |
} | |
file.open(openFlags, PERMISSIONS_ALL).then((integer, throwable) -> { | |
if (throwable != null) { | |
throwable.printStackTrace(); | |
System.exit(1); | |
} | |
for (int k = 0; k < iodepth; k++) { | |
IOTask ioTask = new IOTask(k); | |
ioTask.file = file; | |
ioTask.completionLatch = completionLatch; | |
long operationCountPerReactor = operationCount / numJobs; | |
ioTask.operationCount = operationCountPerReactor / iodepth; | |
reactor.offer(ioTask); | |
} | |
}); | |
}); | |
} | |
MonitorThread progressThread = null; | |
if (enableProgressMonitor) { | |
progressThread = new MonitorThread(); | |
progressThread.start(); | |
} | |
System.out.println("Benchmark: started"); | |
startLatch.countDown(); | |
long startMs = System.currentTimeMillis(); | |
completionLatch.await(); | |
if (enableProgressMonitor) { | |
progressThread.shutdown(); | |
} | |
long durationMs = System.currentTimeMillis() - startMs; | |
System.out.println("Benchmark: completed"); | |
printResults(durationMs); | |
} catch (RuntimeException e) { | |
throw e; | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} finally { | |
teardown(); | |
System.exit(0); | |
} | |
} | |
private void setup() throws Exception { | |
long startMs = System.currentTimeMillis(); | |
System.out.println("Setup: starting"); | |
ThreadAffinity threadAffinity = affinity == null ? null : new ThreadAffinity(affinity); | |
// storageDeviceRegistry.register(dir, 512, 512); | |
for (int k = 0; k < numJobs; k++) { | |
ReactorBuilder builder = newReactorBuilder(reactorType); | |
//builder.setFlags(IORING_SETUP_IOPOLL); | |
builder.setThreadAffinity(threadAffinity); | |
builder.setSpin(spin); | |
builder.setStorageDeviceRegistry(storageDeviceRegistry); | |
Reactor reactor = builder.build(); | |
reactors.add(reactor); | |
reactor.start(); | |
} | |
setupFiles(); | |
long durationMs = System.currentTimeMillis() - startMs; | |
System.out.println("Setup: done [duration=" + durationMs + " ms]"); | |
} | |
private void setupFiles() throws InterruptedException { | |
int iodepth = 128; | |
CountDownLatch completionLatch = new CountDownLatch(iodepth * numJobs); | |
for (int jobIndex = 0; jobIndex < numJobs; jobIndex++) { | |
Reactor reactor = reactors.get(jobIndex); | |
reactor.offer(() -> { | |
String path = randomTmpFile(drive); | |
System.out.println("Creating file " + path); | |
fileList.add(path); | |
AsyncFile file = reactor.eventloop().newAsyncFile(path); | |
int openFlags = O_RDWR | O_NOATIME | O_DIRECT | O_CREAT; | |
long startMs = System.currentTimeMillis(); | |
file.open(openFlags, PERMISSIONS_ALL).then((integer, throwable) -> { | |
if (throwable != null) { | |
throwable.printStackTrace(); | |
System.exit(1); | |
} | |
AtomicInteger completed = new AtomicInteger(iodepth); | |
for (int k = 0; k < iodepth; k++) { | |
InitFileTask initFileTask = new InitFileTask(k); | |
initFileTask.startMs = startMs; | |
initFileTask.completionLatch = completionLatch; | |
initFileTask.completed = completed; | |
initFileTask.file = file; | |
reactor.offer(initFileTask); | |
} | |
}); | |
}); | |
} | |
completionLatch.await(); | |
} | |
private void teardown() { | |
long startMs = System.currentTimeMillis(); | |
System.out.println("Teardown: starting"); | |
try { | |
for (Reactor reactor : reactors) { | |
reactor.shutdown(); | |
} | |
for (Reactor reactor : reactors) { | |
if (!reactor.awaitTermination(5, TimeUnit.SECONDS)) { | |
throw new RuntimeException("Reactor " + reactor + " failed to terminate"); | |
} | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
if (deleteFilesOnExit) { | |
for (String path : fileList) { | |
System.out.println("Deleting " + path); | |
if (!new File(path).delete()) { | |
System.out.println("Failed to delete " + path); | |
} | |
} | |
} | |
long durationMs = System.currentTimeMillis() - startMs; | |
System.out.println("Teardown: done [duration=" + durationMs + " ms]"); | |
} | |
private void printResults(long durationMs) { | |
DecimalFormat longFormat = new DecimalFormat("#,###"); | |
DecimalFormat doubleFormat = new DecimalFormat("#,###.00"); | |
System.out.println("Duration: " + longFormat.format(durationMs) + " ms"); | |
System.out.println("Reactors: " + numJobs); | |
System.out.println("I/O depth: " + iodepth); | |
System.out.println("Direct I/O: " + direct); | |
System.out.println("Page size: " + pageSize() + " B"); | |
System.out.println("Operations: " + longFormat.format(operationCount)); | |
System.out.println("fsync: " + fsync); | |
System.out.println("fdatasync: " + fdatasync); | |
System.out.println("Speed: " + doubleFormat.format(operationCount * 1000f / durationMs) + " IOPS"); | |
switch (workload) { | |
case WORKLOAD_NOP: | |
System.out.println("Workload: nop"); | |
break; | |
case WORKLOAD_WRITE: | |
System.out.println("Workload: sequential write"); | |
break; | |
case WORKLOAD_READ: | |
System.out.println("Workload: sequential read"); | |
break; | |
case WORKLOAD_RANDOM_WRITE: | |
System.out.println("Workload: random write"); | |
break; | |
case WORKLOAD_RANDOM_READ: | |
System.out.println("Workload: random read"); | |
break; | |
default: | |
System.out.println("Workload: unknown"); | |
} | |
System.out.println("File size: " + longFormat.format(fileSize) + " B"); | |
System.out.println("Block size: " + longFormat.format(bs) + " B"); | |
long dataSize = bs * operationCount; | |
if (workload != WORKLOAD_NOP) { | |
System.out.println("Read/Written: " + doubleFormat.format((dataSize * 1.0f) / (1024 * 1024 * 1024)) + " GiB"); | |
System.out.println("Bandwidth: " + doubleFormat.format(dataSize * 1000f / (durationMs * 1024 * 1024)) + " MiB/s"); | |
} | |
} | |
private class MonitorThread extends Thread { | |
private boolean shutdown; | |
private void shutdown() { | |
shutdown = true; | |
interrupt(); | |
} | |
@Override | |
public void run() { | |
long lastTimeMs = System.currentTimeMillis(); | |
StringBuffer sb = new StringBuffer(); | |
DecimalFormat doubleFormat = new DecimalFormat("#,###.00"); | |
DecimalFormat longFormat = new DecimalFormat("#,###"); | |
LastAsyncFileMetrics[] metricsArray = new LastAsyncFileMetrics[asyncFiles.size()]; | |
for (int k = 0; k < metricsArray.length; k++) { | |
metricsArray[k] = new LastAsyncFileMetrics(); | |
} | |
while (!shutdown) { | |
try { | |
Thread.sleep(1000); | |
} catch (InterruptedException e) { | |
break; | |
} | |
long nowMs = System.currentTimeMillis(); | |
for (int k = 0; k < metricsArray.length; k++) { | |
AsyncFile file = asyncFiles.get(k); | |
AsyncFileMetrics metrics = file.getMetrics(); | |
LastAsyncFileMetrics lastMetrics = metricsArray[k]; | |
long fsyncs = metrics.fsyncs(); | |
if (fsyncs > 0) { | |
double perf = ((fsyncs - lastMetrics.fsyncs) * 1000f) / (nowMs - lastTimeMs); | |
lastMetrics.fsyncs = fsyncs; | |
sb.append(" fsyncs/s: ").append(longFormat.format(perf)); | |
} | |
long fdatasyncs = metrics.fdatasyncs(); | |
if (fdatasyncs > 0) { | |
double perf = ((fdatasyncs - lastMetrics.fdatasyncs) * 1000f) / (nowMs - lastTimeMs); | |
lastMetrics.fdatasyncs = fdatasyncs; | |
sb.append(" fdatasyncs/s: ").append(longFormat.format(perf)); | |
} | |
long writes = metrics.writes(); | |
if (writes > 0) { | |
double perf = ((writes - lastMetrics.writes) * 1000f) / (nowMs - lastTimeMs); | |
lastMetrics.writes = writes; | |
sb.append(" writes/s: ").append(longFormat.format(perf)); | |
} | |
long bytesWritten = metrics.bytesWritten(); | |
if (bytesWritten > 0) { | |
double perf = ((bytesWritten - lastMetrics.bytesWritten) * 1000f) / (nowMs - lastTimeMs); | |
lastMetrics.bytesWritten = bytesWritten; | |
sb.append(" write-bytes/s: ").append(doubleFormat.format(perf)); | |
} | |
long reads = metrics.reads(); | |
if (reads > 0) { | |
double perf = ((reads - lastMetrics.reads) * 1000f) / (nowMs - lastTimeMs); | |
lastMetrics.reads = reads; | |
sb.append(" read/s: ").append(longFormat.format(perf)); | |
} | |
long bytesRead = metrics.bytesRead(); | |
if (bytesRead > 0) { | |
double perf = ((bytesRead - lastMetrics.bytesRead) * 1000f) / (nowMs - lastTimeMs); | |
lastMetrics.bytesRead = bytesRead; | |
sb.append(" read-bytes/s: ").append(doubleFormat.format(perf)); | |
} | |
System.out.println(sb); | |
sb.setLength(0); | |
} | |
lastTimeMs = nowMs; | |
} | |
} | |
} | |
private static class LastAsyncFileMetrics { | |
private long reads; | |
private long writes; | |
private long fsyncs; | |
private long fdatasyncs; | |
private long bytesRead; | |
private long bytesWritten; | |
} | |
private class IOTask implements Runnable, BiConsumer<Integer, Throwable> { | |
private final long startBlock; | |
private final long endBlock; | |
private long operation; | |
private AsyncFile file; | |
private long operationCount; | |
private CountDownLatch completionLatch; | |
private long block; | |
private final long bufferAddress; | |
private final ByteBuffer buffer; | |
private final Random random = new Random(); | |
private final int blockSize; | |
private final long blockCount; | |
private final int workload; | |
private int syncCounter; | |
private int syncInterval; | |
private boolean fdatasync; | |
public IOTask(int ioTaskIndex) { | |
this.workload = StorageBenchmark.this.workload; | |
if (StorageBenchmark.this.fsync > 0) { | |
this.syncInterval = StorageBenchmark.this.fsync; | |
this.fdatasync = false; | |
} else if (StorageBenchmark.this.fdatasync > 0) { | |
this.syncInterval = StorageBenchmark.this.fdatasync; | |
this.fdatasync = true; | |
} else { | |
this.syncInterval = 0; | |
} | |
// Setting up the buffer | |
this.buffer = allocateDirect(bs, pageSize()); | |
this.bufferAddress = BufferUtil.addressOf(buffer); | |
for (int c = 0; c < buffer.capacity() / 2; c++) { | |
buffer.putChar('c'); | |
} | |
this.blockSize = StorageBenchmark.this.bs; | |
long fileBlockCount = StorageBenchmark.this.fileSize / StorageBenchmark.this.bs; | |
this.blockCount = fileBlockCount / StorageBenchmark.this.iodepth; | |
this.startBlock = (fileBlockCount * ioTaskIndex) / StorageBenchmark.this.iodepth; | |
this.block = startBlock; | |
this.endBlock = startBlock + blockCount; | |
} | |
@Override | |
public void run() { | |
Promise<Integer> p; | |
switch (workload) { | |
case WORKLOAD_NOP: { | |
p = file.nop(); | |
} | |
break; | |
case WORKLOAD_WRITE: { | |
if (syncInterval > 0) { | |
if (syncCounter == 0) { | |
syncCounter = syncInterval; | |
p = fdatasync ? file.fdatasync() : file.fsync(); | |
break; | |
} else { | |
syncCounter--; | |
} | |
} | |
long offset = block * blockSize; | |
block++; | |
if (block > endBlock) { | |
block = startBlock; | |
} | |
p = file.pwrite(offset, blockSize, bufferAddress); | |
} | |
break; | |
case WORKLOAD_READ: { | |
long offset = block * blockSize; | |
block++; | |
if (block > endBlock) { | |
block = startBlock; | |
} | |
p = file.pread(offset, blockSize, bufferAddress); | |
} | |
break; | |
case WORKLOAD_RANDOM_WRITE: { | |
if (syncInterval > 0) { | |
if (syncCounter == 0) { | |
syncCounter = syncInterval; | |
p = fdatasync ? file.fdatasync() : file.fsync(); | |
break; | |
} else { | |
syncCounter--; | |
} | |
} | |
long nextBlock = startBlock + nextLong(random, blockCount); | |
long offset = nextBlock * blockSize; | |
p = file.pwrite(offset, blockSize, bufferAddress); | |
} | |
break; | |
case WORKLOAD_RANDOM_READ: { | |
long nextBlock = startBlock + nextLong(random, blockCount); | |
long offset = nextBlock * blockSize; | |
p = file.pread(offset, blockSize, bufferAddress); | |
} | |
break; | |
default: | |
throw new RuntimeException("Unknown workload"); | |
} | |
p.then(this).releaseOnComplete(); | |
} | |
@Override | |
public void accept(Integer result, Throwable throwable) { | |
if (throwable != null) { | |
throwable.printStackTrace(); | |
System.exit(1); | |
} | |
if (operation >= operationCount) { | |
completionLatch.countDown(); | |
return; | |
} | |
operation++; | |
run(); | |
} | |
} | |
private class InitFileTask implements Runnable, BiConsumer<Integer, Throwable> { | |
private final long startBlock; | |
private final long endBlock; | |
public AtomicInteger completed; | |
public long startMs; | |
private AsyncFile file; | |
private CountDownLatch completionLatch; | |
private long block; | |
private final long bufferAddress; | |
private final ByteBuffer buffer; | |
private final int blockSize; | |
private final long blockCount; | |
public InitFileTask(int ioTaskIndex) { | |
// Setting up the buffer | |
this.buffer = allocateDirect(bs, pageSize()); | |
this.bufferAddress = BufferUtil.addressOf(buffer); | |
for (int c = 0; c < buffer.capacity() / 2; c++) { | |
buffer.putChar('c'); | |
} | |
this.blockSize = StorageBenchmark.this.bs; | |
long fileBlockCount = StorageBenchmark.this.fileSize / StorageBenchmark.this.bs; | |
this.blockCount = fileBlockCount / StorageBenchmark.this.iodepth; | |
this.startBlock = (fileBlockCount * ioTaskIndex) / StorageBenchmark.this.iodepth; | |
this.block = startBlock; | |
this.endBlock = startBlock + blockCount; | |
} | |
@Override | |
public void run() { | |
long offset = block * blockSize; | |
block++; | |
Promise<Integer> p = file.pwrite(offset, blockSize, bufferAddress); | |
p.then(this).releaseOnComplete(); | |
} | |
@Override | |
public void accept(Integer result, Throwable throwable) { | |
if (throwable != null) { | |
throwable.printStackTrace(); | |
System.exit(1); | |
} | |
if (block == endBlock) { | |
if (completed.decrementAndGet() == 0) { | |
file.close().then((integer, throwable1) -> { | |
if (throwable1 != null) { | |
throwable1.printStackTrace(); | |
System.exit(1); | |
} | |
long durationMs = System.currentTimeMillis() - startMs; | |
System.out.println("Creating file " + file.path() + " completed in " + durationMs + " ms"); | |
completionLatch.countDown(); | |
}); | |
} else { | |
completionLatch.countDown(); | |
} | |
return; | |
} | |
run(); | |
} | |
} | |
private static long nextLong(Random random, long bound) { | |
if (bound <= 0) { | |
throw new IllegalArgumentException("bound must be positive"); | |
} | |
long l = random.nextLong(); | |
if (l == Long.MIN_VALUE) { | |
return 0; | |
} else if (l < 0) { | |
l = -l; | |
} | |
return l % bound; | |
} | |
private static String randomTmpFile(String dir) { | |
String uuid = UUID.randomUUID().toString().replace("-", ""); | |
String separator = FileSystems.getDefault().getSeparator(); | |
return dir + separator + uuid; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment