Last active
October 21, 2020 19:50
-
-
Save sherman/ff4168a2fd3ef4b096ec8495dfa70c6b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.sherman.iouring.core; | |
import java.io.Closeable; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Queue; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import com.google.common.base.Preconditions; | |
import com.google.common.util.concurrent.ThreadFactoryBuilder; | |
import io.netty.buffer.ByteBuf; | |
import io.netty.buffer.ByteBufAllocator; | |
import io.netty.buffer.UnpooledByteBufAllocator; | |
import org.sherman.iouring.util.OnResult; | |
import org.sherman.iouring.util.ReadTask; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class EventLoop implements Closeable { | |
private static final Logger logger = LoggerFactory.getLogger(EventLoop.class); | |
private final AtomicBoolean terminated = new AtomicBoolean(false); | |
private final AtomicBoolean running = new AtomicBoolean(false); | |
private final RingBuffer ringBuffer; | |
private final IoUringSubmissionQueue submissionQueue; | |
private final IoUringCompletionQueue completionQueue; | |
private Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>(); | |
private AtomicInteger inflight = new AtomicInteger(); | |
private ConcurrentMap<Integer, DataHandler> fdToHandlers = new ConcurrentHashMap<>(); | |
private ByteBufAllocator allocator = new UnpooledByteBufAllocator(true); | |
private final ExecutorService executorService = Executors.newFixedThreadPool( | |
1, | |
new ThreadFactoryBuilder() | |
.setNameFormat("EventLoop-%d") | |
.build() | |
); | |
private final ExecutorService callBackExecutorService = Executors.newFixedThreadPool( | |
2, | |
new ThreadFactoryBuilder() | |
.setNameFormat("Callback-%d") | |
.build() | |
); | |
private final static int MAX_OPERATIONS = 8912; | |
public EventLoop() { | |
ringBuffer = Native.create(64); | |
submissionQueue = ringBuffer.getSubmissionQueue(); | |
completionQueue = ringBuffer.getCompletionQueue(); | |
executorService.submit( | |
new Runnable() { | |
@Override | |
public void run() { | |
try { | |
running.set(true); | |
runLoop(); | |
} catch (Exception e) { | |
logger.info("Error in a loop", e); | |
} finally { | |
running.set(false); | |
} | |
} | |
} | |
); | |
} | |
public void runLoop() { | |
while (!terminated.get()) { | |
if (completionQueue.hasCompletions()) { | |
completionQueue.process(new UserDataCallback() { | |
@Override | |
public void handle(int fd, int res, int flags, int op, short data) { | |
callBackExecutorService.submit( | |
() -> { | |
try { | |
DataHandler dataHandler = fdToHandlers.get(fd); | |
if (dataHandler == null) { | |
throw new IllegalArgumentException(String.format("Data handler for fd: %d is not found", fd)); | |
} | |
ByteBuf buffer = dataHandler.buffers.get(data); | |
buffer.writerIndex(res); | |
try { | |
OnResult callback = dataHandler.callbacks.get(data); | |
callback.onRead(buffer); | |
} finally { | |
buffer.release(); | |
inflight.decrementAndGet(); | |
} | |
} catch (Exception e) { | |
logger.error("Error, while handling data", e); | |
} | |
} | |
); | |
} | |
}); | |
} | |
int added = 0; | |
while (!taskQueue.isEmpty() && inflight.get() < 16) { | |
Runnable task = taskQueue.poll(); | |
if (task != null) { | |
task.run(); | |
added++; | |
} | |
} | |
if (added > 0) { | |
int submitted = submissionQueue.submit(); | |
//logger.info("Submitted: [{}]", submitted); | |
Preconditions.checkArgument(submitted > 0, "Can't submit tasks"); | |
} | |
//logger.info("Submit"); | |
} | |
} | |
public void addTask(Runnable task) { | |
if (!taskQueue.add(task)) { | |
logger.error("Can't add task"); | |
} | |
} | |
public void addReadTask(ReadTask task) { | |
addTask(new Runnable() { | |
@Override | |
public void run() { | |
try { | |
DataHandler data = fdToHandlers.computeIfAbsent(task.getFileDescriptor(), ignored -> new DataHandler()); | |
if (data.id == MAX_OPERATIONS) { | |
throw new IllegalArgumentException("Too many operation for a single fd"); | |
} | |
ByteBuf byteBuffer = allocator.buffer(task.getLength()); | |
data.callbacks.add(data.id, task.getHandler()); | |
data.buffers.add(data.id, byteBuffer); | |
boolean added = submissionQueue.addRead( | |
task.getFileDescriptor(), | |
byteBuffer.memoryAddress(), | |
(int) task.getOffset(), | |
task.getLength(), | |
data.id | |
); | |
Preconditions.checkArgument(added, "Read event was not added!"); | |
//logger.info("{}", data.id); | |
data.id++; | |
inflight.incrementAndGet(); | |
} catch (Exception e) { | |
logger.error("Can't add task", e); | |
} | |
} | |
}); | |
} | |
@Override | |
public void close() throws IOException { | |
terminated.set(true); | |
while (running.get()) { | |
} | |
ringBuffer.close(); | |
executorService.shutdown(); | |
callBackExecutorService.shutdown(); | |
} | |
private static class DataHandler { | |
private short id; | |
private final List<OnResult> callbacks = new ArrayList<>(); | |
private final List<ByteBuf> buffers = new ArrayList<>(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment