Skip to content

Instantly share code, notes, and snippets.

@unpluggedcoder
Last active August 5, 2020 12:34
Show Gist options
  • Save unpluggedcoder/b1bd611c5009afe23a76d204d5982df7 to your computer and use it in GitHub Desktop.
Save unpluggedcoder/b1bd611c5009afe23a76d204d5982df7 to your computer and use it in GitHub Desktop.
Json-Rpc in Rust
[package]
name = "jrpc"
version = "0.1.0"
authors = ["unpluggedcoder <unpluggedcoder@outlook.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log = "0.4"
pretty_env_logger = "0.4"
lazy_static = "1.4"
tokio = { version = "0.2", features = ["full"] } # tokio 0.2 is not compatible with futures 0.1
futures = { version = "0.3", features = ["compat"] } # We need "compat" feature.
jsonrpc-http-server = "14.2.0"
jsonrpc-core = "14.2.0" # Use futures 0.1
jsonrpc-derive = "14.2.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
// #![feature(async_await, futures_api)]
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
use futures::future::{FutureExt, TryFutureExt};
use std::future::Future;
use std::collections::HashMap;
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
use jsonrpc_core::{self, types, BoxFuture, MetaIoHandler, Metadata, Result};
use jsonrpc_derive::rpc;
use jsonrpc_http_server::{hyper, ServerBuilder};
use tokio::runtime;
use tokio::sync::mpsc::{self, Sender, UnboundedSender};
/// Process-wide counter handing out request ids, starting at 0.
///
/// `AtomicUsize::new` is a `const fn`, so a plain `static` suffices and no lazy
/// initialization is required; call sites (`REQ_ID.fetch_add(..)`) are unchanged.
static REQ_ID: AtomicUsize = AtomicUsize::new(0);
fn async_wait<T: Send + 'static>(
fut: impl Future<Output = Result<T>> + Send + 'static,
) -> BoxFuture<T> {
let compat_fut = fut.boxed().compat();
Box::new(compat_fut)
}
/// Per-request metadata handed to RPC methods.
///
/// Wraps an optional unbounded sender over which a handler can submit
/// `(request id, reply sender)` pairs. `Default` yields `Meta(None)`.
#[derive(Clone, Default)]
pub struct Meta(pub Option<UnboundedSender<(usize, Sender<usize>)>>);

/// Marks `Meta` as usable as JSON-RPC request metadata.
impl Metadata for Meta {}
/// RPC interface definition; the `rpc(server)` attribute generates the
/// server-side glue (e.g. the `to_delegate` used when registering the handler).
#[rpc(server)]
pub trait Rpc {
    /// Metadata type passed to methods flagged with `meta`.
    type Metadata;

    /// Asynchronous method exposed under the JSON-RPC name `beFancy`.
    ///
    /// Receives the request metadata and resolves to a `usize`.
    #[rpc(meta, name = "beFancy")]
    fn call(&self, meta: Self::Metadata) -> BoxFuture<usize>;
}
#[derive(Default, Clone)]
struct RpcImpl;
impl Rpc for RpcImpl {
type Metadata = Meta;
fn call(&self, meta: Self::Metadata) -> BoxFuture<usize> {
let id = REQ_ID.fetch_add(1, Ordering::SeqCst);
let (tx, rx) = mpsc::channel::<usize>(1);
if let Some(sender) = meta.0 {
sender.send((id, tx)).unwrap();
}
let resp_fut = async move {
match rx.recv().await {
Some(id) => Ok(id),
None => Err(types::Error::new(types::ErrorCode::InternalError)),
}
};
async_wait(resp_fut)
}
}
fn main() {
env::set_var("RUST_LOG", "info");
pretty_env_logger::init();
let mut rt = runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.expect("Runtime build failed.");
let (broker_sender, mut broker_receiver) = mpsc::unbounded_channel();
rt.spawn(async {
let mut io = MetaIoHandler::default();
let rpc = RpcImpl;
io.extend_with(rpc.to_delegate());
let _server = ServerBuilder::new(io)
.meta_extractor(move |_: &hyper::Request<hyper::Body>| {
info!("Meta extractor called.");
Meta(Some(broker_sender.clone()))
})
.start_http(&"127.0.0.1:9527".parse().unwrap())
.expect("Unable to start RPC server");
_server.wait();
});
rt.block_on(async move {
let mut rpc_resps: HashMap<usize, Sender<usize>> = HashMap::new();
info!("Borker loop start...");
loop {
if let Some((id, mut sender)) = broker_receiver.recv().await {
info!("Broker received: id({}).", id);
// Sleep for awhile
thread::sleep(Duration::from_secs(1));
sender.send(id * id).await.unwrap();
info!("Broker sent: id({})", id);
rpc_resps.insert(id, sender);
} else {
info!("Broker channel broken.");
break;
}
}
info!("Broker loop finished.");
});
}
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
use std::collections::HashMap;
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
use jsonrpc_core::futures::{Async, Future as RpcFuture, Poll};
use jsonrpc_core::{self, *};
use jsonrpc_derive::rpc;
use jsonrpc_http_server::{hyper, ServerBuilder};
use tokio::runtime;
use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender, UnboundedSender};
/// Process-wide counter handing out request ids, starting at 0.
///
/// `AtomicUsize::new` is a `const fn`, so a plain `static` suffices and no lazy
/// initialization is required; call sites (`REQ_ID.fetch_add(..)`) are unchanged.
static REQ_ID: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug)]
struct AsyncResponse {
recv: Receiver<usize>,
}
impl RpcFuture for AsyncResponse {
type Item = Result<String>;
type Error = types::Error;
/// This WON'T WORK. See fix.rs
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.recv.try_recv() {
Err(TryRecvError::Empty) => {
// Only polled twice if it's not ready.
info!("AsyncResponse not ready.");
Ok(Async::NotReady)
}
Err(TryRecvError::Closed) => {
info!("AsyncResponse closed.");
Err(Error::new(ErrorCode::InternalError).into())
}
Ok(value) => {
info!("AsyncResponse ready, response value: {}", value);
Ok(Async::Ready(Ok(value.to_string())))
}
}
}
}
/// Per-request metadata handed to RPC methods.
///
/// Wraps an optional unbounded sender over which a handler can submit
/// `(request id, reply sender)` pairs. `Default` yields `Meta(None)`.
#[derive(Clone, Default)]
pub struct Meta(pub Option<UnboundedSender<(usize, Sender<usize>)>>);

/// Marks `Meta` as usable as JSON-RPC request metadata.
impl Metadata for Meta {}
/// RPC interface definition; the `rpc(server)` attribute generates the
/// server-side glue (e.g. the `to_delegate` used when registering the handler).
#[rpc(server)]
pub trait Rpc {
    /// Metadata type passed to methods flagged with `meta`.
    type Metadata;

    /// Asynchronous method exposed under the JSON-RPC name `beFancy`.
    ///
    /// Receives the request metadata; the future's item is a
    /// `Result<String>`.
    #[rpc(meta, name = "beFancy")]
    fn call(&self, meta: Self::Metadata) -> BoxFuture<Result<String>>;
}
#[derive(Default, Clone)]
struct RpcImpl;
impl Rpc for RpcImpl {
type Metadata = Meta;
fn call(&self, meta: Self::Metadata) -> BoxFuture<Result<String>> {
let id = REQ_ID.fetch_add(1, Ordering::SeqCst);
let (tx, rx) = mpsc::channel::<usize>(1);
if let Some(sender) = meta.0 {
sender.send((id, tx)).unwrap();
}
Box::new(AsyncResponse { recv: rx })
}
}
fn main() {
env::set_var("RUST_LOG", "info");
pretty_env_logger::init();
let mut rt = runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.expect("Runtime build failed.");
let (broker_sender, mut broker_receiver) = mpsc::unbounded_channel();
{
let sender = broker_sender.clone();
rt.spawn(async {
let mut io = MetaIoHandler::default();
let rpc = RpcImpl;
io.extend_with(rpc.to_delegate());
let _server = ServerBuilder::new(io)
.meta_extractor(move |_: &hyper::Request<hyper::Body>| {
info!("Meta extractor called.");
Meta(Some(sender.clone()))
})
.start_http(&"127.0.0.1:9527".parse().unwrap())
.expect("Unable to start RPC server");
_server.wait();
});
}
rt.block_on(async move {
let mut rpc_resps: HashMap<usize, Sender<usize>> = HashMap::new();
info!("Borker loop start...");
loop {
if let Some((id, mut sender)) = broker_receiver.recv().await {
info!("Broker received: id({}).", id);
// Sleep for awhile
thread::sleep(Duration::from_secs(2));
sender.send(id * id).await.unwrap();
info!("Broker sent: id({})", id);
rpc_resps.insert(id, sender);
} else {
info!("Broker channel broken.");
break;
}
}
info!("Broker loop finished.");
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment