Skip to content

Instantly share code, notes, and snippets.

@jmaargh
Created September 12, 2021 19:44
Show Gist options
  • Save jmaargh/bb777062575020f61105bc02db78aa81 to your computer and use it in GitHub Desktop.
Save jmaargh/bb777062575020f61105bc02db78aa81 to your computer and use it in GitHub Desktop.
Cap'n Proto RPC Rust blocking
fn main() {
::capnpc::CompilerCommand::new()
.file("src/count.capnp")
.run()
.unwrap();
}
[package]
name = "cprpc_blocking"
version = "0.1.0"
edition = "2018"
[[bin]]
name = "server"
path = "src/server.rs"
[[bin]]
name = "client"
path = "src/client.rs"
[dependencies]
capnp = "0.14.3"
capnp-rpc = "0.14.1"
async-net = "1.6.1"
async-io = "1.6.0"
futures-util = "0.3.17"
[build-dependencies]
capnpc = "0.14.4"
use cprpc_blocking::Client;
fn main() {
let mut client = Client::connect();
println!("first: {}", client.count());
std::thread::sleep(std::time::Duration::from_secs(2));
println!("second: {}", client.count());
}
@0xe3f505472d6c6288;
interface Counter {
count @0 () -> (count :UInt64);
}
use async_io::block_on;
use capnp::capability::Promise;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use futures_util::{future::select, AsyncReadExt};
use count_capnp::counter;
mod count_capnp {
include!(concat!(env!("OUT_DIR"), "/src/count_capnp.rs"));
}
const SOCKET_PATH: &str = "/tmp/count.sock";
pub fn serve<H: RpcHandler>(server: Server<H>) -> ! {
let _ = std::fs::remove_file(SOCKET_PATH);
let listener = std::os::unix::net::UnixListener::bind(SOCKET_PATH).unwrap();
loop {
let (stream, _) = listener.accept().unwrap();
let server_clone = server.clone();
std::thread::spawn(move || {
let client: counter::Client = capnp_rpc::new_client(server_clone);
let stream: async_net::unix::UnixStream = async_io::Async::new(stream).unwrap().into();
let (reader, writer) = stream.split();
let network = Box::new(twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Server,
Default::default(),
));
let rpc_system = RpcSystem::new(network, Some(client.clone().client));
block_on(rpc_system).unwrap();
});
}
}
pub struct Client {
rpc_system: RpcSystem<rpc_twoparty_capnp::Side>,
client: counter::Client,
}
impl Client {
pub fn connect() -> Self {
let stream = std::os::unix::net::UnixStream::connect(SOCKET_PATH).unwrap();
let stream: async_net::unix::UnixStream = async_io::Async::new(stream).unwrap().into();
let (reader, writer) = stream.split();
let network = Box::new(twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(network, None);
let client: counter::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
Self { rpc_system, client }
}
pub fn count(&mut self) -> u64 {
let request = self.client.count_request();
let promise = request.send().promise;
let response = match block_on(select(&mut self.rpc_system, promise)) {
futures_util::future::Either::Left((_rpc_error, _)) => panic!("rpc system errored"),
futures_util::future::Either::Right((response, _)) => response.unwrap(),
};
let out = response.get().unwrap().get_count();
out
}
}
pub trait RpcHandler: Clone + Send + 'static {
fn count(&mut self) -> u64;
}
#[derive(Clone)]
pub struct Server<H: RpcHandler> {
pub handler: H,
}
impl<H: RpcHandler> Server<H> {
pub fn new(server: H) -> Self {
Self { handler: server }
}
}
impl<H: RpcHandler> counter::Server for Server<H> {
fn count(
&mut self,
_: counter::CountParams,
mut results: counter::CountResults,
) -> capnp::capability::Promise<(), capnp::Error> {
results.get().set_count(self.handler.count());
Promise::ok(())
}
}
use std::sync::{Arc, Mutex};
use cprpc_blocking::{serve, RpcHandler, Server};
fn main() {
let handler = Handler {
count: Arc::new(Mutex::new(0)),
};
let server = Server { handler };
serve(server);
}
#[derive(Clone)]
struct Handler {
count: Arc<Mutex<u64>>,
}
impl RpcHandler for Handler {
fn count(&mut self) -> u64 {
println!("count");
std::thread::sleep(std::time::Duration::from_secs(2));
let mut count = self.count.lock().unwrap();
*count += 1;
*count
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment