Last active
January 24, 2018 21:44
-
-
Save nanomad/e6f4505af6b162008d473e190ec72d2b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package]
name = "tokio-help"
version = "0.1.0"
authors = ["Giovanni Condello"]

[dependencies]
bytes = "0.4"
futures = "0.1"
tokio-io = "0.1"
tokio-core = "0.1"
tokio-proto = "0.1"
tokio-service = "0.1"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::io; | |
use std::str; | |
use bytes::BytesMut; | |
use tokio_io::codec::{Encoder, Decoder}; | |
pub struct LineCodec; | |
impl Decoder for LineCodec { | |
type Item = String; | |
type Error = io::Error; | |
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<String>> { | |
if let Some(i) = buf.iter().position(|&b| b == b'\n') { | |
// remove the serialized frame from the buffer. | |
let line = buf.split_to(i); | |
// Also remove the '\n' | |
buf.split_to(1); | |
// Turn this data into a UTF string and return it in a Frame. | |
match str::from_utf8(&line) { | |
Ok(s) => Ok(Some(s.to_string())), | |
Err(_) => Err(io::Error::new(io::ErrorKind::Other, | |
"invalid UTF-8")), | |
} | |
} else { | |
Ok(None) | |
} | |
} | |
} | |
impl Encoder for LineCodec { | |
type Item = String; | |
type Error = io::Error; | |
fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> { | |
buf.extend(msg.as_bytes()); | |
buf.extend(b"\n"); | |
Ok(()) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate bytes; | |
extern crate futures; | |
extern crate tokio_core; | |
extern crate tokio_io; | |
extern crate tokio_proto; | |
extern crate tokio_service; | |
mod codec; | |
use tokio_io::codec::Framed; | |
use tokio_core::net::TcpStream; | |
use codec::LineCodec; | |
use std::collections::VecDeque; | |
use tokio_core::reactor::Core; | |
use std::net::ToSocketAddrs; | |
use futures::Future; | |
use tokio_io::AsyncRead; | |
use futures::Sink; | |
use std::io; | |
use futures::Stream; | |
fn main() { | |
let mut messages = VecDeque::new(); | |
messages.push_back(String::from("A")); | |
messages.push_back(String::from("B")); | |
let mut client = Client::new(); | |
let framed = client.connect(); | |
let framed = client.send_many(messages.clone(), framed).unwrap(); | |
let framed = client.send_many(messages.clone(), framed).unwrap(); | |
// Here the sink must not be closed... | |
} | |
struct Client { | |
reactor: Core, | |
} | |
impl Client { | |
pub fn new() -> Self { | |
Client { | |
reactor: Core::new().expect("Unable to create new reactor"), | |
} | |
} | |
pub fn connect(&mut self) -> Framed<TcpStream, LineCodec> { | |
let addr = "127.0.0.1:12345".to_socket_addrs().unwrap().next().unwrap(); | |
let handle = self.reactor.handle(); | |
let tcp_future = TcpStream::connect(&addr, &handle).map(|tcp| tcp); | |
let network_stream = self.reactor.run(tcp_future).unwrap(); | |
network_stream.framed(LineCodec) | |
} | |
pub fn send_many_broken( | |
&mut self, | |
messages: VecDeque<String>, | |
framed: Framed<TcpStream, LineCodec>, | |
) -> Result<Framed<TcpStream, LineCodec>, io::Error> { | |
let s = futures::stream::iter_ok::<_, io::Error>(messages); | |
let fut = s.forward(framed); | |
let (_, framed) = self.reactor.run(fut)?; | |
Ok(framed) | |
} | |
pub fn send_many( | |
&mut self, | |
mut messages: VecDeque<String>, | |
framed: Framed<TcpStream, LineCodec>, | |
) -> Result<Framed<TcpStream, LineCodec>, io::Error> { | |
if let Some(pkt) = messages.pop_front() { | |
let framed2 = self.send_one(pkt, framed)?; | |
self.send_many(messages, framed2) | |
} else { | |
Ok(framed) | |
} | |
} | |
fn send_one( | |
&mut self, | |
message: String, | |
framed: Framed<TcpStream, LineCodec>, | |
) -> Result<Framed<TcpStream, LineCodec>, io::Error> { | |
let send = framed.send(message); | |
let outcome = self.reactor.run(send)?; | |
Ok(outcome) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment