Created
October 21, 2020 19:29
-
-
Save sherman/d37c9bf430801b1e3ccae24652498697 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.sherman.iouring.util; | |
import java.io.File; | |
import java.io.IOException; | |
import java.io.RandomAccessFile; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Consumer; | |
import com.google.common.base.Preconditions; | |
import io.netty.buffer.ByteBuf; | |
import org.sherman.iouring.core.EventLoop; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class IoUringFileReader implements AutoCloseable { | |
private static final Logger logger = LoggerFactory.getLogger(IoUringFileReader.class); | |
private final long size; | |
private final EventLoop eventLoop; | |
private final int fd; | |
private final RandomAccessFile raf; | |
private int chunks; | |
private AtomicInteger read; | |
private static final int BUFFER_SIZE = 8192 * 8; | |
private long offset; | |
private final OnResult onResult; | |
public IoUringFileReader(EventLoop eventLoop, File file, Consumer<byte[]> consumer) { | |
Preconditions.checkArgument(file.exists(), "File must be exists"); | |
this.size = file.length(); | |
this.eventLoop = eventLoop; | |
this.chunks = (int) (size / BUFFER_SIZE); | |
this.read = new AtomicInteger(chunks); | |
this.onResult = new OnResultImpl(consumer, read); | |
try { | |
this.raf = new RandomAccessFile(file, "r"); | |
this.fd = PlatformDependent.getFd(raf.getFD()); | |
} catch (IOException e) { | |
throw new IllegalArgumentException(e); | |
} | |
} | |
public void read() { | |
//logger.info("Chunks: [{}]", chunks); | |
while (chunks > 0) { | |
eventLoop.addReadTask( | |
new ReadTask( | |
fd, | |
offset, | |
BUFFER_SIZE, | |
onResult | |
) | |
); | |
offset += BUFFER_SIZE; | |
chunks = chunks - 1; | |
} | |
while (read.get() > 0) { | |
} | |
} | |
@Override | |
public void close() throws Exception { | |
raf.close(); | |
} | |
private static class OnResultImpl implements OnResult { | |
private final Consumer<byte[]> handler; | |
private AtomicInteger read; | |
private OnResultImpl(Consumer<byte[]> handler, AtomicInteger read) { | |
this.handler = handler; | |
this.read = read; | |
} | |
@Override | |
public void onRead(ByteBuf buffer) { | |
byte[] bytes = new byte[buffer.readableBytes()]; | |
int readerIndex = buffer.readerIndex(); | |
buffer.getBytes(readerIndex, bytes); | |
handler.accept(bytes); | |
read.decrementAndGet(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment