Skip to content

Instantly share code, notes, and snippets.

@eduardvercaemer
Last active August 17, 2023 23:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eduardvercaemer/3acb8290acf3fc4f0971e33ae2c54176 to your computer and use it in GitHub Desktop.
Save eduardvercaemer/3acb8290acf3fc4f0971e33ae2c54176 to your computer and use it in GitHub Desktop.
Cap'n Proto RPC in Rust over WebSockets (server + WASM client)
/// Build script: compiles the Cap'n Proto schema into Rust source.
///
/// Runs the `capnpc` compiler command on `src/hello.capnp`, using `src` both
/// as the source prefix and as the output path.
///
/// # Panics
///
/// Panics (and thereby fails the build) if the compiler command returns an error.
fn main() {
    ::capnpc::CompilerCommand::new()
        .src_prefix("src")
        .output_path("src")
        .file("src/hello.capnp")
        .run()
        // A build script has no caller to return an error to, so fail loudly,
        // but say which step failed instead of a bare `unwrap` message.
        .expect("failed to compile Cap'n Proto schema src/hello.capnp");
}
use anyhow::Result;
use capnp_rpc::{rpc_twoparty_capnp::Side, twoparty::VatNetwork, RpcSystem};
use futures::{io::AsyncReadExt, select, FutureExt, TryFutureExt};
use proto::hello_capnp::hello;
use tracing::info;
use ws_stream_wasm::WsMeta;
/// Connects to the RPC server over a WebSocket, performs a single `sayHello`
/// call, and then closes the connection.
///
/// # Errors
///
/// Returns an error if the WebSocket connection cannot be established, if the
/// RPC call fails, if the RPC system stops before the call completes, or if
/// closing the WebSocket fails.
async fn run() -> Result<()> {
    let addr = "ws://127.0.0.1:9000";
    info!("connecting to {}...", addr);
    let (ws, stream) = WsMeta::connect(addr, None).await?;
    let (reader, writer) = stream.into_io().split();

    info!("setting up rpc...");
    let network = VatNetwork::new(reader, writer, Side::Client, Default::default());
    let mut system = RpcSystem::new(Box::new(network), None);
    let client = system.bootstrap::<hello::Client>(Side::Server);

    info!("sending rpc!");
    let mut system = system.fuse();
    let mut request = client.say_hello_request().send().promise.fuse();

    // The RPC system has to be polled for the request to make progress, so
    // both futures are raced. The system does not resolve just because the
    // request completed; if it finishes first, the connection went away
    // before a response arrived, which is reported as an error rather than
    // treated as impossible.
    select! {
        finished = system => {
            finished?;
            anyhow::bail!("rpc system stopped before the request completed");
        },
        response = request => {
            // Propagate RPC failures to the caller instead of panicking here.
            response?;
        },
    };

    ws.close().await?;
    info!("closed connection");
    Ok(())
}
/// Client entry point.
///
/// Installs the `console_error_panic_hook` hook and the `tracing_wasm`
/// global default, then hands the `run` future to
/// `wasm_bindgen_futures::spawn_local`.
///
/// # Panics
///
/// The spawned task panics with the error's display text if `run` fails.
fn main() {
    console_error_panic_hook::set_once();
    tracing_wasm::set_as_global_default();

    // Turn a failed run into a panic carrying the error message.
    let task = run().unwrap_or_else(|failure| panic!("{}", failure));
    wasm_bindgen_futures::spawn_local(task);
}
# Unique 64-bit ID of this schema file.
@0x9d703016f6335958;
# RPC interface shared by the server (which implements it) and the client (which calls it).
interface Hello {
# Method ordinal 0; declares no parameters and no results.
sayHello @0 ();
}
/// Rust bindings for the `Hello` RPC interface, exposed by the common `proto`
/// crate so that other crates can import them as `proto::hello_capnp`.
///
/// NOTE(review): `hello_capnp.rs` is presumably the file the build script
/// generates from `hello.capnp` — confirm.
// Explicitly load the module from `hello_capnp.rs`.
#[path = "hello_capnp.rs"]
// NOTE(review): presumably silences warnings about items in the module that
// this crate never uses itself — confirm.
#[allow(dead_code)]
pub mod hello_capnp;
use std::net::SocketAddr;
use anyhow::Result;
use async_tungstenite::tokio::accept_async;
use capnp::capability::Promise;
use capnp_rpc::{new_client, rpc_twoparty_capnp::Side, twoparty::VatNetwork, RpcSystem};
use futures::io::AsyncReadExt;
use proto::hello_capnp::hello;
use tokio::net::TcpListener;
use tracing::info;
use tracing_subscriber::EnvFilter;
use ws_stream_tungstenite::WsStream;
/// Server-side implementation of the `Hello` RPC interface.
///
/// Unit struct with no state.
/// NOTE(review): `Debug` is presumably derived so that `#[tracing::instrument]`
/// on `say_hello` can record `self` — confirm.
#[derive(Debug)]
struct HelloImpl;
impl hello::Server for HelloImpl {
/// Handles a `sayHello` call.
///
/// Ignores both the call parameters and the results builder, logs
/// `hello from rpc!` at info level, and returns an already-successful promise.
#[tracing::instrument]
fn say_hello(
&mut self,
_: hello::SayHelloParams,
_: hello::SayHelloResults,
) -> Promise<(), capnp::Error> {
info!("hello from rpc!");
// Nothing to wait for: resolve immediately with success.
Promise::ok(())
}
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.pretty()
.with_env_filter(EnvFilter::from("info"))
.init();
let addr = SocketAddr::from(([127, 0, 0, 1], 9000));
info!("server listening on {}", addr);
let listener = TcpListener::bind(addr).await?;
loop {
let (stream, remote_addr) = listener.accept().await?;
info!("new connection from {}", remote_addr);
let socket = accept_async(stream).await?;
let ws = WsStream::new(socket);
let (reader, writer) = ws.split();
let network = VatNetwork::new(reader, writer, Side::Server, Default::default());
let system = RpcSystem::new(
Box::new(network),
Some(new_client::<hello::Client, _>(HelloImpl).client),
);
system.await?;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment