Skip to content

Instantly share code, notes, and snippets.

@jsen-
Created August 19, 2023 13:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jsen-/0531c513ccace0bc1d6a6f12c0d1278e to your computer and use it in GitHub Desktop.
Save jsen-/0531c513ccace0bc1d6a6f12c0d1278e to your computer and use it in GitHub Desktop.
# Cargo manifest for the playground binary whose source (main.rs) follows below.
[package]
name = "deno_playground"
version = "0.1.0"
edition = "2021"
[dependencies]
deno_core = "0.202.0"
# Resolved from the local `deps/slotmap` directory; main.rs notes that it relies on
# `extract_mapped`, which was added in the author's slotmap fork.
slotmap = { version = "1.0.6", path = "deps/slotmap" }
# Default features are disabled and none are enabled here.
# NOTE(review): main.rs uses tokio's runtime builder, `time` and `sync` APIs — presumably
# those features get enabled through another dependency; confirm.
tokio = {version="1.31.0", default-features = false}
use deno_core::{
error::AnyError, op, Extension, JsRealm, JsRuntime, ModuleCode, ModuleId, ModuleSpecifier, Op,
};
use slotmap::{DefaultKey, HopSlotMap};
use std::{
fmt,
future::{poll_fn, Future},
mem,
path::Path,
pin::Pin,
task::Poll,
time::Duration,
};
use tokio::sync::{mpsc, oneshot};
#[op]
async fn op_sleep_ms(ms: u64) {
tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
}
/// Async op that never completes: the awaited future reports `Pending` on every poll.
#[op]
async fn op_sleep_infinity() {
    // The closure ignores the task context, so no wake-up is ever scheduled.
    let never_ready = poll_fn(|_cx| Poll::Pending);
    never_ready.await
}
/// Client-side handle for submitting module-execution requests over the request channel.
///
/// Created together with a `Poller` by `Poller::new`, which keeps the receiving half.
#[derive(Debug)]
struct Requestor {
/// Sending half of the bounded request channel.
tx: Send,
}
impl Requestor {
pub async fn run<P: AsRef<Path>>(&self, name: P, code: ModuleCode) -> Result<(), AnyError> {
let (resp_tx, resp_rx) = oneshot::channel();
self.tx
.send(Req::Exec(
resp_tx,
ModuleSpecifier::from_directory_path(name).unwrap(),
code,
))
.await?;
resp_rx.await??;
Ok(())
}
}
/// Receiver for a module's evaluation result; this is the type of the value obtained from
/// `realm.mod_evaluate(..)` and stored in `Normal::receivers`.
type ModuleResult =
deno_core::futures::channel::oneshot::Receiver<Result<(), deno_core::anyhow::Error>>;
/// One-shot sender used to report a request's outcome back to the requester.
type Resp = oneshot::Sender<Result<(), AnyError>>;
/// Receiving half of the request channel.
type Recv = mpsc::Receiver<Req>;
/// Sending half of the request channel.
/// NOTE(review): this alias shadows the prelude's `Send` marker trait for the rest of this
/// module, so the bare name `Send` refers to this type here.
type Send = mpsc::Sender<Req>;
/// Request sent from a `Requestor` to the `Poller`.
enum Req {
/// Execute the given module code under the given specifier and report the outcome
/// through the one-shot response sender.
Exec(Resp, ModuleSpecifier, ModuleCode),
}
/// State of the poller while no module load is in progress.
struct Normal {
/// The JS runtime whose event loop is polled.
runtime: JsRuntime,
/// Incoming execution requests.
rx: Recv,
/// Set once `rx` reported that all senders are gone; `rx` is not polled afterwards.
finished: bool,
/// In-flight module evaluations: the realm, the requester's response sender, and the
/// receiver for the evaluation result.
receivers: HopSlotMap<DefaultKey, (JsRealm, Resp, ModuleResult)>, // TODO: Vec might be sufficient
}
/// Future that drives the JS runtime and serves execution requests; implemented as a small
/// state machine.
enum Poller {
/// A module is being loaded. The boxed future owns the `Normal` state for the duration of
/// the load and yields it back together with the response sender and the load result.
ModuleLoad(
Pin<Box<dyn Future<Output = (Normal, Resp, Result<(JsRealm, ModuleId), AnyError>)>>>,
),
/// Idle/serving state: receives requests, forwards finished results, polls the event loop.
Normal(Normal),
/// Placeholder left behind by `mem::replace` while the state is being moved out of `self`;
/// `poll` panics if it ever observes this variant.
Tmp,
}
impl fmt::Debug for Poller {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::ModuleLoad(_) => f.debug_struct("Poller::ModuleLoad").finish(),
Self::Normal(Normal { finished, .. }) => f
.debug_struct("Poller::Normal")
.field("finished", finished)
.finish(),
Self::Tmp => f.debug_struct("Poller::Tmp").finish(),
}
}
}
impl Poller {
pub async fn new() -> (Self, Requestor) {
let extensions = Extension {
ops: std::borrow::Cow::Borrowed(&[op_sleep_ms::DECL, op_sleep_infinity::DECL]),
..Default::default()
};
let mut runtime = deno_core::JsRuntime::new(deno_core::RuntimeOptions {
extensions: vec![extensions],
..Default::default()
});
let mod_id = runtime
.load_main_module(
&ModuleSpecifier::from_file_path("/keepalive.js").unwrap(),
Some(ModuleCode::Static("Deno.core.ops.op_sleep_infinity()")),
)
.await
.expect("failed to execute keepalive");
runtime.mod_evaluate(mod_id).close();
let (tx, rx) = tokio::sync::mpsc::channel(150);
(
Self::Normal(Normal {
runtime,
rx,
finished: false,
receivers: Default::default(),
}),
Requestor { tx },
)
}
}
/// Drives the poller state machine.
///
/// - `ModuleLoad`: polls the in-progress module load; once it finishes, either reports the
///   load error to the requester or starts evaluating the module, then returns to `Normal`.
/// - `Normal`: forwards finished evaluation results to their requesters, accepts at most one
///   new request per poll, and otherwise polls the runtime's event loop.
/// - `Tmp`: transient placeholder that must never be observed here.
///
/// The future's output is whatever `poll_event_loop` returns once it is ready.
/// NOTE(review): the keepalive module started in `Poller::new` presumably keeps the event
/// loop from ever reporting completion on its own — confirm.
impl Future for Poller {
    type Output = Result<(), AnyError>;

    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let self_ref = self.get_mut();
        match self_ref {
            Self::ModuleLoad(fut) => match Pin::new(fut).poll(cx) {
                Poll::Pending => {
                    // Request an immediate re-poll: the runtime is owned by the load future
                    // while in this state, so the event loop is not polled from here.
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
                Poll::Ready((mut normal, resp, module_load_result)) => {
                    // module loaded
                    match module_load_result {
                        Err(err) => {
                            // The requester may already have dropped its receiver; that
                            // must not bring down the poller (same policy as for
                            // evaluation results below).
                            resp.send(Err(err)).ok();
                        }
                        Ok((realm, mod_id)) => {
                            let result = realm.mod_evaluate(normal.runtime.v8_isolate(), mod_id);
                            normal.receivers.insert((realm, resp, result));
                        }
                    }
                    let Self::ModuleLoad(_fut) = mem::replace(self_ref, Self::Normal(normal))
                    else {
                        unreachable!("state was matched as `ModuleLoad` above")
                    };
                    // Re-poll right away so the `Normal` branch runs with the new state.
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
            },
            Self::Normal(normal) => {
                // poll all script result receivers in flight and remove the finished ones
                // `extract_mapped` added in my `slotmap` fork :|
                let drain_iter = normal.receivers.extract_mapped(|_, (_, _, mod_res)| {
                    match Pin::new(mod_res).poll(cx) {
                        Poll::Pending => None,
                        Poll::Ready(mod_res) => {
                            let r = match mod_res {
                                Ok(Ok(())) => Ok(()),
                                Ok(Err(err)) => Err(err),
                                Err(err /* Canceled */) => Err(err.into()),
                            };
                            Some(r)
                        }
                    }
                });
                for (_, (_, resp, _), r) in drain_iter {
                    resp.send(r).ok(); // TODO: do we care whether it was delivered?
                }
                if normal.finished {
                    // just poll the runtime
                } else {
                    match Pin::new(&mut normal.rx).poll_recv(cx) {
                        Poll::Pending => {
                            // didn't receive anything, just poll the runtime
                        }
                        Poll::Ready(None) => {
                            // the other end hung up, so stop polling rx and poll the runtime
                            normal.finished = true;
                        }
                        Poll::Ready(Some(Req::Exec(resp, name, code))) => {
                            // received a script to execute: move the state out of `self`
                            // so the load future can own the runtime while it runs
                            let Self::Normal(mut normal) = mem::replace(self_ref, Self::Tmp) else {
                                unreachable!("state was matched as `Normal` above");
                            };
                            let module_load = Box::pin(async move {
                                let realm =
                                    normal.runtime.create_realm(Default::default()).unwrap();
                                // realm
                                //     .execute_script(
                                //         normal.runtime.v8_isolate(),
                                //         "[runtime.js]",
                                //         ModuleCode::Static(include_str!("runtime.js")),
                                //     )
                                //     .expect("failed to execute [runtime.js]");
                                let module_load_result = realm
                                    .load_main_module(
                                        normal.runtime.v8_isolate(),
                                        &name,
                                        Some(code),
                                    )
                                    .await;
                                (
                                    normal,
                                    resp,
                                    module_load_result.map(|module_id| (realm, module_id)),
                                )
                            });
                            let Self::Tmp = mem::replace(self_ref, Self::ModuleLoad(module_load))
                            else {
                                unreachable!("`Tmp` was stored just above")
                            };
                            // Re-poll right away so the load future gets polled.
                            cx.waker().wake_by_ref();
                            return Poll::Pending;
                        }
                    }
                }
            }
            Self::Tmp => unreachable!("`Tmp` is only a placeholder during state transitions"),
        };
        // Only the `Normal` branch falls through to here; every other path returned above.
        let Self::Normal(normal) = self_ref else {
            unreachable!("only the `Normal` state reaches the event-loop poll");
        };
        normal.runtime.poll_event_loop(cx, false)
    }
}
/// Creates the poller, spawns a task that submits two rounds of `x.js`/`y.js` requests
/// (printing each outcome and pausing one second after every round), then awaits the poller.
async fn real_main() -> Result<(), AnyError> {
    let (poller, req) = Poller::new().await;

    let submitter = async move {
        for i in 0..2 {
            let x_code =
                ModuleCode::Owned(format!("Deno.core.print('x.js {i}\\n')").into_boxed_str());
            let x = req.run("/x.js", x_code);
            let y_code =
                ModuleCode::Owned(format!("Deno.core.print('y.js {i}\\n')").into_boxed_str());
            let y = req.run("/y.js", y_code);

            // Both requests are in flight concurrently.
            let (x_res, y_res) = tokio::join!(x, y);
            if let Err(err) = x_res {
                println!("x.js err: {err}");
            } else {
                println!("x.js Ok");
            }
            if let Err(err) = y_res {
                println!("y.js err: {err}");
            } else {
                println!("y.js Ok");
            }

            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    };
    tokio::spawn(submitter);

    poller.await
}
/// Entry point: runs `real_main` on a current-thread tokio runtime with IO and time enabled;
/// on failure prints the error to stderr and exits with status 1.
fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_io()
        .enable_time()
        .build()
        .unwrap();

    match rt.block_on(real_main()) {
        Ok(()) => {}
        Err(err) => {
            eprintln!("main failed: {err}");
            std::process::exit(1);
        }
    }
}
@paomian
Copy link

paomian commented Sep 13, 2023

https://gist.github.com/jsen-/0531c513ccace0bc1d6a6f12c0d1278e#file-main-rs-L145
Does `extract_mapped` replace the `resp` sender with `oneshot::channel().0` in order to take ownership of `resp`?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment