Last active
August 17, 2023 23:20
-
-
Save eduardvercaemer/3acb8290acf3fc4f0971e33ae2c54176 to your computer and use it in GitHub Desktop.
capnproto rpc in rust over websockets (server + wasm client)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fn main() { | |
::capnpc::CompilerCommand::new() | |
.src_prefix("src") | |
.output_path("src") | |
.file("src/hello.capnp") | |
.run() | |
.unwrap(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use anyhow::Result; | |
use capnp_rpc::{rpc_twoparty_capnp::Side, twoparty::VatNetwork, RpcSystem}; | |
use futures::{io::AsyncReadExt, select, FutureExt, TryFutureExt}; | |
use proto::hello_capnp::hello; | |
use tracing::info; | |
use ws_stream_wasm::WsMeta; | |
async fn run() -> Result<()> { | |
let addr = "ws://127.0.0.1:9000"; | |
info!("connecting to {}...", addr); | |
let (ws, stream) = WsMeta::connect(addr, None).await?; | |
let (reader, writer) = stream.into_io().split(); | |
info!("setting up rpc..."); | |
let network = VatNetwork::new(reader, writer, Side::Client, Default::default()); | |
let mut system = RpcSystem::new(Box::new(network), None); | |
let client = system.bootstrap::<hello::Client>(Side::Server); | |
info!("sending rpc!"); | |
let mut system = system.fuse(); | |
let request = client.say_hello_request().send().promise; | |
let mut request = request.fuse(); | |
// we need to run both the rpc system + request, the system | |
// wont resolve even once the request compeltes, thus the | |
// weird select! + unreachable | |
select! { | |
_ = system => unreachable!(), | |
r = request => r.unwrap(), | |
}; | |
ws.close().await?; | |
info!("closed connection"); | |
Ok(()) | |
} | |
fn main() { | |
console_error_panic_hook::set_once(); | |
tracing_wasm::set_as_global_default(); | |
wasm_bindgen_futures::spawn_local(run().unwrap_or_else(|err| panic!("{}", err))); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@0x9d703016f6335958; | |
interface Hello { | |
sayHello @0 (); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// this is the common crate with the proto module | |
#[path = "hello_capnp.rs"] | |
#[allow(dead_code)] | |
pub mod hello_capnp; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::net::SocketAddr; | |
use anyhow::Result; | |
use async_tungstenite::tokio::accept_async; | |
use capnp::capability::Promise; | |
use capnp_rpc::{new_client, rpc_twoparty_capnp::Side, twoparty::VatNetwork, RpcSystem}; | |
use futures::io::AsyncReadExt; | |
use proto::hello_capnp::hello; | |
use tokio::net::TcpListener; | |
use tracing::info; | |
use tracing_subscriber::EnvFilter; | |
use ws_stream_tungstenite::WsStream; | |
#[derive(Debug)] | |
struct HelloImpl; | |
impl hello::Server for HelloImpl { | |
#[tracing::instrument] | |
fn say_hello( | |
&mut self, | |
_: hello::SayHelloParams, | |
_: hello::SayHelloResults, | |
) -> Promise<(), capnp::Error> { | |
info!("hello from rpc!"); | |
Promise::ok(()) | |
} | |
} | |
#[tokio::main(flavor = "current_thread")] | |
async fn main() -> Result<()> { | |
tracing_subscriber::fmt() | |
.pretty() | |
.with_env_filter(EnvFilter::from("info")) | |
.init(); | |
let addr = SocketAddr::from(([127, 0, 0, 1], 9000)); | |
info!("server listening on {}", addr); | |
let listener = TcpListener::bind(addr).await?; | |
loop { | |
let (stream, remote_addr) = listener.accept().await?; | |
info!("new connection from {}", remote_addr); | |
let socket = accept_async(stream).await?; | |
let ws = WsStream::new(socket); | |
let (reader, writer) = ws.split(); | |
let network = VatNetwork::new(reader, writer, Side::Server, Default::default()); | |
let system = RpcSystem::new( | |
Box::new(network), | |
Some(new_client::<hello::Client, _>(HelloImpl).client), | |
); | |
system.await?; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment