Skip to content

Instantly share code, notes, and snippets.

Last active August 28, 2019 04:48
Show Gist options
  • Save kiratp/dfcbcf0aa713a277d5d53b06d9db9308 to your computer and use it in GitHub Desktop.
Save kiratp/dfcbcf0aa713a277d5d53b06d9db9308 to your computer and use it in GitHub Desktop.
(Mostly working) Sample showing how to use tower-grpc against a TLS endpoint.
// [dependencies]
// futures = "0.1.27"
// http = "0.1.17"
// tokio = "0.1.21"
// tower-request-modifier = { git = "" }
// tower-grpc = { version = "0.1.0", features = ["tower-hyper"] }
// tower-service = "0.2"
// tower-util = "0.1"
// tokio-rustls = "0.10.0-alpha.3"
// webpki = "0.19.1"
// webpki-roots = "0.16.0"
// tower-h2 = { git = "" }
// openssl = "*"
// openssl-probe = "*"
use std::thread;
use std::sync::{Arc};
use futures::{future, Future};
use tower_util::MakeService;
use tokio_rustls::client::TlsStream;
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
use std::net::SocketAddr;
use tokio::executor::DefaultExecutor;
use tokio::net::tcp::TcpStream;
use tower_h2;
use std::net::ToSocketAddrs;
/// Connection target: wraps the socket address that `call` dials with `TcpStream::connect`.
struct Dst(SocketAddr);
/// Connector service: every `call` opens a TCP connection to the wrapped address and
/// starts a rustls TLS handshake over the resulting socket.
// NOTE(review): this copy of the impl is truncated — `poll_ready` has no visible body, no
// closing braces are present, and `stream` is never boxed or returned. Restore the missing
// lines before trying to compile.
impl tower_service::Service<()> for Dst {
type Response = TlsStream<TcpStream>;
type Error = ::std::io::Error;
type Future = Box<dyn Future<Item = TlsStream<TcpStream>, Error = ::std::io::Error> + Send>;
// NOTE(review): body not visible here. The commented-out plaintext variant further down
// returns `Ok(().into())`; presumably the same is intended — confirm.
fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
/// Connects to `self.0` over TCP, then performs the TLS handshake on the socket.
fn call(&mut self, _: ()) -> Self::Future {
// Debug trace of the address being dialled.
println!("{:?}", self.0);
// NOTE(review): `config` is declared `mut` but nothing visible modifies it before it is
// wrapped in the `Arc`. The commented-out variant below pushes the `h2` ALPN protocol and
// adds the webpki trust anchors; presumably those two lines belong here, since a config
// without trust roots likely cannot verify any server certificate — confirm.
let mut config = ClientConfig::new();
let config = Arc::new(config);
let tls_connector = TlsConnector::from(config);
// NOTE(review): the host name is empty in this copy (looks redacted). It must be the DNS
// name the server certificate is issued for; parsing an empty name presumably fails and
// the `unwrap` below would then panic — confirm and fill in.
let addr_string_local = "";
let domain = webpki::DNSNameRef::try_from_ascii_str(addr_string_local).unwrap();
// Owned copy of the name so it can be moved into the closure below.
let domain_local = domain.to_owned();
let stream = TcpStream::connect(&self.0).and_then(move |sock| {
tls_connector.connect(domain_local.as_ref(), sock)
// Identity map: passes the stream through unchanged.
.map(move |tcp| tcp);
// Same implementation, but without TLS. This makes it straightforward to run against a
// plaintext endpoint when testing on a local machine.
// impl tower_service::Service<()> for Dst {
// type Response = TcpStream;
// type Error = ::std::io::Error;
// type Future = Box<dyn Future<Item = TcpStream, Error = ::std::io::Error> + Send>;
// fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
// Ok(().into())
// }
// fn call(&mut self, _: ()) -> Self::Future {
// let mut config = ClientConfig::new();
// config.alpn_protocols.push(b"h2".to_vec());
// config.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
// let addr_string_local = "".to_string();
// let addr = addr_string_local.as_str();
// let stream = TcpStream::connect(&self.0)
// .and_then(move |sock| {
// sock.set_nodelay(true).unwrap();
// Ok(sock)
// });
// Box::new(stream)
// }
// }
/// Builds a `loop_fn` future (`keepalive`) that opens an HTTP/2 client connection through
/// `Dst`, and runs that future with `tokio::run` on a newly spawned thread.
// NOTE(review): this copy of the function is truncated — closing braces, the call that
// produces the connection future, and the loop's continue/break result are not visible.
fn connect() {
let keepalive = future::loop_fn((), move |_| {
// NOTE(review): the URI literal is empty in this copy (looks redacted); parsing an empty
// string presumably fails and the `unwrap` would panic — confirm and fill in.
let uri: http::Uri = "".parse().unwrap();
println!("Connecting to network at: {:?}", uri);
// NOTE(review): `addr` is a string literal with no terminating `;`, while `Dst` wraps a
// `SocketAddr`. Presumably the original resolved a host:port string through the imported
// `ToSocketAddrs` — confirm against the original gist.
let addr = ""
let h2_settings = Default::default();
// `Dst {0: addr}` is the braced form of the tuple-struct constructor `Dst(addr)`.
// NOTE(review): this statement ends with `;` yet the next line continues a method chain,
// so the expression that starts the chain (presumably `make_client.make_service(())`) is
// missing from this copy.
let mut make_client = tower_h2::client::Connect::new(Dst {0: addr}, h2_settings, DefaultExecutor::current());
.map_err(|e| {
eprintln!("HTTP/2 connection failed; err={:?}", e);
.and_then(move |conn| {
// Wraps the connection with a request modifier; the builder options are not visible here.
let conn = tower_request_modifier::Builder::new()
// Wait until the client is ready...
.map_err(|e| eprintln!("client closed: {:?}", e))
.and_then(move |mut client| {
// do stuff
// Runs once the chain above finishes, whether it succeeded or failed; logs the outcome.
.then(|e| {
eprintln!("Reopening client connection to network: {:?}", e);
// One-second back-off before the next attempt.
// NOTE(review): how `retry_sleep` is consumed is not visible. If it is handed to
// `std::thread::sleep`, that would block the executor thread — confirm; a timer future
// would avoid that.
let retry_sleep = std::time::Duration::from_secs(1);
// The `JoinHandle` returned by `thread::spawn` is discarded, so the thread is detached.
thread::spawn(move || tokio::run(keepalive));
pub fn main() {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment