Skip to content

Instantly share code, notes, and snippets.

@AljoschaMeyer
Created March 5, 2018 21:46
Show Gist options
  • Save AljoschaMeyer/e2ca433b2857acd085feb4ee80d9fd23 to your computer and use it in GitHub Desktop.
Save AljoschaMeyer/e2ca433b2857acd085feb4ee80d9fd23 to your computer and use it in GitHub Desktop.
Example of using muxrpc over secret_stream with an event loop.
#![feature(try_from)]
#![feature(ip_constructors)]
extern crate futures;
extern crate tokio;
extern crate tokio_io;
extern crate sodiumoxide;
extern crate secret_stream;
extern crate ssb_common;
extern crate ssb_keyfile;
extern crate muxrpc;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
use std::net::{SocketAddr, Ipv6Addr};
use std::convert::TryInto;
use tokio::executor::current_thread;
use tokio::net::TcpStream;
use secret_stream::OwningClient;
use sodiumoxide::crypto::box_;
use futures::prelude::*;
use futures::future::ok;
use tokio_io::AsyncRead;
use muxrpc::*;
use ssb_common::*;
use ssb_common::keys::PublicKeyEncBuf;
use ssb_keyfile::load_or_create_keys;
use serde_json::Value;
#[derive(Serialize)]
struct CreateHistoryStream([HistoryStreamArg; 1]);
impl Rpc for CreateHistoryStream {
fn names() -> Box<[&'static str]> {
Box::new(["createHistoryStream"])
}
}
#[derive(Serialize)]
struct HistoryStreamArg {
id: String,
}
fn main() {
sodiumoxide::init();
let (pk, sk) = load_or_create_keys().unwrap();
let pk_string: String = PublicKeyEncBuf::new(&pk).into();
let pk = pk.try_into().unwrap();
let sk = sk.try_into().unwrap();
let (ephemeral_pk, ephemeral_sk) = box_::gen_keypair();
let addr = SocketAddr::new(Ipv6Addr::localhost().into(), DEFAULT_TCP_PORT);
let do_stuff = TcpStream::connect(&addr)
.and_then(move |tcp| {
// Performs a secret-handshake and yields an encrypted duplex connection.
OwningClient::new(tcp,
&MAINNET_IDENTIFIER,
&pk,
&sk,
&ephemeral_pk,
&ephemeral_sk,
&pk)
.map_err(|(err, _)| err)
})
.map_err(|_| unimplemented!())
.map(move |connection| {
// Split the connection into its halves and create a muxrpc connection.
let (read, write) = connection.unwrap().split();
let (_, mut rpc_out, _) = muxrpc(read, write);
let req = CreateHistoryStream([HistoryStreamArg { id: "@".to_string() + &pk_string }]);
let (send_request, responses) = rpc_out.source::<_, Value, Value>(&req);
current_thread::spawn(send_request.map_err(|_| unimplemented!()));
current_thread::spawn(responses
.for_each(|res| {
println!("{:?}", res);
ok(())
})
.map_err(|err| {
println!("Got error receiving: {:?}", err);
unimplemented!()
})
.and_then(|_| {
rpc_out.close().map_err(|_| unimplemented!())
}));
});
current_thread::run(|_| current_thread::spawn(do_stuff));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment