Skip to content

Instantly share code, notes, and snippets.

@vi
Created December 18, 2022 20:14
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save vi/39607d1963b069a5167099f3fbffebf4 to your computer and use it in GitHub Desktop.
Save vi/39607d1963b069a5167099f3fbffebf4 to your computer and use it in GitHub Desktop.
Rust async executor abuse to avoid both blocking and threads (v2)
# Package manifest for the `hackyws` demo binary.
[package]
name = "hackyws"
version = "0.1.1"
edition = "2021"
# Third-party crates used by the demo source.
[dependencies]
# Error type returned by the demo's fallible functions (`anyhow::Result`).
anyhow = "1.0.66"
# WebSocket client run on top of the custom socket wrapper (`client_async`).
async-tungstenite = "0.18.0"
# Channels bridging the sync threads and the async tasks.
flume = "0.10.14"
# `LocalPool` executor plus the `SinkExt`/`StreamExt`/spawn extension traits.
futures = "0.3.25"
# `AsyncRead`/`AsyncWrite` traits implemented by the socket wrapper.
futures-io = "0.3.25"
# `#[pin_project]` attribute applied to the socket wrapper.
pin-project = "1.0.12"
/// Imagine this is a third-party app where our code is loaded as a plugin and we want to use async.
/// But we should not block (so `Runtime::block_on` is inaccessible) and cannot create any threads.
/// We prefer the code use only simple OS features (without epoll or other events) and instead just rely on
/// periodic polling by timer (which is itself inside the third-party app).
/// `fn main` of this demo simulates such "third-party app", everything else is the plugin code.
/// Running the code should connect to a WebSocket mirror, periodically send messages to it, and print each incoming message to stdout.
/// `time strace -f hackyws` shows that polling is indeed moderated by timer (not just busy loop)
/// and that there are no threads getting created (apart from "sync" threads we do create explicitly).
///
/// This is the second, more complicated version of the demo. See the first one at https://gist.github.com/vi/28117c2583ea74d35babfcd6abbef9e6
use std::{pin::Pin, time::Duration};
use async_tungstenite::tungstenite::Message;
use futures::{
task::{LocalSpawnExt, SpawnExt},
Future, SinkExt, StreamExt,
};
/// Plugin-side state: a single-threaded executor plus the waker registry that drives it.
///
/// Nothing here makes progress on its own; `HackyTimerPollDemo::step` has to be called
/// repeatedly to wake the registered tasks and run the executor.
pub struct HackyTimerPollDemo {
/// Executor whose spawners receive the demo's tasks; `step` runs it until it stalls.
exe: futures::executor::LocalPool,
/// Registry of wakers that `step` fires before running `exe`.
wakers: timer_polled_executor_tools::TimerBasedWakers,
}
pub mod timer_polled_executor_tools {
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::pin::Pin;
use std;
use std::time::{Duration, Instant};
/// Adapter exposing a `std::net::TcpStream` through the `futures_io` async I/O traits.
///
/// Field 0 is the socket (`MySocketWrapper::new` switches it to non-blocking mode); field 1 is
/// the waker registry an operation enlists in when the socket reports `WouldBlock`.
#[pin_project::pin_project]
pub struct MySocketWrapper(#[pin] std::net::TcpStream, TimerBasedWakers);
/// Shared registry of task wakers that are fired by an external periodic timer.
///
/// Pending operations call `add` instead of arranging a real readiness notification; the host
/// then calls [`TimerBasedWakers::wake_all`] on every tick so those tasks get polled again.
/// Clones share the same underlying list.
#[derive(Clone)]
pub struct TimerBasedWakers {
    wakers: Arc<Mutex<Vec<std::task::Waker>>>,
}

impl TimerBasedWakers {
    /// Locks the waker list, recovering it if another thread panicked while holding the lock.
    ///
    /// A `Vec<Waker>` has no invariant a panic could leave half-updated, so a poisoned lock is
    /// safe to keep using.
    fn lock_wakers(&self) -> std::sync::MutexGuard<'_, Vec<std::task::Waker>> {
        self.wakers
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Registers the waker of the task currently being polled for the next `wake_all`.
    ///
    /// A waker that would wake an already registered task is skipped, so a task that becomes
    /// pending several times between two ticks is stored (and later woken) only once.
    fn add(&self, ctx: &mut std::task::Context<'_>) {
        let waker = ctx.waker();
        let mut pending = self.lock_wakers();
        if !pending.iter().any(|known| known.will_wake(waker)) {
            pending.push(waker.clone());
        }
    }

    /// Wake up all accumulated wakers.
    /// This method is supposed to be called periodically, prior to stepping some ticks of the executor
    pub fn wake_all(&self) {
        // Take the list out first so the lock is released before any waker runs: a waker that
        // touches this registry (directly or by polling inline) must not deadlock on it.
        let pending = std::mem::take(&mut *self.lock_wakers());
        for waker in pending {
            waker.wake();
        }
    }

    /// Creates an empty registry.
    pub fn new() -> TimerBasedWakers {
        TimerBasedWakers {
            wakers: Arc::new(Mutex::new(Vec::with_capacity(2))),
        }
    }
}

impl Default for TimerBasedWakers {
    /// Equivalent to [`TimerBasedWakers::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl MySocketWrapper {
pub fn new(s: std::net::TcpStream, wakers: TimerBasedWakers) -> std::io::Result<Self> {
s.set_nonblocking(true)?;
Ok(MySocketWrapper(s, wakers))
}
}
impl futures_io::AsyncRead for MySocketWrapper {
    /// Attempts a single non-blocking read into `buf`.
    ///
    /// If the socket reports `WouldBlock`, the current task's waker is added to the registry
    /// and `Poll::Pending` is returned; any other outcome is returned as ready.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> Poll<futures_io::Result<usize>> {
        let mut parts = self.project();
        let outcome = parts.0.read(buf);
        let would_block =
            matches!(&outcome, Err(e) if e.kind() == std::io::ErrorKind::WouldBlock);
        if would_block {
            parts.1.add(cx);
            return Poll::Pending;
        }
        Poll::Ready(outcome)
    }
}
impl futures_io::AsyncWrite for MySocketWrapper {
    /// Attempts a single non-blocking write of `buf`.
    ///
    /// If the socket reports `WouldBlock`, the current task's waker is added to the registry
    /// and `Poll::Pending` is returned; any other outcome is returned as ready.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<futures_io::Result<usize>> {
        let mut this = self.project();
        match this.0.write(buf) {
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                this.1.add(cx);
                Poll::Pending
            }
            done => Poll::Ready(done),
        }
    }

    /// Flushes the socket, treating `WouldBlock` the same way as `poll_write` does.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<futures_io::Result<()>> {
        let mut this = self.project();
        match this.0.flush() {
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                this.1.add(cx);
                Poll::Pending
            }
            done => Poll::Ready(done),
        }
    }

    /// Flushes pending data and then shuts down the write half of the socket.
    ///
    /// Flushing alone never tells the peer that no more data will follow; shutting down the
    /// write direction does. A socket that is already disconnected counts as closed.
    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<futures_io::Result<()>> {
        match self.as_mut().poll_flush(cx) {
            Poll::Ready(Ok(())) => {}
            // Still pending or failed: report that as the result of the close attempt.
            not_flushed => return not_flushed,
        }
        let this = self.project();
        match this.0.shutdown(std::net::Shutdown::Write) {
            Err(e) if e.kind() == std::io::ErrorKind::NotConnected => Poll::Ready(Ok(())),
            done => Poll::Ready(done),
        }
    }
}
/// Future returned by `sleep`: field 0 is the deadline, field 1 is the waker registry the
/// future enlists in while the deadline has not been reached.
pub struct HackySleep(Instant, TimerBasedWakers);
impl std::future::Future for HackySleep {
    type Output = ();

    /// Completes once the current time has reached the stored deadline.
    ///
    /// Until then the task's waker is added to the registry, so the deadline is re-checked
    /// only when the registry is next woken and the task is polled again.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        // Inclusive comparison: the sleep is over the moment the deadline is reached, which
        // also lets a zero-length sleep finish on a clock that has not advanced yet.
        if Instant::now() >= self.0 {
            return Poll::Ready(());
        }
        self.1.add(cx);
        Poll::Pending
    }
}
/// Builds a `HackySleep` whose deadline is `t` after the moment of this call.
///
/// `wakers` is the registry the returned future uses to get polled again.
pub fn sleep(t: Duration, wakers: TimerBasedWakers) -> HackySleep {
    let deadline = Instant::now() + t;
    HackySleep(deadline, wakers)
}
}
impl HackyTimerPollDemo {
/// Connects to the WebSocket mirror and spawns the two forwarding subtasks.
///
/// * `tx` - channel on which received text messages are handed over.
/// * `rx` - channel from which outgoing text messages are taken.
/// * `spawner` - callback that hands each subtask to the executor.
/// * `wakers` - registry the socket wrapper enlists in while an operation would block.
///
/// NOTE(review): `TcpStream::connect` is a blocking call, so the first poll of this future
/// blocks its caller until the TCP connection is established or fails.
///
/// # Errors
/// Fails if the TCP connection, the switch to non-blocking mode or the WebSocket handshake
/// fails.
async fn start_my_task(
    tx: flume::Sender<String>,
    rx: flume::Receiver<String>,
    mut spawner: impl FnMut(Pin<Box<dyn Future<Output = ()> + Send>>),
    wakers: timer_polled_executor_tools::TimerBasedWakers,
) -> anyhow::Result<()> {
    // Literal IP address; presumably chosen so that no DNS lookup is needed - TODO confirm.
    let s = std::net::TcpStream::connect("192.236.209.31:80")?;
    let s = timer_polled_executor_tools::MySocketWrapper::new(s, wakers)?;
    let (c, _) = async_tungstenite::client_async("ws://ws.vi-server.org/mirror", s).await?;
    let (mut c_tx, mut c_rx) = c.split();

    // Outgoing direction: channel -> WebSocket.
    let subtask1 = async move {
        while let Ok(msg) = rx.recv_async().await {
            // A failed send means the connection is unusable; report it and stop instead of
            // silently discarding every later message.
            if let Err(e) = c_tx.send(Message::Text(msg)).await {
                eprintln!("Error: {}", e);
                break;
            }
        }
    };

    // Incoming direction: WebSocket -> channel. Only text messages are forwarded.
    let subtask2 = async move {
        while let Some(incoming) = c_rx.next().await {
            match incoming {
                Ok(Message::Text(s)) => {
                    // Nobody is listening any more, so there is no point in reading on.
                    if tx.send_async(s).await.is_err() {
                        break;
                    }
                }
                Ok(_) => (),
                Err(e) => {
                    eprintln!("Error: {}", e);
                    break;
                }
            }
        }
    };

    spawner(Box::pin(subtask1));
    spawner(Box::pin(subtask2));
    Ok(())
}
/// Creates the executor and the waker registry and queues the connection task on the executor.
///
/// Returns the demo state, a sender for outgoing messages and a receiver for incoming ones.
/// Nothing is polled here; the queued task first runs on a later call to `step`.
///
/// # Errors
/// Fails if the connection task cannot be spawned on the executor.
pub fn new() -> anyhow::Result<(
    HackyTimerPollDemo,
    flume::Sender<String>,
    flume::Receiver<String>,
)> {
    let exe = futures::executor::LocalPool::new();
    let wakers = timer_polled_executor_tools::TimerBasedWakers::new();
    // Channel 1 carries outgoing messages into the task, channel 2 carries incoming ones out.
    let (tx1, rx1) = flume::bounded(1);
    let (tx2, rx2) = flume::bounded(1);
    let subtask_spawner = exe.spawner();
    let task_spawner = exe.spawner();
    let spawner = move |boxed_future: Pin<Box<dyn Future<Output = ()> + Send>>| {
        // Report a failed spawn rather than dropping the subtask without a trace.
        if let Err(e) = subtask_spawner.spawn(boxed_future) {
            eprintln!("Error: {}", e);
        }
    };
    let wakers2 = wakers.clone();
    task_spawner.spawn_local(async move {
        if let Err(e) = HackyTimerPollDemo::start_my_task(tx2, rx1, spawner, wakers2).await {
            eprintln!("Error: {}", e);
        }
    })?;
    Ok((HackyTimerPollDemo { exe, wakers }, tx1, rx2))
}
pub fn step(&mut self) -> anyhow::Result<()> {
self.wakers.wake_all();
self.exe.run_until_stalled();
Ok(())
}
}
/// Example of a function that passes data from sync world to async world.
///
/// Sends "Hello" and then one numbered message per second. Returns as soon as a send fails,
/// so the calling thread ends instead of sleeping and sending into a closed channel forever.
fn sync_to_async(tx: flume::Sender<String>) {
    if tx.send("Hello".to_owned()).is_err() {
        return;
    }
    for i in 0u64.. {
        std::thread::sleep(Duration::from_secs(1));
        if tx.send(format!("Msg {}", i)).is_err() {
            return;
        }
    }
}
/// Example of a sync function that receives data from async world.
///
/// Blocks on the channel and prints every received message on its own line, until the
/// channel stops yielding messages.
fn async_to_sync(rx: flume::Receiver<String>) {
    while let Ok(line) = rx.recv() {
        println!("{}", line);
    }
}
/// Simulated host application: starts the two sync helper threads, then steps the demo
/// forever with a fixed pause between steps.
fn main() -> anyhow::Result<()> {
    // Pause between two consecutive calls to `step`.
    const TICK: Duration = Duration::from_millis(100);

    let (mut demo, tx, rx) = HackyTimerPollDemo::new()?;
    std::thread::spawn(move || sync_to_async(tx));
    std::thread::spawn(move || async_to_sync(rx));
    loop {
        demo.step()?;
        std::thread::sleep(TICK);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment