Skip to content

Instantly share code, notes, and snippets.

@algon-320
Created April 22, 2022 10:43
Show Gist options
  • Save algon-320/ed8efe501dd06d0151e499e0ed48640a to your computer and use it in GitHub Desktop.
Save algon-320/ed8efe501dd06d0151e499e0ed48640a to your computer and use it in GitHub Desktop.
TCP echo server implemented with actix
//# actix = "0.13.0"
//# tokio = { version = "1.17.0", features = ["net"] }
//# tokio-util = { version = "0.7.1", features = ["codec"] }
//# bytes = "1.1.0"
use actix::prelude::*;
use std::net::SocketAddr;
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener, TcpStream,
};
use actix::io::{SinkWrite, WriteHandler};
use bytes::{Bytes, BytesMut};
use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite};
// Outbound half of a connection: a `SinkWrite` wrapping a `FramedWrite` over the
// owned TCP write half, carrying `Bytes` items through `BytesCodec`.
type BytesWriter = SinkWrite<Bytes, FramedWrite<OwnedWriteHalf, BytesCodec>>;
/// Actor that owns a single accepted TCP connection and echoes back whatever it reads.
struct Connection {
/// Address of the remote peer; used as the prefix of every log line this actor prints.
peer_addr: SocketAddr,
/// Write side of the socket; received bytes are queued on it to be sent back.
writer: BytesWriter,
}
// Actor lifecycle for a connection: runs in a plain `Context` and logs
// when the actor starts and when it stops.
impl Actor for Connection {
    type Context = Context<Self>;

    /// Logs that the actor serving this peer has started.
    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("{}: started", self.peer_addr);
    }

    /// Logs that the actor serving this peer has stopped.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        println!("{}: stopped", self.peer_addr);
    }
}
impl Connection {
    /// Wraps an accepted `TcpStream` in a `Connection` actor, starts it and
    /// returns the actor's address.
    ///
    /// # Panics
    ///
    /// Panics if the peer address of `stream` cannot be obtained.
    fn new(stream: TcpStream) -> Addr<Connection> {
        let peer_addr = stream.peer_addr().unwrap();
        let (read_half, write_half): (OwnedReadHalf, OwnedWriteHalf) = stream.into_split();

        // The context is built before the actor exists so that the inbound
        // stream and the outbound sink can both be registered on it first.
        let mut ctx = Context::new();

        // Inbound side: `FramedRead` is a `Stream<Item = Result<BytesMut, std::io::Error>>`.
        // Once attached to the context, each item it yields is delivered to
        // `StreamHandler::handle`.
        let frames = FramedRead::new(read_half, BytesCodec::default());
        Self::add_stream(frames, &mut ctx);

        // Outbound side: `FramedWrite` is a `Sink<Bytes, Error = std::io::Error>`.
        // `SinkWrite` wraps it so synchronous handlers can queue items without
        // awaiting; it requires the actor to implement `WriteHandler<Sink::Error>`.
        let outbound = FramedWrite::new(write_half, BytesCodec::default());
        let writer = SinkWrite::new(outbound, &mut ctx);

        // Hand the actor to the prepared context and start running it.
        ctx.run(Self { peer_addr, writer })
    }
}
impl StreamHandler<Result<BytesMut, std::io::Error>> for Connection {
fn handle(&mut self, res: Result<BytesMut, std::io::Error>, _: &mut Self::Context) {
match res {
Ok(bytes) => {
// Convert BytesMut to Bytes
let bytes = bytes.freeze();
println!("{}: echo {} bytes", self.peer_addr, bytes.len());
// Wirte to underlying sink
self.writer.write(bytes).expect("closed sink");
}
Err(err) => {
println!("{}: error: {}", self.peer_addr, err);
}
}
}
}
// Handler for asynchronous errors generated by SinkWrite::write
impl WriteHandler<std::io::Error> for Connection {
fn error(&mut self, err: std::io::Error, _: &mut Self::Context) -> Running {
println!("{}: error: {}", self.peer_addr, err);
Running::Continue
}
}
/// Entry point: binds a TCP listener on 127.0.0.1:12121 and starts one
/// `Connection` actor per accepted client.
///
/// The accept loop ends on the first `accept` error; the error is logged
/// before `main` returns so the shutdown is not silent.
///
/// # Panics
///
/// Panics if the listener cannot be bound.
#[actix::main]
async fn main() {
    // Built from parts rather than parsed from a string, so it cannot fail.
    let addr = SocketAddr::from(([127, 0, 0, 1], 12121));
    println!("listening on {}", addr);
    let listener = TcpListener::bind(addr).await.expect("bind");

    // Wait for clients.
    loop {
        match listener.accept().await {
            Ok((stream, peer)) => {
                println!("new connection: {}", peer);
                // Spawn a dedicated task for the client.
                actix::spawn(async move { Connection::new(stream) });
            }
            Err(err) => {
                // Report why the server stops accepting instead of exiting silently.
                println!("accept error: {}", err);
                break;
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment