Skip to content

Instantly share code, notes, and snippets.

@nanomad
Last active January 24, 2018 21:44
Show Gist options
  • Save nanomad/e6f4505af6b162008d473e190ec72d2b to your computer and use it in GitHub Desktop.
Save nanomad/e6f4505af6b162008d473e190ec72d2b to your computer and use it in GitHub Desktop.
[package]
name = "tokio-help"
version = "0.1.0"
authors = ["Giovanni Condello"]
[dependencies]
bytes = "0.4"
futures = "0.1"
tokio-io = "0.1"
tokio-core = "0.1"
tokio-proto = "0.1"
tokio-service = "0.1"
use std::io;
use std::str;
use bytes::BytesMut;
use tokio_io::codec::{Encoder, Decoder};
pub struct LineCodec;
impl Decoder for LineCodec {
type Item = String;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<String>> {
if let Some(i) = buf.iter().position(|&b| b == b'\n') {
// remove the serialized frame from the buffer.
let line = buf.split_to(i);
// Also remove the '\n'
buf.split_to(1);
// Turn this data into a UTF string and return it in a Frame.
match str::from_utf8(&line) {
Ok(s) => Ok(Some(s.to_string())),
Err(_) => Err(io::Error::new(io::ErrorKind::Other,
"invalid UTF-8")),
}
} else {
Ok(None)
}
}
}
impl Encoder for LineCodec {
type Item = String;
type Error = io::Error;
fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
buf.extend(msg.as_bytes());
buf.extend(b"\n");
Ok(())
}
}
extern crate bytes;
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_proto;
extern crate tokio_service;
mod codec;
use tokio_io::codec::Framed;
use tokio_core::net::TcpStream;
use codec::LineCodec;
use std::collections::VecDeque;
use tokio_core::reactor::Core;
use std::net::ToSocketAddrs;
use futures::Future;
use tokio_io::AsyncRead;
use futures::Sink;
use std::io;
use futures::Stream;
fn main() {
let mut messages = VecDeque::new();
messages.push_back(String::from("A"));
messages.push_back(String::from("B"));
let mut client = Client::new();
let framed = client.connect();
let framed = client.send_many(messages.clone(), framed).unwrap();
let framed = client.send_many(messages.clone(), framed).unwrap();
// Here the sink must not be closed...
}
struct Client {
reactor: Core,
}
impl Client {
pub fn new() -> Self {
Client {
reactor: Core::new().expect("Unable to create new reactor"),
}
}
pub fn connect(&mut self) -> Framed<TcpStream, LineCodec> {
let addr = "127.0.0.1:12345".to_socket_addrs().unwrap().next().unwrap();
let handle = self.reactor.handle();
let tcp_future = TcpStream::connect(&addr, &handle).map(|tcp| tcp);
let network_stream = self.reactor.run(tcp_future).unwrap();
network_stream.framed(LineCodec)
}
pub fn send_many_broken(
&mut self,
messages: VecDeque<String>,
framed: Framed<TcpStream, LineCodec>,
) -> Result<Framed<TcpStream, LineCodec>, io::Error> {
let s = futures::stream::iter_ok::<_, io::Error>(messages);
let fut = s.forward(framed);
let (_, framed) = self.reactor.run(fut)?;
Ok(framed)
}
pub fn send_many(
&mut self,
mut messages: VecDeque<String>,
framed: Framed<TcpStream, LineCodec>,
) -> Result<Framed<TcpStream, LineCodec>, io::Error> {
if let Some(pkt) = messages.pop_front() {
let framed2 = self.send_one(pkt, framed)?;
self.send_many(messages, framed2)
} else {
Ok(framed)
}
}
fn send_one(
&mut self,
message: String,
framed: Framed<TcpStream, LineCodec>,
) -> Result<Framed<TcpStream, LineCodec>, io::Error> {
let send = framed.send(message);
let outcome = self.reactor.run(send)?;
Ok(outcome)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment