Skip to content

Instantly share code, notes, and snippets.

Created November 10, 2017 10:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/6cced6364fa763f6545cd1f7a7b45b91 to your computer and use it in GitHub Desktop.
Save anonymous/6cced6364fa763f6545cd1f7a7b45b91 to your computer and use it in GitHub Desktop.
Rust code shared from the playground
extern crate serde;
extern crate serde_json;
#[macro_use]
extern crate serde_derive;
extern crate uuid;
extern crate bytes;
extern crate tokio_io;
extern crate tokio_proto;
extern crate tokio_service;
extern crate tokio_core;
extern crate futures;
extern crate futures_cpupool;
#[macro_use]
extern crate lazy_static;
use std::net::{IpAddr, SocketAddr, AddrParseError};
use std::mem;
use std::{io, thread};
use std::thread::JoinHandle;
use std::marker::PhantomData;
use std::collections::HashMap;
use std::sync::Mutex;
use futures_cpupool::CpuPool;
use tokio_proto::{TcpServer, TcpClient};
use tokio_io::codec::{Encoder, Decoder, Framed};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_proto::pipeline::{ServerProto, ClientProto, ClientService};
use uuid::Uuid;
use bytes::{BytesMut, IntoBuf, Buf, BigEndian, LittleEndian};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio_service::Service;
use futures::{future, Future};
use tokio_core::reactor::Handle;
use tokio_core::net::TcpStream;
pub trait ObjetoDominio {
fn get_network_id(&self) -> Option<Uuid>;
}
lazy_static! {
pub static ref SERVER_MAP: Mutex<HashMap<Uuid, Vec<u8>>> = {
let m = HashMap::new();
Mutex::new(m)
};
}
#[derive(Debug, Clone)]
pub struct RedeConfig {
pub ip: String,
pub porta: String,
pub modo: Modo,
}
#[derive(Debug)]
pub struct Rede {
endereco: SocketAddr,
thread_pool: JoinHandle<()>,
}
/// Modo de funcionamento da rede
#[derive(Debug, Clone)]
pub enum Modo {
Receptor,
Transmissor,
Hibrido,
}
impl Rede {
pub fn spawn_server(config: RedeConfig, tamanho_pool: usize) -> Result<Rede, AddrParseError> {
let config_copy = config.clone();
let addr: SocketAddr = (config.ip + ":" + &config.porta).parse()?;
let addr_clone = addr.clone();
let thread = thread::spawn(move || {
let server = TcpServer::new(RedeProto, addr_clone);
server.serve(|| Ok(DomainMemory));
});
Ok(Rede {
endereco: addr,
thread_pool: thread,
})
}
pub fn stop(self) {
let _ = self.thread_pool;
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct ReqType(u8);
pub const GET: ReqType = ReqType(0);
pub const POST: ReqType = ReqType(1);
//TODO: Outros tipos de id
#[derive(Default)]
pub struct RedeCodec;
impl Decoder for RedeCodec {
type Item = (ReqType, Option<Uuid>, Option<Vec<u8>>);
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
println!("Message received: {:?}", src);
let msg_size_size = mem::size_of::<u64>();
if src.len() < msg_size_size {
println!("Too small");
return Ok(None);
}
let (raw_msg_size, tail) = src.split_at(msg_size_size);
//TODO: considerar outros endians
let msg_size = raw_msg_size.into_buf().get_u64::<LittleEndian>();
if tail.len() < msg_size as usize {
println!("still Too small: {}", msg_size);
return Ok(None);
}
if tail.len() > msg_size as usize {
println!("Too long message");
return Err(io::Error::new(io::ErrorKind::InvalidInput, "msg is too long"));
}
let req_size = mem::size_of::<ReqType>();
let (raw_req, tail) = tail.split_at(req_size);
let req = ReqType(raw_req.into_buf().get_u8());
let has_id_size = mem::size_of::<u8>();
let (raw_has_id, tail) = tail.split_at(has_id_size);
let has_id: u8 = raw_has_id.into_buf().get_u8();
let id_size = mem::size_of::<Uuid>();
let (raw_id, tail) = tail.split_at(has_id_size);
let id = match has_id > 0 {
true => Some(Uuid::from_bytes(raw_id).unwrap()),
false => None,
};
match req {
GET => Ok(Some((req, id, None))),
POST => {
println!("posted");
let item = Vec::from(tail);
Ok(Some((req, id, Some(item))))
}
_ => {
println!("OOps");
Err(io::Error::new(io::ErrorKind::InvalidData,
format!("Invalid Request type: {:?}", req)))
}
}
}
}
impl Encoder for RedeCodec {
type Item = (Uuid, Vec<u8>);
type Error = io::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let serialized_id = item.0.as_bytes();
let msg_size = item.1.len() + serialized_id.len() + mem::size_of::<u8>();
let raw_size: [u8; 8] = unsafe { mem::transmute(msg_size) };
let raw_has_id: [u8; 1] = unsafe { mem::transmute(1 as u8) };
dst.extend(raw_size.iter()); // msg_size
dst.extend(raw_has_id.iter()); // has_id
dst.extend(serialized_id); //id
dst.extend(item.1); //data
Ok(())
}
}
pub struct RedeProto;
impl<T: AsyncRead + AsyncWrite + 'static> ServerProto<T> for RedeProto {
type Response = (Uuid, Vec<u8>);
type Request = (ReqType, Option<Uuid>, Option<Vec<u8>>);
type Transport = Framed<T, RedeCodec>;
type BindTransport = Result<Self::Transport, io::Error>;
fn bind_transport(&self, io: T) -> Self::BindTransport {
Ok(io.framed(RedeCodec))
}
}
#[derive(Debug)]
pub struct DomainMemory;
impl Service for DomainMemory {
type Response = (Uuid, Vec<u8>);
type Request = (ReqType, Option<Uuid>, Option<Vec<u8>>);
type Error = io::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
// Produce a future for computing a response from a request.
fn call(&self, req: Self::Request) -> Self::Future {
println!("Into service");
match (req.0, req.1) {
(GET, Some(id)) => {
let lock = SERVER_MAP.lock().unwrap();
let vec = lock.get(&id).clone();
match vec {
Some(inner) => Box::new(future::ok((id, inner.clone()))),
None => {
Box::new(future::err(io::Error::new(io::ErrorKind::InvalidData,
format!("Object with id {} not found",
id))))
}
}
}
(GET, None) => {
Box::new(future::err(io::Error::new(io::ErrorKind::InvalidData,
"GET request without object id")))
}
(POST, Some(id)) => {
let _ = SERVER_MAP
.lock()
.unwrap()
.insert(id.clone(), req.2.clone().unwrap());
Box::new(future::ok((id, req.2.unwrap())))
}
(POST, None) => {
let new_id = Uuid::new_v4();
let _ = SERVER_MAP
.lock()
.unwrap()
.insert(new_id, req.2.clone().unwrap());
Box::new(future::ok((new_id, req.2.unwrap())))
}
(invalid_request, _) => {
Box::new(future::err(io::Error::new(io::ErrorKind::InvalidData,
format!("Invalid request type: {:?}",
invalid_request))))
}
}
}
}
pub struct Client<T: Serialize + DeserializeOwned> {
_pd: PhantomData<T>
}
impl<T: Serialize + DeserializeOwned + 'static> Service for Client<T> {
type Request = Vec<u8>;
type Response = T;
type Error = io::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
fn call(&self, req: Self::Request) -> Self::Future {
let maped = serde_json::from_slice(req.as_ref())
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, "Error unwraping"));
Box::new(future::done(maped))
}
}
impl<T: AsyncRead + AsyncWrite + 'static> ClientProto<T> for RedeProto {
type Request = (Uuid, Vec<u8>);
type Response = (ReqType, Option<Uuid>, Option<Vec<u8>>);
type Transport = Framed<T, RedeCodec>;
type BindTransport = Result<Self::Transport, io::Error>;
fn bind_transport(&self, io: T) -> Self::BindTransport {
Ok(io.framed(RedeCodec))
}
}
impl<T: Serialize + DeserializeOwned + ObjetoDominio + 'static> Client<T> {
pub fn connect(addr: &SocketAddr, handle: &Handle) -> Box<Future<Item = Client<T>, Error = io::Error>> {
let ret = TcpClient::new(RedeProto)
.connect(addr, handle)
.map(|_| {
Client {_pd: PhantomData}
});
Box::new(ret)
}
pub fn get(&self, id: Uuid) -> Box<Future<Item = T, Error = io::Error>> {
let raw_id = id.as_bytes();
let raw_request = GET.0.into_buf().clone();
let raw_request = raw_request.bytes();
let raw_has_id: [u8; 1] = unsafe {mem::transmute(1 as u8)};
let msg_size = raw_id.len() + raw_request.len() + raw_has_id.len();
let raw_size: [u8; 8] = unsafe {mem::transmute(msg_size)};
let mut ret: Vec<u8> = Vec::new();
ret.extend(raw_size.iter());
ret.extend(raw_request.iter());
ret.extend(raw_has_id.iter());
ret.extend(raw_id.iter());
let resp = self.call(ret);
resp
}
pub fn post(&self, id: Option<Uuid>, obj: &T) -> Box<Future<Item = Uuid, Error = io::Error>> {
match id {
Some(id) => {
let raw_id = id.as_bytes();
let raw_request = GET.0.into_buf().clone();
let raw_request = raw_request.bytes();
let raw_has_id: [u8; 1] = unsafe {mem::transmute(1 as u8)};
//TODO fix
let raw_ser = serde_json::to_vec(obj).unwrap();
let raw_ser: &[u8] = raw_ser.as_ref();
let msg_size = raw_id.len() + raw_request.len() + raw_has_id.len() + raw_ser.len();
let raw_size: [u8; 8] = unsafe {mem::transmute(msg_size)};
let mut ret: Vec<u8> = Vec::new();
ret.extend(raw_size.iter());
ret.extend(raw_request.iter());
ret.extend(raw_has_id.iter());
ret.extend(raw_id.iter());
ret.extend(raw_ser.iter());
let _ = self.call(ret);
Box::new(future::done(Ok(id)))
}
None => {
let raw_id = Uuid::nil();
let raw_id = raw_id.as_bytes();
let raw_request = GET.0.into_buf().clone();
let raw_request = raw_request.bytes();
let raw_has_id: [u8; 1] = unsafe {mem::transmute(0 as u8)};
//TODO fix
let raw_ser = serde_json::to_vec(obj).unwrap();
let raw_ser: &[u8] = raw_ser.as_ref();
let msg_size = raw_id.len() + raw_request.len() + raw_has_id.len() + raw_ser.len();
let raw_size: [u8; 8] = unsafe {mem::transmute(msg_size)};
let mut ret: Vec<u8> = Vec::new();
ret.extend(raw_size.iter());
ret.extend(raw_request.iter());
ret.extend(raw_has_id.iter());
ret.extend(raw_id.iter());
ret.extend(raw_ser.iter());
let resp = self.call(ret)
.map(|x| x.get_network_id().unwrap() );
Box::new(resp)
}
}
}
}
fn main() {
let config = RedeConfig {
ip: "0.0.0.0".to_owned(),
modo: Modo::Hibrido,
porta: "4000".to_owned(),
};
println!("Starting thread");
let rede = Rede::spawn_server(config, 1);
assert!(rede.is_ok(), format!("{:?}", rede));
let rede = rede.unwrap();
println!("Thread started");
loop {
use std::io::stdin;
let mut input = String::new();
stdin().read_line(&mut input).expect("Wrong input");
if let Some('\n') = input.chars().next_back() {
input.pop();
}
if let Some('\r') = input.chars().next_back() {
input.pop();
}
if input == "STOP" {
break;
}
}
println!("Stoping server");
rede.stop();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment