Skip to content

Instantly share code, notes, and snippets.

@AljoschaMeyer
Created March 4, 2018 12:47
Show Gist options
  • Save AljoschaMeyer/1bbca333a5482790bdcf8f803f6707f7 to your computer and use it in GitHub Desktop.
Save AljoschaMeyer/1bbca333a5482790bdcf8f803f6707f7 to your computer and use it in GitHub Desktop.
Example of using muxrpc-rs
#![feature(try_from)]
#![feature(ip_constructors)]
extern crate futures;
extern crate tokio;
extern crate tokio_io;
extern crate sodiumoxide;
extern crate secret_stream;
extern crate ssb_common;
extern crate ssb_keyfile;
extern crate muxrpc;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
use std::net::{SocketAddr, Ipv6Addr};
use std::convert::TryInto;
use tokio::net::TcpStream;
use secret_stream::Client;
use sodiumoxide::crypto::box_;
use futures::prelude::*;
use futures::future::ok;
use tokio_io::AsyncRead;
use muxrpc::*;
use ssb_common::*;
use ssb_common::keys::PublicKeyEncBuf;
use ssb_keyfile::load_or_create_keys;
use serde_json::Value;
/// Request payload for the `createHistoryStream` rpc.
///
/// Wraps exactly one `HistoryStreamArg`; the wrapper is serialized through
/// serde's derived `Serialize` implementation.
#[derive(Serialize)]
struct CreateHistoryStream([HistoryStreamArg; 1]);

impl Rpc for CreateHistoryStream {
    /// Returns the name segments of this rpc: the single entry
    /// `"createHistoryStream"`.
    fn names() -> Box<[&'static str]> {
        let segments: Vec<&'static str> = vec!["createHistoryStream"];
        segments.into_boxed_slice()
    }
}
/// Argument object carried by a `CreateHistoryStream` request.
#[derive(Serialize)]
struct HistoryStreamArg {
// `main` fills this with "@" followed by the encoded public key of the local
// identity. Presumably it names the feed whose history is requested
// (NOTE(review): confirm against the ssb `createHistoryStream` documentation).
id: String,
}
/// Example client: connects to `DEFAULT_TCP_PORT` on the IPv6 localhost
/// address, performs a secret-handshake, sends a `createHistoryStream` rpc for
/// the local identity and prints every response it receives.
///
/// # Panics
///
/// This is example code, so every failure aborts the program with a panic:
/// loading or converting the keys, connecting, the handshake, sending the
/// request, receiving responses, and reading incoming packets. The panic
/// messages name the failing step instead of the generic "not yet
/// implemented" a bare `unimplemented!()` would print.
fn main() {
    // Should always be called before using any of the crypto stuff: It makes
    // things threadsafe and faster.
    sodiumoxide::init();

    // Read keys from the secret file from the ssb directory. Takes the
    // `ssb_appname` environment variable into account.
    let (pk, sk) = load_or_create_keys().expect("could not load or create the ssb keys");

    // Keep the encoded form of the public key around: it is needed for the
    // request argument after `pk` has been converted (and thereby shadowed).
    let pk_string: String = PublicKeyEncBuf::new(&pk).into();

    // Convert the keys into the types `Client::new` expects.
    let pk = pk
        .try_into()
        .expect("could not convert the public key for the handshake");
    let sk = sk
        .try_into()
        .expect("could not convert the secret key for the handshake");

    let (ephemeral_pk, ephemeral_sk) = box_::gen_keypair();

    let addr = SocketAddr::new(Ipv6Addr::localhost().into(), DEFAULT_TCP_PORT);

    TcpStream::connect(&addr)
        .and_then(|tcp| {
            // Performs a secret-handshake and yields an encrypted duplex connection.
            // NOTE(review): `pk` is passed twice; the last argument is presumably
            // the server's public key, i.e. the local server is assumed to run
            // under the same identity as this client - confirm against the
            // secret_stream documentation.
            Client::new(tcp,
                        &MAINNET_IDENTIFIER,
                        &pk,
                        &sk,
                        &ephemeral_pk,
                        &ephemeral_sk,
                        &pk)
                .map_err(|(err, _)| err)
        })
        // Report the underlying io error instead of discarding it.
        .map_err(|err| panic!("could not connect to the server or run the handshake: {}", err))
        .and_then(|connection| {
            // Split the connection into its halves and create a muxrpc connection.
            let (read, write) = connection
                .expect("the secret-handshake did not succeed")
                .split();
            let (rpc_in, mut rpc_out, _) = muxrpc(read, write);

            let req = CreateHistoryStream([HistoryStreamArg { id: format!("@{}", pk_string) }]);
            let (send_request, responses) = rpc_out.source::<_, Value, Value>(&req);

            let use_responses = responses
                .for_each(|res| {
                    println!("{:?}", res);
                    ok(())
                })
                .map_err(|err| panic!("Got error receiving: {:?}", err));

            // send the `createHistoryStream` rpc
            send_request
                // while dealing with the response in parallel
                .join(use_responses)
                // close this end of the packet-stream afterwards
                .and_then(|_| rpc_out.close())
                // meanwhile consume the incoming packets and stop execution
                // if the server closes the connection
                .select(rpc_in
                            .for_each(|_| ok(()))
                            .map_err(|_| panic!("error while consuming incoming packets")))
                // keep the typechecker happy
                .map(|(item, _)| item)
                .map_err(|(err, _)| err)
        })
        // Actually run the computations.
        .wait()
        .expect("the muxrpc session ended with an error");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment