Created
March 4, 2018 12:47
-
-
Save AljoschaMeyer/1bbca333a5482790bdcf8f803f6707f7 to your computer and use it in GitHub Desktop.
Example of using muxrpc-rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(try_from)] | |
#![feature(ip_constructors)] | |
extern crate futures; | |
extern crate tokio; | |
extern crate tokio_io; | |
extern crate sodiumoxide; | |
extern crate secret_stream; | |
extern crate ssb_common; | |
extern crate ssb_keyfile; | |
extern crate muxrpc; | |
extern crate serde; | |
#[macro_use] | |
extern crate serde_derive; | |
extern crate serde_json; | |
use std::net::{SocketAddr, Ipv6Addr}; | |
use std::convert::TryInto; | |
use tokio::net::TcpStream; | |
use secret_stream::Client; | |
use sodiumoxide::crypto::box_; | |
use futures::prelude::*; | |
use futures::future::ok; | |
use tokio_io::AsyncRead; | |
use muxrpc::*; | |
use ssb_common::*; | |
use ssb_common::keys::PublicKeyEncBuf; | |
use ssb_keyfile::load_or_create_keys; | |
use serde_json::Value; | |
#[derive(Serialize)] | |
struct CreateHistoryStream([HistoryStreamArg; 1]); | |
impl Rpc for CreateHistoryStream { | |
fn names() -> Box<[&'static str]> { | |
Box::new(["createHistoryStream"]) | |
} | |
} | |
#[derive(Serialize)] | |
struct HistoryStreamArg { | |
id: String, | |
} | |
fn main() { | |
// Should always be called before using any of the crypto stuff: It makes | |
// things threadsafe and faster. | |
sodiumoxide::init(); | |
// Read keys from the secret file from the ssb directory. Takes the | |
// `ssb_appname` environment variable into account. | |
let (pk, sk) = load_or_create_keys().unwrap(); | |
let pk_string: String = PublicKeyEncBuf::new(&pk).into(); | |
let pk = pk.try_into().unwrap(); | |
let sk = sk.try_into().unwrap(); | |
let (ephemeral_pk, ephemeral_sk) = box_::gen_keypair(); | |
let addr = SocketAddr::new(Ipv6Addr::localhost().into(), DEFAULT_TCP_PORT); | |
TcpStream::connect(&addr) | |
.and_then(|tcp| { | |
// Performs a secret-handshake and yields an encrypted duplex connection. | |
Client::new(tcp, | |
&MAINNET_IDENTIFIER, | |
&pk, | |
&sk, | |
&ephemeral_pk, | |
&ephemeral_sk, | |
&pk) | |
.map_err(|(err, _)| err) | |
}).map_err(|_| unimplemented!()) | |
.and_then(|connection| { | |
// Split the connection into its halves and create a muxrpc connection. | |
let (read, write) = connection.unwrap().split(); | |
let (rpc_in, mut rpc_out, _) = muxrpc(read, write); | |
let req = CreateHistoryStream([HistoryStreamArg { id: "@".to_string() + &pk_string }]); | |
let (send_request, responses) = rpc_out.source::<_, Value, Value>(&req); | |
let use_responses = responses.for_each(|res| { | |
println!("{:?}", res); | |
ok(()) | |
}).map_err(|err| { | |
println!("Got error receiving: {:?}", err); | |
unimplemented!() | |
}); | |
// send the `createHistoryStream` rpc | |
send_request | |
// while dealing with the response in parallel | |
.join(use_responses) | |
// close this end of the packet-stream afterwards | |
.and_then(|_| rpc_out.close()) | |
// meanwhile consume the incoming packets and stop execution | |
// if the server closes the connection | |
.select(rpc_in.for_each(|_| ok(())).map_err(|_| unimplemented!())) | |
// keep the typechecker happy | |
.map(|(item, _)| item) | |
.map_err(|(err, _)| err) | |
}) | |
// Actually run the computations. | |
.wait() | |
.unwrap(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment