Skip to content

Instantly share code, notes, and snippets.

@literadix
Created September 29, 2021 05:45
Show Gist options
  • Save literadix/4831f958b810101ccaaf61bbf3822c10 to your computer and use it in GitHub Desktop.
Save literadix/4831f958b810101ccaaf61bbf3822c10 to your computer and use it in GitHub Desktop.
Rust: Write to and read from files in threads, communicating over channels
--- Cargo.toml ---
[package]
name = "actix-example"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
crossbeam-channel = "^0.5"
serde = { version = "^1.0", features = ["derive"] }
serde_json = "1.0.68"
serde_derive = "1.0.130"
--- main.rs ---
use crossbeam_channel::bounded;
use serde::{Deserialize, Serialize};
use serde_json::Result;
use std::fs::{File, OpenOptions};
use std::io::{BufRead, Write};
use std::path::Path;
use std::time::Duration;
use std::{io, thread};
extern crate serde_json;
/// One record passed through the channel and stored as a single JSON line
/// in the log file.
#[derive(Debug, Serialize, Deserialize)]
struct Message {
    /// Sequence number assigned by the producer.
    count: u32,
    /// Text payload; `main` fills it with the decimal rendering of `count`.
    what: String,
}
/// Opens `filename` and returns a buffered iterator over its lines.
///
/// Each item yielded by the iterator is an `io::Result<String>` with the
/// line terminator stripped.
///
/// # Errors
/// Returns the underlying `io::Error` if the file cannot be opened.
fn read_lines<P>(filename: P) -> io::Result<io::Lines<io::BufReader<File>>>
where
    P: AsRef<Path>,
{
    File::open(filename).map(|handle| io::BufReader::new(handle).lines())
}
/// Demo entry point.
///
/// 1. A producer thread sends 10,000 `Message`s through a bounded channel.
/// 2. A writer thread receives them and appends each one as a JSON line to
///    `data/log.dat` (the file is opened in append mode, so records from
///    earlier runs are kept).
/// 3. After both threads have finished, the file is read back and every
///    record that parses is printed; unreadable or malformed lines are
///    reported on stderr and skipped.
///
/// # Panics
/// Panics if the log directory or file cannot be created/opened, if the
/// receiver goes away while the producer is still sending, or if either
/// worker thread panics.
fn main() {
    const LOG_PATH: &str = "data/log.dat";
    const MESSAGE_COUNT: u32 = 10_000;

    // Channels have two endpoints: the `Sender<T>` and the `Receiver<T>`,
    // where `T` is the type of the message to be transferred. The capacity
    // of 10 makes the producer block while 10 messages are still queued.
    let (tx, rx) = bounded(10);

    let sender = thread::spawn(move || {
        for count in 1..=MESSAGE_COUNT {
            tx.send(Message {
                count,
                what: count.to_string(),
            })
            .expect("receiver disconnected before all messages were sent");
        }
        // `tx` is dropped here, which disconnects the channel and lets the
        // receiver stop as soon as it has drained the queue.
    });

    // Make sure the target directory exists; opening the file would
    // otherwise fail on a fresh checkout.
    if let Some(dir) = Path::new(LOG_PATH).parent() {
        std::fs::create_dir_all(dir).expect("cannot create log directory");
    }

    let file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(LOG_PATH)
        .expect("cannot open file");
    // Buffer the writes so that each record does not cost its own syscall.
    let mut writer = io::BufWriter::new(file);

    let receiver = thread::spawn(move || {
        // The loop ends when the channel is disconnected and empty, or when
        // no message arrives within one second.
        while let Ok(msg) = rx.recv_timeout(Duration::from_secs(1)) {
            let serialized =
                serde_json::to_string(&msg).expect("Message is always serializable");
            if let Err(e) = writeln!(writer, "{}", serialized) {
                eprintln!("{:?}", e);
            }
            if msg.count % 1_000 == 0 {
                println!("received: {:?}", msg.count);
            }
        }
        // Flush explicitly: dropping a `BufWriter` would discard any error.
        if let Err(e) = writer.flush() {
            eprintln!("{:?}", e);
        }
    });

    sender.join().expect("sender thread panicked");
    // Once the receiver has been joined, the writer has been flushed and
    // dropped, so the file is complete before it is read back below.
    receiver.join().expect("receiver thread panicked");

    match read_lines(LOG_PATH) {
        Ok(lines) => {
            for (idx, line_res) in lines.enumerate() {
                let line = match line_res {
                    Ok(line) => line,
                    Err(e) => {
                        eprintln!("{}:{}: cannot read line: {}", LOG_PATH, idx + 1, e);
                        continue;
                    }
                };
                let parsed: Result<Message> = serde_json::from_str(&line);
                match parsed {
                    Ok(msg) => println!("{:?}", msg),
                    Err(e) => {
                        eprintln!("{}:{}: skipping malformed record: {}", LOG_PATH, idx + 1, e)
                    }
                }
            }
        }
        Err(e) => eprintln!("cannot read {}: {}", LOG_PATH, e),
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment