Skip to content

Instantly share code, notes, and snippets.

@sherman
Created October 21, 2020 19:29
Show Gist options
  • Save sherman/d37c9bf430801b1e3ccae24652498697 to your computer and use it in GitHub Desktop.
Save sherman/d37c9bf430801b1e3ccae24652498697 to your computer and use it in GitHub Desktop.
package org.sherman.iouring.util;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import org.sherman.iouring.core.EventLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Reads a file chunk by chunk by submitting one {@code ReadTask} per chunk to an
 * {@code EventLoop} and handing the bytes of every completed chunk to a consumer.
 *
 * <p>The file is split into chunks of {@link #BUFFER_SIZE} bytes; the final chunk covers
 * whatever remains, so files whose length is not a multiple of the chunk size (including
 * files smaller than one chunk) are read completely.
 *
 * <p>NOTE(review): the completion counter is an {@link AtomicInteger} and {@link #read()}
 * polls it, which suggests callbacks arrive on a different thread than the caller's —
 * confirm against the {@code EventLoop} implementation.
 */
public class IoUringFileReader implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(IoUringFileReader.class);

    /** Number of bytes requested per read task (8192 * 8 = 65536). */
    private static final int BUFFER_SIZE = 8192 * 8;

    private final long size;
    private final EventLoop eventLoop;
    private final int fd;
    private final RandomAccessFile raf;
    /** Number of chunks whose completion callback has not run yet. */
    private final AtomicInteger read;
    private final OnResult onResult;
    /** Number of chunks that still have to be submitted. */
    private int chunks;
    /** File offset of the next chunk to submit. */
    private long offset;

    /**
     * Opens {@code file} for reading and prepares one pending chunk per {@link #BUFFER_SIZE}
     * bytes (rounded up).
     *
     * @param eventLoop loop that the read tasks are submitted to
     * @param file      existing file to read
     * @param consumer  receives a copy of the bytes of every completed chunk
     * @throws IllegalArgumentException if the file does not exist, or if opening it or
     *                                  obtaining its descriptor fails with an I/O error
     * @throws ArithmeticException      if the file needs more than {@code Integer.MAX_VALUE} chunks
     */
    public IoUringFileReader(EventLoop eventLoop, File file, Consumer<byte[]> consumer) {
        Preconditions.checkArgument(file.exists(), "File must be exists");
        this.size = file.length();
        this.eventLoop = eventLoop;
        // Ceiling division: a trailing partial chunk still needs its own read task.
        this.chunks = Math.toIntExact((size + BUFFER_SIZE - 1) / BUFFER_SIZE);
        this.read = new AtomicInteger(chunks);
        this.onResult = new OnResultImpl(consumer, read);

        RandomAccessFile opened = null;
        try {
            opened = new RandomAccessFile(file, "r");
            this.fd = PlatformDependent.getFd(opened.getFD());
        } catch (IOException e) {
            // Do not leak the open file when the descriptor lookup fails.
            closeAfterFailure(opened, e);
            throw new IllegalArgumentException(e);
        }
        this.raf = opened;
    }

    /**
     * Submits a read task for every chunk that has not been submitted yet, then waits
     * until the completion callback has run for all of them.
     *
     * <p>A second call submits nothing because all chunks were already consumed by the
     * first call.
     */
    public void read() {
        while (chunks > 0) {
            // The last chunk may be shorter than BUFFER_SIZE; never request bytes past EOF.
            // NOTE(review): assumes the third ReadTask argument is the byte count to read.
            int length = (int) Math.min((long) BUFFER_SIZE, size - offset);
            eventLoop.addReadTask(new ReadTask(fd, offset, length, onResult));
            offset += length;
            chunks--;
        }
        while (read.get() > 0) {
            // Hint to the scheduler instead of spinning flat out while callbacks are pending.
            Thread.yield();
        }
    }

    /**
     * Closes the underlying file.
     *
     * @throws Exception if closing the file fails
     */
    @Override
    public void close() throws Exception {
        raf.close();
    }

    /** Closes {@code file} (if it was opened) and records a close failure on {@code cause}. */
    private static void closeAfterFailure(RandomAccessFile file, IOException cause) {
        if (file == null) {
            return;
        }
        try {
            file.close();
        } catch (IOException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Completion callback: copies the readable bytes of a finished chunk to the handler
     * and marks the chunk as done.
     */
    private static class OnResultImpl implements OnResult {
        private final Consumer<byte[]> handler;
        private final AtomicInteger read;

        private OnResultImpl(Consumer<byte[]> handler, AtomicInteger read) {
            this.handler = handler;
            this.read = read;
        }

        /**
         * Copies the readable bytes of {@code buffer} into a fresh array (without moving
         * the buffer's reader index) and passes the array to the handler.
         */
        @Override
        public void onRead(ByteBuf buffer) {
            try {
                byte[] bytes = new byte[buffer.readableBytes()];
                buffer.getBytes(buffer.readerIndex(), bytes);
                handler.accept(bytes);
            } finally {
                // Always count the chunk as finished; otherwise a failing handler would
                // leave IoUringFileReader.read() waiting forever.
                read.decrementAndGet();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment