Skip to content

Instantly share code, notes, and snippets.

@vi
Last active June 19, 2023 20:03
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vi/28117c2583ea74d35babfcd6abbef9e6 to your computer and use it in GitHub Desktop.
Save vi/28117c2583ea74d35babfcd6abbef9e6 to your computer and use it in GitHub Desktop.
Rust async executor abuse to avoid both blocking and threads
# Manifest for the `hackyws` demo binary.
[package]
name = "hackyws"
version = "0.1.0"
edition = "2021"
[dependencies]
# `anyhow::Result` is the error type returned by the demo's functions.
anyhow = "1.0.66"
# Provides `LocalExecutor`, which is ticked manually instead of blocking on it.
async-executor = "1.5.0"
# WebSocket client (`client_async`) running on top of the custom socket wrapper.
async-tungstenite = "0.18.0"
# `SinkExt`, `StreamExt`, `pin_mut!` and `future::select` used by the demo task.
futures = "0.3.25"
# `AsyncRead` / `AsyncWrite` traits implemented by the socket wrapper.
futures-io = "0.3.25"
# `#[pin_project]` attribute used on the socket wrapper.
pin-project = "1.0.12"
/// Imagine this is a third-party app where our code is loaded as a plugin and we want to use async.
/// But we should not block (so `Runtime::block_on` is inaccessible) and cannot create any threads.
/// We prefer the code use only simple OS features (without epoll or other events) and instead just rely on
/// periodic polling by timer (which is itself inside the third-party app).
/// `fn main` of this demo simulates such "third-party app", everything else is the plugin code.
/// Running the code should connect to a WebSocket mirror, periodically send messages to it, and print each incoming message to stdout.
/// `time strace -f hackyws` shows that polling is indeed paced by the timer (not a busy loop) and that no threads are created.
use std::{time::Duration};
use async_tungstenite::tungstenite::Message;
use futures::{SinkExt, StreamExt, pin_mut};
/// Plugin-side state of the demo.
///
/// Owns the local executor that holds the spawned WebSocket task; the host
/// drives it by calling `step` from its own timer.
pub struct HackyTimerPollDemo {
    /// Executor onto which `new` spawns the demo task and which `step` ticks.
    exe: async_executor::LocalExecutor<'static>,
}
pub mod timer_polled_executor_tools {
use std::io::{Read,Write};
use std::task::Poll;
use std::pin::Pin;
use std;
use std::time::{Instant, Duration};
/// Adapter exposing a non-blocking [`std::net::TcpStream`] through the
/// `futures_io` `AsyncRead` / `AsyncWrite` traits without any OS event source.
///
/// Whenever the socket is not ready, the current task is woken immediately so
/// that it is polled again on the next executor tick; the timer that drives the
/// executor is what actually paces the retries.
#[pin_project::pin_project]
pub struct MySocketWrapper(#[pin] std::net::TcpStream);

impl MySocketWrapper {
    /// Wraps `s`, switching it to non-blocking mode first.
    ///
    /// # Errors
    /// Returns the error from `set_nonblocking` if the mode cannot be changed.
    pub fn new(s: std::net::TcpStream) -> std::io::Result<Self> {
        s.set_nonblocking(true)?;
        Ok(MySocketWrapper(s))
    }
}

/// Converts the outcome of a non-blocking socket call into a `Poll`.
///
/// `WouldBlock` and `Interrupted` are both "try again" conditions: the task is
/// re-registered right away, relying on the timer to drive the whole process
/// instead of any actual readiness events. Every other outcome is final.
fn ready_or_retry<T>(
    outcome: std::io::Result<T>,
    cx: &mut std::task::Context<'_>,
) -> Poll<std::io::Result<T>> {
    match outcome {
        Err(e)
            if matches!(
                e.kind(),
                std::io::ErrorKind::WouldBlock | std::io::ErrorKind::Interrupted
            ) =>
        {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
        finished => Poll::Ready(finished),
    }
}

impl futures_io::AsyncRead for MySocketWrapper {
    /// Attempts one non-blocking read into `buf`; pending if no data is available yet.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> Poll<futures_io::Result<usize>> {
        ready_or_retry(self.project().0.read(buf), cx)
    }
}

impl futures_io::AsyncWrite for MySocketWrapper {
    /// Attempts one non-blocking write of `buf`; pending if the socket cannot accept data yet.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<futures_io::Result<usize>> {
        ready_or_retry(self.project().0.write(buf), cx)
    }

    /// Flushes the underlying stream.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<futures_io::Result<()>> {
        ready_or_retry(self.project().0.flush(), cx)
    }

    /// Flushes and then shuts down the write half of the socket so the peer
    /// observes end-of-stream (previously this only flushed).
    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<futures_io::Result<()>> {
        match self.as_mut().poll_flush(cx) {
            Poll::Ready(Ok(())) => {}
            not_flushed => return not_flushed,
        }
        match self.project().0.shutdown(std::net::Shutdown::Write) {
            // The connection is already gone, so there is nothing left to close.
            Err(e) if e.kind() == std::io::ErrorKind::NotConnected => Poll::Ready(Ok(())),
            outcome => Poll::Ready(outcome),
        }
    }
}
/// Future returned by [`sleep`]; resolves once its deadline has been reached.
///
/// It uses no OS timer: every poll compares the clock against the deadline
/// and, if it is still too early, wakes the task again right away so the next
/// executor tick re-checks it. `None` means the deadline is not representable
/// (the requested duration overflowed `Instant`), so the future never resolves.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled or awaited"]
pub struct HackySleep(Option<Instant>);

impl std::future::Future for HackySleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        match self.0 {
            // `>=` rather than `>`: reaching the deadline exactly counts as done,
            // otherwise a coarse clock would cost a whole extra tick.
            Some(deadline) if Instant::now() >= deadline => Poll::Ready(()),
            _ => {
                // Immediately register for retrying, relying on the timer to drive
                // the entire process instead of any actual events.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}

/// Returns a future that completes once at least `t` has elapsed from now.
///
/// A duration too large to be added to the current `Instant` yields a future
/// that stays pending forever instead of panicking.
pub fn sleep(t: Duration) -> HackySleep {
    HackySleep(Instant::now().checked_add(t))
}
}
impl HackyTimerPollDemo {
    /// Connects to the WebSocket mirror, then concurrently sends a numbered
    /// greeting once per second and prints every received text message.
    ///
    /// Finishes when the incoming stream ends (returning `Ok`) or when sending
    /// fails (returning that error).
    ///
    /// NOTE(review): `TcpStream::connect` runs before the socket is switched to
    /// non-blocking mode, so the first poll of this task blocks for the
    /// duration of the TCP connect.
    ///
    /// # Errors
    /// Fails if the TCP connection, the WebSocket handshake, or a later send fails.
    async fn start_my_task() -> anyhow::Result<()> {
        let s = std::net::TcpStream::connect("192.236.209.31:80")?;
        let s = timer_polled_executor_tools::MySocketWrapper::new(s)?;
        let (c, _) = async_tungstenite::client_async("ws://ws.vi-server.org/mirror", s).await?;
        let (mut c_tx, c_rx) = c.split();

        // Sender: runs until a send fails, then yields that error instead of
        // retrying forever on a dead connection.
        let subtask1 = async move {
            let mut x: u64 = 0;
            loop {
                if let Err(e) = c_tx.send(Message::Text(format!("Hello, {}", x))).await {
                    break e;
                }
                x += 1;
                timer_polled_executor_tools::sleep(Duration::from_secs(1)).await;
            }
        };

        // Receiver: prints text frames; other frame kinds and receive errors
        // are deliberately skipped.
        let subtask2 = c_rx.for_each(|x| {
            if let Ok(Message::Text(s)) = x {
                println!("{}", s);
            }
            futures::future::ready(())
        });

        pin_mut!(subtask1);
        pin_mut!(subtask2);
        match futures::future::select(subtask1, subtask2).await {
            futures::future::Either::Left((send_error, _)) => Err(send_error.into()),
            futures::future::Either::Right(((), _)) => Ok(()),
        }
    }

    /// Creates the executor and schedules the demo task on it.
    ///
    /// Nothing runs until `step` is called. Because the task is detached, a
    /// failure inside it is reported on stderr rather than silently dropped.
    pub fn new() -> anyhow::Result<HackyTimerPollDemo> {
        // maybe `futures_executor::LocalPool` is better for this?
        let exe = async_executor::LocalExecutor::new();
        exe.spawn(async {
            if let Err(e) = HackyTimerPollDemo::start_my_task().await {
                eprintln!("hackyws: task failed: {:#}", e);
            }
        })
        .detach();
        Ok(HackyTimerPollDemo { exe })
    }

    /// Gives the executor one chance to make progress; meant to be called from
    /// the host's periodic timer.
    pub fn step(&mut self) -> anyhow::Result<()> {
        // Exactly one tick per call: the socket wrapper and `sleep` re-wake
        // their task on every pending poll, so draining the run queue in a loop
        // here would spin without ever returning to the host.
        self.exe.try_tick();
        Ok(())
    }
}
/// Simulates the third-party host: builds the plugin once and then drives it
/// from a fixed-interval timer loop that never exits on its own.
fn main() -> anyhow::Result<()> {
    // Period of the simulated host timer.
    let tick_interval = Duration::from_millis(50);
    let mut plugin = HackyTimerPollDemo::new()?;
    loop {
        plugin.step()?;
        std::thread::sleep(tick_interval);
    }
}
@vi
Copy link
Author

vi commented Dec 18, 2022

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment