Skip to content

Instantly share code, notes, and snippets.

@sxlijin
Created June 7, 2024 05:27
Show Gist options
  • Save sxlijin/6967045e0b8d19b34340db43f369c257 to your computer and use it in GitHub Desktop.
Save sxlijin/6967045e0b8d19b34340db43f369c257 to your computer and use it in GitHub Desktop.
ruby-tokio-demo
use futures::{future, FutureExt};
use magnus::{
class, exception::runtime_error, function, method, prelude::*, Error, IntoValue, RModule,
};
//#[cfg(ruby_have_ruby_fiber_scheduler_h)]
use rb_sys::bindings::uncategorized::{
rb_fiber_current, rb_fiber_scheduler_block, rb_fiber_scheduler_current, rb_fiber_scheduler_get,
rb_fiber_scheduler_kernel_sleep, rb_fiber_scheduler_unblock,
};
use std::cell::RefCell;
use std::ops::{Deref, DerefMut};
//use rb_sys::rb_fiber_current;
use std::task::Poll;
use std::{future::Future, sync::Arc};
use tokio::time::{sleep, Duration};
use crate::Result;
async fn async_fn() -> String {
let duration = Duration::from_secs(2);
println!("async-BEGIN- sleeping for {duration:#?}");
sleep(duration).await;
println!("async-END- slept for {duration:#?}");
"async-retval".to_string()
}
fn wrap_fiber<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
let mut f = Box::pin(f);
future::poll_fn(move |cx| {
let result = f.as_mut().poll(cx);
let fiber = unsafe { rb_fiber_current() };
println!("fiber={fiber}");
match result {
Poll::Ready(_) => unsafe {
//rb_fiber_scheduler_unblock(
// rb_fiber_scheduler_current(),
// rb_sys::special_consts::Qnil.into(),
// fiber,
//);
},
Poll::Pending => unsafe {
rb_fiber_scheduler_block(
rb_fiber_scheduler_current(),
rb_sys::special_consts::Qnil.into(),
rb_sys::special_consts::Qnil.into(),
);
},
}
result
})
}
#[magnus::wrap(class = "Baml::Ffi::TokioDemo", free_immediately, size)]
/// For testing how to implement tokio in a Ruby extension
pub struct TokioDemo {
t: RefCell<Option<tokio::runtime::Runtime>>,
}
#[allow(dead_code)]
impl TokioDemo {
fn new() -> Result<Self> {
let Ok(tokio_runtime) = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
else {
return Err(Error::new(runtime_error(), "Failed to start tokio runtime"));
};
Ok(Self {
t: RefCell::new(Some(tokio_runtime)),
})
}
//fn does_this_yield(&self) {
// let rb_qnil = rb_sys::special_consts::Qnil;
// println!("build2 qnil {}", Into::<u64>::into(rb_qnil));
// let rb_scheduler = unsafe { rb_fiber_scheduler_get() };
// println!("current scheduler {}", Into::<u64>::into(rb_scheduler));
// let rb_curr_fiber = unsafe { rb_fiber_current() };
// println!(
// " fiber={} going to sleep",
// Into::<u64>::into(rb_curr_fiber)
// );
// unsafe {
// let rb_duration = magnus::Integer::from_i64(10).into_value();
// //rb_fiber_scheduler_kernel_sleep(rb_scheduler, rb_duration.as_raw());
// }
// let fut = self.t.spawn(async move {
// async_fn().await;
// println!(" fiber={} done sleeping, pls wake up", rb_curr_fiber);
// unsafe {
// let rb_qnil = rb_sys::special_consts::Qnil;
// if rb_scheduler != Into::<u64>::into(rb_qnil) {
// rb_fiber_scheduler_unblock(rb_scheduler, rb_qnil.into(), rb_curr_fiber);
// }
// }
// });
// println!(
// " fiber={} signalling that we're going to block",
// Into::<u64>::into(rb_curr_fiber)
// );
// unsafe {
// if rb_scheduler != Into::<u64>::into(rb_qnil) {
// rb_fiber_scheduler_block(
// rb_scheduler,
// rb_qnil.into(),
// // In theory, according to rb_fiber_scheduler_make_timeout, qnil blocks indefinitely
// /*timeout:*/
// rb_qnil.into(),
// );
// }
// }
// println!(
// " fiber={} blocking until woken up",
// Into::<u64>::into(rb_curr_fiber)
// );
// self.t.block_on(fut);
//}
fn wrapped_yield(&self) {
let borrow = self.t.borrow();
let Some(ref t) = borrow.deref() else {
println!("no runtime found");
return;
};
let (tx, rx) = tokio::sync::oneshot::channel();
let rb_curr_fiber = unsafe { rb_fiber_current() };
let rb_curr_scheduler = unsafe { rb_fiber_scheduler_current() };
t.spawn(async move {
async_fn().await;
if let Err(_) = tx.send(3) {
println!("the receiver dropped");
}
unsafe {
rb_fiber_scheduler_unblock(
rb_curr_scheduler,
rb_sys::special_consts::Qnil.into(),
rb_curr_fiber,
);
}
});
drop(borrow);
//let mut fut = std::pin::pin!(async_fn());
let mut fut = std::pin::pin!(rx);
let waker = unsafe { std::task::Waker::from(Arc::new(RubyWaker {})) };
let mut cx = std::task::Context::from_waker(&waker);
loop {
match Future::poll(fut.as_mut(), &mut cx) {
Poll::Pending => {
println!(" fiber={rb_curr_fiber} pending");
unsafe {
rb_fiber_scheduler_block(
rb_curr_scheduler,
rb_sys::special_consts::Qnil.into(),
rb_sys::special_consts::Qnil.into(),
);
}
}
Poll::Ready(_) => {
println!(" fiber={rb_curr_fiber} ready");
break;
}
}
}
}
fn shutdown(&self) {
let mut shutdown = None;
std::mem::swap(self.t.borrow_mut().deref_mut(), &mut shutdown);
if let Some(t) = shutdown {
t.shutdown_background();
}
}
#[allow(unused_variables)]
fn tokio_test(&self) {
//let f0 = self.t.spawn(async_fn());
//let f1 = self.t.spawn(async_fn());
//let f2 = self.t.spawn(async_fn());
}
/// For usage in magnus::init
///
/// TODO: use traits and macros to implement this
pub fn define_in_ruby(module: &RModule) -> Result<()> {
let tokio_demo = module.define_class("TokioDemo", class::object())?;
tokio_demo.define_singleton_method("new", function!(TokioDemo::new, 0))?;
//tokio_demo.define_method("does_this_yield", method!(TokioDemo::does_this_yield, 0))?;
tokio_demo.define_method("does_this_yield", method!(TokioDemo::wrapped_yield, 0))?;
tokio_demo.define_method("shutdown", method!(TokioDemo::shutdown, 0))?;
Ok(())
}
}
struct RubyWaker {}
impl std::task::Wake for RubyWaker {
fn wake(self: Arc<Self>) {
println!("Waking up Ruby fiber");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment