-
-
Save JuanPotato/32587ac5fc77545ae7c6916fffc0f982 to your computer and use it in GitHub Desktop.
tokio servers project
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
authors = ["Hasan Ibraheem <hasantiny@gmail.com>"] | |
name = "rust_sa" | |
version = "0.1.0" | |
[dependencies] | |
bytes = "0.4.6" | |
failure = "0.1.1" | |
failure_derive = "0.1.1" | |
futures = "0.1.17" | |
lazy_static = "1.0.0" | |
native-tls = "0.1.5" | |
rdkafka = "0.14.1" | |
serde = "1.0.27" | |
serde_derive = "1.0.27" | |
tokio-core = "0.1.12" | |
tokio-io = "0.1.4" | |
tokio-proto = "0.1.1" | |
tokio-service = "0.1.0" | |
tokio-tls = "0.1.4" | |
[dependencies.config] | |
default-features = false | |
features = ["yaml"] | |
version = "0.7.0" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- | |
listeners: | |
- type: tcp_non_ssl | |
ip: 0.0.0.0 | |
port: 8444 | |
- type: tcp_ssl | |
ip: 0.0.0.0 | |
port: 8443 | |
- type: udp | |
ip: 0.0.0.0 | |
port: 8442 | |
ssl: | |
pkcs_file: server.pfx | |
password: hunter2 | |
apache_kafka: | |
server: localhost:9092 | |
topic: kafka_topic | |
group: kafka_group |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use tokio_io::codec::{Encoder, Decoder}; | |
use std::net::SocketAddr; | |
use tokio_core::net::UdpCodec; | |
use bytes::{BytesMut, BufMut}; | |
use std::{io, str}; | |
use std::io::Write; | |
pub struct LineCodec; | |
impl Decoder for LineCodec { | |
type Item = String; | |
type Error = io::Error; | |
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { | |
// Check to see if the frame contains a new line | |
if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') { | |
// remove the serialized frame from the buffer. | |
let line = buf.split_to(n); | |
// Also remove the '\n' | |
buf.split_to(1); | |
// Turn this data into a UTF string and return it in a Frame. | |
return match str::from_utf8(&line.as_ref()) { | |
Ok(s) => Ok(Some(s.to_string())), | |
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid string")), | |
} | |
} | |
Ok(None) | |
} | |
} | |
impl Encoder for LineCodec { | |
type Item = String; | |
type Error = io::Error; | |
fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> { | |
// Reserve enough space for the line | |
buf.reserve(msg.len() + 1); | |
buf.extend(msg.as_bytes()); | |
buf.put_u8(b'\n'); | |
Ok(()) | |
} | |
} | |
pub struct ULineCodec; | |
impl UdpCodec for ULineCodec { | |
type In = (SocketAddr, String); | |
type Out = (SocketAddr, String); | |
fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error> { | |
Ok((*src, str::from_utf8(&buf[..buf.len() - 1]).unwrap().to_owned())) | |
} | |
fn encode(&mut self, msg: Self::Out, into: &mut Vec<u8>) -> SocketAddr { | |
let _ = into.write_all(msg.1.as_bytes()).unwrap(); | |
println!("Wrote bytes"); | |
msg.0 | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mod line; | |
extern crate config; | |
#[macro_use] | |
extern crate lazy_static; | |
#[macro_use] | |
extern crate serde_derive; | |
extern crate serde; | |
extern crate failure; | |
extern crate bytes; | |
extern crate futures; | |
extern crate tokio_proto; | |
extern crate tokio_core; | |
extern crate tokio_tls; | |
extern crate tokio_io; | |
extern crate native_tls; | |
extern crate rdkafka; | |
use futures::{Future, Stream}; | |
use tokio_core::net::{TcpListener, UdpSocket}; | |
use tokio_core::reactor::{Core, Handle}; | |
use tokio_io::AsyncRead; | |
use std::net::SocketAddr; | |
use native_tls::{Pkcs12, TlsAcceptor}; | |
use tokio_tls::TlsAcceptorExt; | |
use line::{LineCodec, ULineCodec}; | |
use std::fs::File; | |
use std::io::prelude::*; | |
use failure::Error; | |
use rdkafka::config::ClientConfig; | |
use rdkafka::producer::FutureProducer; | |
#[derive(Debug, Deserialize)] | |
struct ServerConfig { | |
#[serde(rename = "type")] | |
ttype: String, | |
ip: String, | |
port: u16, | |
} | |
#[derive(Debug, Deserialize)] | |
struct SslConfig { | |
pkcs_file: String, | |
password: String, | |
} | |
#[derive(Debug, Deserialize)] | |
struct KafkaConfig { | |
server: String, | |
topic: String, | |
group: String, | |
} | |
#[derive(Debug, Deserialize)] | |
struct ServersConfig { | |
listeners: Vec<ServerConfig>, | |
ssl: SslConfig, | |
apache_kafka: KafkaConfig, | |
} | |
lazy_static! { | |
static ref CONFIG: ServersConfig = { | |
let mut conf = config::Config::default(); | |
if let Err(e) = conf.merge(config::File::new("config", config::FileFormat::Yaml)) { | |
eprintln!("Error: {}", e); | |
std::process::exit(1); | |
} | |
match conf.deserialize() { | |
Ok(s) => s, | |
Err(e) => { | |
eprintln!("Error: {}", e); | |
std::process::exit(1); | |
}, | |
} | |
}; | |
} | |
type Producer = FutureProducer<rdkafka::client::EmptyContext>; | |
fn main() { | |
if let Err(ref e) = serve() { | |
eprintln!("error: {}", e); | |
std::process::exit(1); | |
} | |
} | |
fn serve<'a>() -> Result<(), Error> { | |
let mut lp = Core::new()?; | |
let mut loops = Vec::new(); | |
let producer = ClientConfig::new() | |
.set("bootstrap.servers", &CONFIG.apache_kafka.server) | |
.create::<FutureProducer<_>>()?; | |
for server in &CONFIG.listeners { | |
let addr = SocketAddr::new(server.ip.parse()?, server.port); | |
let server = match server.ttype.as_str() { | |
"tcp_non_ssl" => { | |
serve_tcp(lp.handle(), addr, producer.clone())? | |
}, | |
"tcp_ssl" => { | |
let pfx = get_pfx(&CONFIG.ssl.pkcs_file, &CONFIG.ssl.password)?; | |
serve_tls(lp.handle(), addr, pfx, producer.clone())? | |
}, | |
"udp" => { | |
serve_udp(lp.handle(), addr, producer.clone())? | |
}, | |
_ => { | |
eprintln!("`{}` is not a valid listener type.", &server.ttype); | |
std::process::exit(1); | |
} | |
}; | |
loops.push(server); | |
} | |
lp.run(futures::future::join_all(loops))?; | |
Ok(()) | |
} | |
fn get_pfx(file: &str, pass: &str) -> Result<Pkcs12, Error> { | |
let mut file = File::open(file)?; | |
let mut contents = Vec::new(); | |
file.read_to_end(&mut contents)?; | |
Ok(Pkcs12::from_der(&contents, pass)?) | |
} | |
fn serve_tcp<'a>(handle: Handle, addr: SocketAddr, producer: Producer) -> Result<Box<Future<Item=(), Error=std::io::Error>>, Error> { | |
let listener = TcpListener::bind(&addr, &handle)?; | |
Ok(Box::new(listener.incoming().for_each(move |(stream, _socket_addr)| { | |
let transport = stream.framed(LineCodec); | |
let producer = producer.clone(); | |
let process_connection = transport.for_each(move |line| { | |
producer.send_copy::<String, ()>(&CONFIG.apache_kafka.topic, None, Some(&line), None, None, -1); | |
Ok(()) | |
}); | |
handle.spawn(process_connection.map_err(|_| ())); | |
Ok(()) | |
}))) | |
} | |
fn serve_tls(handle: Handle, addr: SocketAddr, pfx: Pkcs12, producer: Producer) -> Result<Box<Future<Item=(), Error=std::io::Error>>, Error> { | |
let listener = TcpListener::bind(&addr, &handle)?; | |
let acceptor = TlsAcceptor::builder(pfx)?.build()?; | |
Ok(Box::new(listener.incoming().for_each(move |(socket, _socket_addr)| { | |
let hhandle = handle.clone(); | |
let producer = producer.clone(); | |
acceptor.accept_async(socket).then(move |stream| { | |
let transport = stream.unwrap().framed(LineCodec); | |
let process_connection = transport.for_each(move |line| { | |
producer.send_copy::<String, ()>(&CONFIG.apache_kafka.topic, None, Some(&line), None, None, -1); | |
Ok(()) | |
}); | |
hhandle.spawn(process_connection.map_err(|_| ())); | |
Ok(()) | |
}) | |
}))) | |
} | |
fn serve_udp(handle: Handle, addr: SocketAddr, producer: Producer) -> Result<Box<Future<Item=(), Error=std::io::Error>>, Error> { | |
let socket = UdpSocket::bind(&addr, &handle)?; | |
Ok(Box::new(socket.framed(ULineCodec).for_each(move |(_socket, data)| { | |
producer.send_copy::<String, ()>(&CONFIG.apache_kafka.topic, None, Some(&data), None, None, -1); | |
Ok(()) | |
}))) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment