Skip to content

Instantly share code, notes, and snippets.

@dustins
Created June 13, 2021 15:13
Show Gist options
  • Save dustins/7baf8342505918571259a9fd0f508800 to your computer and use it in GitHub Desktop.
Save dustins/7baf8342505918571259a9fd0f508800 to your computer and use it in GitHub Desktop.
use async_std::channel;
use async_std::net::{SocketAddr, UdpSocket};
use async_std::sync::Arc;
use async_std::task;
use async_std::task::JoinHandle;
use async_trait::async_trait;
use crate::config::Config;
use crate::error::Result;
use crate::mux::{CHANNEL_CAP, Mux};
use crate::mux::packet::Packet;
use crate::SerialNumber;
use crate::writer::Writer;
pub struct UdpWriter {
config: Arc<Config>,
mux: Mux,
destination: SocketAddr,
}
impl UdpWriter {
pub fn new(config: Arc<Config>, mux: Mux, destination: SocketAddr) -> Self {
Self {
config,
mux,
destination,
}
}
}
#[async_trait]
impl Writer for UdpWriter {
async fn start(&self) -> Result<()> {
let (tx, rx) = channel::bounded(CHANNEL_CAP);
let mux_rx = self.mux.tx_receiver();
task::spawn(async move {
let mut packet_id = SerialNumber(0u8);
while let Ok(frame) = mux_rx.recv().await {
let packet = Packet::new(packet_id, frame);
tx.send(packet).await.unwrap_or_else(|e| {
panic!("Unable to send packet. {}", e);
});
packet_id += 1;
}
});
let tasks = (0..self.config.writer_tasks).map(|_| {
let destination = self.destination.clone();
let rx = rx.clone();
task::spawn(async move {
let socket = UdpSocket::bind("0.0.0.0:0").await
.expect("Unable to create socket.");
socket.connect(&destination).await.unwrap_or_else(|_| {
panic!("Unable to connect to {:?}", destination);
});
while let Ok(packet) = rx.recv().await {
socket.send(&packet.encode()).await.unwrap_or_else(|_| {
panic!("Unable to write frame {:?}", packet);
});
log::debug!("Wrote Packet {{{}}}", packet.id);
}
})
}).collect::<Vec<JoinHandle<()>>>();
futures::future::join_all(tasks).await;
Ok(())
}
}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {75}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {76}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {77}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {78}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {79}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {80}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {81}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {82}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {83}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {84}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {85}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {86}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {87}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {88}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {89}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {90}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {91}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {92}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {93}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {94}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {95}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {96}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {97}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {98}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {100}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {101}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {102}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {103}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {104}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {105}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {106}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {107}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {108}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {109}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {110}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {111}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {112}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {113}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {114}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {115}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {116}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {117}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {118}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {119}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {120}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {121}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {122}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {123}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {124}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {125}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {126}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {127}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {128}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {129}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {130}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {131}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {132}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {133}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {134}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {135}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {136}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {137}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {138}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {139}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {140}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {99}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {141}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {143}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {142}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {144}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {145}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {146}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {147}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {148}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {149}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {150}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {151}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {152}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {153}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {154}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {155}
14:08:57 [DEBUG] (13) create::writer::udp: Wrote Packet {156}
14:08:57 [DEBUG] (11) create::writer::udp: Wrote Packet {157}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment