Skip to content

Instantly share code, notes, and snippets.

@Sherlock-Holo
Created May 5, 2020 01:05
Show Gist options
  • Save Sherlock-Holo/6877d3f9e2271fb290326b4f55b13ec8 to your computer and use it in GitHub Desktop.
Save Sherlock-Holo/6877d3f9e2271fb290326b4f55b13ec8 to your computer and use it in GitHub Desktop.
io_uring test
use std::collections::HashMap;
use std::fs::File;
use std::future::Future;
use std::io::Error;
use std::io::IoSliceMut;
use std::io::Result;
use std::marker::PhantomData;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::sync::{Arc, Mutex, Once};
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{Context, Poll};
use std::task::Waker;
use std::thread;
use anyhow::Context as _;
use io_uring::concurrent::{CompletionQueue, IoUring, SubmissionQueue};
use io_uring::opcode::{AsyncCancel, Read};
use io_uring::opcode::types::Target;
use once_cell::sync::OnceCell;
/// Hands out a process-wide unique `user_data` tag for a submission.
///
/// Tags start at 0 and increase by one per call. Only uniqueness matters, so
/// a relaxed atomic increment is sufficient.
fn get_user_data() -> u64 {
    static NEXT_USER_DATA: AtomicU64 = AtomicU64::new(0);

    let tag = NEXT_USER_DATA.fetch_add(1, Ordering::Relaxed);
    tag
}
/// Returns the process-wide ring, creating it on first use.
///
/// The ring is built with 4096 entries and converted to its concurrent form.
///
/// # Panics
///
/// Panics if the ring cannot be created.
fn get_ring() -> &'static IoUring {
    static RING: OnceCell<IoUring> = OnceCell::new();

    RING.get_or_init(|| {
        let ring = io_uring::IoUring::new(4096).unwrap();
        ring.concurrent()
    })
}
/// Returns a handle to the submission queue of the shared ring.
fn get_sq() -> SubmissionQueue<'static> {
    let ring = get_ring();
    ring.submission()
}
/// Returns a handle to the completion queue of the shared ring.
fn get_cq() -> CompletionQueue<'static> {
    let ring = get_ring();
    ring.completion()
}
/// Returns the global table of in-flight operations, keyed by `user_data`.
///
/// The table is created empty on first access.
fn get_result_map() -> &'static Mutex<HashMap<u64, IOUringResult>> {
    static MAP: OnceCell<Mutex<HashMap<u64, IOUringResult>>> = OnceCell::new();

    MAP.get_or_init(|| {
        let in_flight = HashMap::new();
        Mutex::new(in_flight)
    })
}
/// Completion loop executed on the dedicated ring thread.
///
/// Each iteration submits whatever is queued, blocks until at least one
/// completion is available, then drains the completion queue. For every
/// completion whose `user_data` is registered in the result map, the result
/// is stored in the shared slot and the registered waker (if any) is woken.
/// Completions with an unregistered `user_data` are ignored.
///
/// # Panics
///
/// Panics if submitting/waiting fails with anything other than an
/// interrupted system call, or if one of the mutexes is poisoned.
fn run_ring() {
    let ring = get_ring();
    let completion_queue = get_cq();

    loop {
        match ring.submit_and_wait(1) {
            // An interrupted wait is not a failure; just try again.
            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
            outcome => {
                outcome.context("submit and wait failed").unwrap();
            }
        }

        let mut map_guard = get_result_map().lock().unwrap();
        while let Some(entry) = completion_queue.pop() {
            let user_data = entry.user_data();
            let code = entry.result();

            // A negative completion result is the negated errno of the failed
            // operation. The thread-local errno (`last_os_error`) is unrelated
            // to it and must not be used here.
            let result = if code >= 0 {
                Ok(code as usize)
            } else {
                Err(Error::from_raw_os_error(-code))
            };

            if let Some(share_result) = map_guard.remove(&user_data) {
                // Publish the result and take the waker under the lock, but
                // release the lock before waking so the woken task never
                // finds the slot still locked.
                let waker = {
                    let mut guard = share_result.lock().unwrap();
                    guard.1.replace(result);
                    guard.0.take()
                };
                if let Some(waker) = waker {
                    waker.wake();
                }
            }
        }
    }
}
/// Slot shared between a pending future and the ring thread.
///
/// Field `.0` holds the waker registered by the future; field `.1` holds the
/// outcome of the operation. The ring thread fills `.1` and takes `.0` when
/// the matching completion arrives.
type IOUringResult = Arc<Mutex<(Option<Waker>, Option<Result<usize>>)>>;
/// Future that performs a single read on a file descriptor through the
/// shared ring and resolves to the number of bytes read.
struct ReadFuture<'a> {
    // Raw descriptor taken from the borrowed `File`.
    fd: RawFd,
    // Carries the `'a` borrow of the file the descriptor came from.
    _marker: PhantomData<&'a ()>,
    // Destination buffer; its pointer and length are placed in the read entry.
    buf: IoSliceMut<'a>,
    // Tag attached to the submission and used as the key in the result map.
    user_data: u64,
    // Set once the read entry has been pushed onto the submission queue.
    is_submit: bool,
    // Slot shared with the ring thread: (registered waker, operation result).
    result: IOUringResult,
}
impl<'a> ReadFuture<'a> {
fn new(file: &'a File, buf: &'a mut [u8]) -> Self {
static RING_THREAD: Once = Once::new();
RING_THREAD.call_once(|| {
thread::spawn(run_ring);
});
let user_data = get_user_data();
let result = Arc::new(Mutex::new((None, None)));
Self {
fd: file.as_raw_fd(),
_marker: Default::default(),
buf: IoSliceMut::new(buf),
user_data,
is_submit: false,
result,
}
}
}
impl Drop for ReadFuture<'_> {
fn drop(&mut self) {
if !self.is_submit {
return;
}
// means ring has wakeup this future
if self.result.lock().unwrap().0.is_none() {
return;
}
let mut cancel_entry = AsyncCancel::new(self.user_data).build();
let submission_queue = get_sq();
unsafe {
while let Err(entry) = submission_queue.push(cancel_entry) {
cancel_entry = entry;
}
}
}
}
impl Future for ReadFuture<'_> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.is_submit {
if let Ok(mut guard) = self.result.try_lock() {
if let Some(result) = guard.1.take() {
return Poll::Ready(result);
}
}
return Poll::Pending;
}
let entry = Read::new(Target::Fd(self.fd), self.buf.as_mut_ptr(), self.buf.len() as _)
.build()
.user_data(self.user_data);
let submission_queue = get_sq();
unsafe {
if submission_queue.push(entry).is_err() {
return Poll::Pending;
}
}
// Safety: event haven't register
self.result.try_lock().unwrap().0.replace(cx.waker().clone());
get_result_map().lock().unwrap().insert(self.user_data, self.result.clone());
self.is_submit = true;
Poll::Pending
}
}
/// Extension trait adding a ring-backed asynchronous `read`.
trait UringRead {
    /// Returns a future that reads into `buf` and resolves to the number of
    /// bytes read. Both `self` and `buf` stay borrowed for the lifetime of
    /// the returned future.
    fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a>;
}
impl UringRead for File {
    /// Creates a `ReadFuture` for this file. Nothing is queued on the ring
    /// until the future is first polled.
    fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a> {
        let pending_read = ReadFuture::new(self, buf);
        pending_read
    }
}
/// Demo entry point: reads up to 1024 bytes from `/tmp/test.txt` through the
/// ring and prints them as (lossily decoded) UTF-8.
///
/// Panics if the file cannot be opened or the read fails.
#[async_std::main]
async fn main() {
    let file = File::open("/tmp/test.txt").unwrap();
    let mut buf = vec![0; 1024];

    let read_len = file.read(&mut buf).await.unwrap();
    let text = String::from_utf8_lossy(&buf[..read_len]);
    println!("{}", text);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment