Skip to content

Instantly share code, notes, and snippets.

@JuanPotato
Created January 26, 2018 03:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JuanPotato/32587ac5fc77545ae7c6916fffc0f982 to your computer and use it in GitHub Desktop.
Save JuanPotato/32587ac5fc77545ae7c6916fffc0f982 to your computer and use it in GitHub Desktop.
tokio servers project
[package]
authors = ["Hasan Ibraheem <hasantiny@gmail.com>"]
name = "rust_sa"
version = "0.1.0"
[dependencies]
bytes = "0.4.6"
failure = "0.1.1"
failure_derive = "0.1.1"
futures = "0.1.17"
lazy_static = "1.0.0"
native-tls = "0.1.5"
rdkafka = "0.14.1"
serde = "1.0.27"
serde_derive = "1.0.27"
tokio-core = "0.1.12"
tokio-io = "0.1.4"
tokio-proto = "0.1.1"
tokio-service = "0.1.0"
tokio-tls = "0.1.4"
[dependencies.config]
default-features = false
features = ["yaml"]
version = "0.7.0"
---
listeners:
- type: tcp_non_ssl
ip: 0.0.0.0
port: 8444
- type: tcp_ssl
ip: 0.0.0.0
port: 8443
- type: udp
ip: 0.0.0.0
port: 8442
ssl:
pkcs_file: server.pfx
password: hunter2
apache_kafka:
server: localhost:9092
topic: kafka_topic
group: kafka_group
use tokio_io::codec::{Encoder, Decoder};
use std::net::SocketAddr;
use tokio_core::net::UdpCodec;
use bytes::{BytesMut, BufMut};
use std::{io, str};
use std::io::Write;
/// Stateless codec that frames a byte stream into `\n`-terminated UTF-8 lines
/// (see its `Decoder` and `Encoder` implementations in this file).
pub struct LineCodec;
impl Decoder for LineCodec {
    type Item = String;
    type Error = io::Error;

    /// Pulls one `\n`-terminated line out of `buf`.
    ///
    /// Returns `Ok(None)` while no newline has been buffered yet, leaving
    /// `buf` untouched. Once a newline is present, the line and its
    /// terminator are removed from `buf` and the line (without the `\n`) is
    /// returned. If the line is not valid UTF-8 an `io::Error` of kind
    /// `Other` is returned; the offending bytes have already been consumed.
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
        // Guard clause: without a terminator there is no complete frame yet.
        let newline_at = match buf.as_ref().iter().position(|&byte| byte == b'\n') {
            Some(index) => index,
            None => return Ok(None),
        };

        // Detach the line itself, then discard the terminator that follows it.
        let frame = buf.split_to(newline_at);
        buf.split_to(1);

        str::from_utf8(frame.as_ref())
            .map(|text| Some(text.to_string()))
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid string"))
    }
}
impl Encoder for LineCodec {
    type Item = String;
    type Error = io::Error;

    /// Appends `msg` followed by a single `\n` to `buf`. Never fails.
    fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
        let payload = msg.as_bytes();
        // One reservation covers the text plus its terminator.
        buf.reserve(payload.len() + 1);
        buf.extend(payload);
        buf.put_u8(b'\n');
        Ok(())
    }
}
/// Stateless codec for UDP datagrams: each datagram is treated as one text
/// message paired with the peer's socket address (see its `UdpCodec`
/// implementation in this file).
pub struct ULineCodec;
impl UdpCodec for ULineCodec {
type In = (SocketAddr, String);
type Out = (SocketAddr, String);
fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error> {
Ok((*src, str::from_utf8(&buf[..buf.len() - 1]).unwrap().to_owned()))
}
fn encode(&mut self, msg: Self::Out, into: &mut Vec<u8>) -> SocketAddr {
let _ = into.write_all(msg.1.as_bytes()).unwrap();
println!("Wrote bytes");
msg.0
}
}
mod line;
extern crate config;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate failure;
extern crate bytes;
extern crate futures;
extern crate tokio_proto;
extern crate tokio_core;
extern crate tokio_tls;
extern crate tokio_io;
extern crate native_tls;
extern crate rdkafka;
use futures::{Future, Stream};
use tokio_core::net::{TcpListener, UdpSocket};
use tokio_core::reactor::{Core, Handle};
use tokio_io::AsyncRead;
use std::net::SocketAddr;
use native_tls::{Pkcs12, TlsAcceptor};
use tokio_tls::TlsAcceptorExt;
use line::{LineCodec, ULineCodec};
use std::fs::File;
use std::io::prelude::*;
use failure::Error;
use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;
/// One entry of the `listeners` list in the configuration file.
#[derive(Debug, Deserialize)]
struct ServerConfig {
/// Listener kind, read from the `type` key. `serve` recognises
/// `tcp_non_ssl`, `tcp_ssl` and `udp`.
#[serde(rename = "type")]
ttype: String,
/// IP address to bind, kept as text and parsed in `serve`.
ip: String,
/// Port to bind.
port: u16,
}
/// The `ssl` section of the configuration file; consumed by `get_pfx` for
/// `tcp_ssl` listeners.
#[derive(Debug, Deserialize)]
struct SslConfig {
/// Path of the PKCS #12 archive that `get_pfx` opens and reads.
pkcs_file: String,
/// Password passed to `Pkcs12::from_der` together with the archive bytes.
password: String,
}
/// The `apache_kafka` section of the configuration file.
#[derive(Debug, Deserialize)]
struct KafkaConfig {
/// Value used for the producer's `bootstrap.servers` setting in `serve`.
server: String,
/// Topic every received line/datagram is sent to.
topic: String,
/// Deserialized from the config, but not read by any code visible in this file.
group: String,
}
/// Top-level shape of the configuration file, deserialized into `CONFIG`.
#[derive(Debug, Deserialize)]
struct ServersConfig {
/// Every listener to start; `serve` iterates over this list.
listeners: Vec<ServerConfig>,
/// TLS identity settings used by `tcp_ssl` listeners.
ssl: SslConfig,
/// Kafka connection and topic settings.
apache_kafka: KafkaConfig,
}
lazy_static! {
    /// Process-wide configuration, loaded on first access from the YAML
    /// source named `config`. Any load or deserialization failure is printed
    /// to stderr and terminates the process with exit code 1.
    static ref CONFIG: ServersConfig = {
        let source = config::File::new("config", config::FileFormat::Yaml);
        let mut settings = config::Config::default();

        match settings.merge(source) {
            Ok(_) => {}
            Err(problem) => {
                eprintln!("Error: {}", problem);
                std::process::exit(1);
            }
        }

        settings.deserialize().unwrap_or_else(|problem| {
            eprintln!("Error: {}", problem);
            std::process::exit(1)
        })
    };
}
/// Shorthand for the Kafka producer type handed to every listener.
type Producer = FutureProducer<rdkafka::client::EmptyContext>;
/// Entry point: runs `serve` and, if it fails, reports the error on stderr
/// and exits with status 1.
fn main() {
    match serve() {
        Ok(()) => {}
        Err(ref err) => {
            eprintln!("error: {}", err);
            std::process::exit(1);
        }
    }
}
fn serve<'a>() -> Result<(), Error> {
let mut lp = Core::new()?;
let mut loops = Vec::new();
let producer = ClientConfig::new()
.set("bootstrap.servers", &CONFIG.apache_kafka.server)
.create::<FutureProducer<_>>()?;
for server in &CONFIG.listeners {
let addr = SocketAddr::new(server.ip.parse()?, server.port);
let server = match server.ttype.as_str() {
"tcp_non_ssl" => {
serve_tcp(lp.handle(), addr, producer.clone())?
},
"tcp_ssl" => {
let pfx = get_pfx(&CONFIG.ssl.pkcs_file, &CONFIG.ssl.password)?;
serve_tls(lp.handle(), addr, pfx, producer.clone())?
},
"udp" => {
serve_udp(lp.handle(), addr, producer.clone())?
},
_ => {
eprintln!("`{}` is not a valid listener type.", &server.ttype);
std::process::exit(1);
}
};
loops.push(server);
}
lp.run(futures::future::join_all(loops))?;
Ok(())
}
/// Reads the file at `file` in full and parses its bytes as a PKCS #12
/// archive protected by `pass`.
///
/// # Errors
///
/// Fails if the file cannot be opened or read, or if `Pkcs12::from_der`
/// rejects the contents or the password.
fn get_pfx(file: &str, pass: &str) -> Result<Pkcs12, Error> {
    let mut der = Vec::new();
    File::open(file).and_then(|mut archive| archive.read_to_end(&mut der))?;
    let identity = Pkcs12::from_der(&der, pass)?;
    Ok(identity)
}
/// Binds a plain TCP listener on `addr` and returns the future that runs its
/// accept loop.
///
/// Each accepted connection is framed with `LineCodec` and processed on its
/// own spawned task; every decoded line is sent to the configured Kafka
/// topic. Errors on an individual connection end that connection's task only.
///
/// # Errors
///
/// Fails if the listener cannot be bound.
fn serve_tcp<'a>(handle: Handle, addr: SocketAddr, producer: Producer) -> Result<Box<Future<Item=(), Error=std::io::Error>>, Error> {
    let listener = TcpListener::bind(&addr, &handle)?;

    let accept_loop = listener.incoming().for_each(move |(stream, _peer)| {
        let sink = producer.clone();
        let lines = stream.framed(LineCodec).for_each(move |line| {
            sink.send_copy::<String, ()>(&CONFIG.apache_kafka.topic, None, Some(&line), None, None, -1);
            Ok(())
        });
        handle.spawn(lines.map_err(|_| ()));
        Ok(())
    });

    Ok(Box::new(accept_loop))
}
/// Binds a TLS-wrapped TCP listener on `addr` using the identity in `pfx` and
/// returns the future that runs its accept loop.
///
/// For every accepted socket the TLS handshake and the subsequent line
/// processing run together on one spawned task, so the accept loop goes
/// straight back to accepting. Every decoded line is sent to the configured
/// Kafka topic.
///
/// A failed handshake (for example a plaintext client connecting to this
/// port) is logged to stderr and only drops that connection; it no longer
/// panics the process. Errors while reading an established connection end
/// that connection's task silently.
///
/// # Errors
///
/// Fails if the listener cannot be bound or the TLS acceptor cannot be built
/// from `pfx`.
fn serve_tls(handle: Handle, addr: SocketAddr, pfx: Pkcs12, producer: Producer) -> Result<Box<Future<Item=(), Error=std::io::Error>>, Error> {
    let listener = TcpListener::bind(&addr, &handle)?;
    let acceptor = TlsAcceptor::builder(pfx)?.build()?;

    let accept_loop = listener.incoming().for_each(move |(socket, _peer)| {
        let producer = producer.clone();
        let session = acceptor
            .accept_async(socket)
            .map_err(|err| eprintln!("TLS handshake failed: {}", err))
            .and_then(move |stream| {
                stream
                    .framed(LineCodec)
                    .for_each(move |line| {
                        producer.send_copy::<String, ()>(&CONFIG.apache_kafka.topic, None, Some(&line), None, None, -1);
                        Ok(())
                    })
                    .map_err(|_| ())
            });

        // Spawn rather than return the handshake future: `for_each` waits for
        // the returned future before taking the next connection, so a slow
        // or stalled handshake would otherwise block all new clients.
        handle.spawn(session);
        Ok(())
    });

    Ok(Box::new(accept_loop))
}
/// Binds a UDP socket on `addr` and returns the future that processes its
/// incoming datagrams.
///
/// Datagrams are decoded with `ULineCodec`; the text of each one is sent to
/// the configured Kafka topic and the sender address is ignored.
///
/// # Errors
///
/// Fails if the socket cannot be bound.
fn serve_udp(handle: Handle, addr: SocketAddr, producer: Producer) -> Result<Box<Future<Item=(), Error=std::io::Error>>, Error> {
    let datagrams = UdpSocket::bind(&addr, &handle)?.framed(ULineCodec);

    let forward = datagrams.for_each(move |(_peer, payload)| {
        producer.send_copy::<String, ()>(&CONFIG.apache_kafka.topic, None, Some(&payload), None, None, -1);
        Ok(())
    });

    Ok(Box::new(forward))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment