Skip to content

Instantly share code, notes, and snippets.

@benkay86
Last active February 7, 2021 02:15
Show Gist options
  • Save benkay86/d0261cfff0c15076066efc02f46be2ef to your computer and use it in GitHub Desktop.
Save benkay86/d0261cfff0c15076066efc02f46be2ef to your computer and use it in GitHub Desktop.
Write an output file in comma-separated value (CSV) format asynchronously using Rust and tokio.
//! Module for `CSVOutputWriter`, which writes lines (records) of data to a
//! shared asynchronous output sink using the comma separated value (CSV) format.
use tokio::io::{AsyncWrite, AsyncWriteExt};
use std::ops::Deref;
use tokio::sync::Mutex;
/// Type alias for an [`AsyncWrite`] trait object that is synchronized by a
/// [`tokio::sync::Mutex`].
///
/// Because the alias wraps a `dyn` trait object it is unsized, so it is always
/// used behind a pointer. A concrete writer is coerced into it, for example:
///
/// * `Box<Mutex<tokio::fs::File>>` used as `Box<SynchronizedWrite>`
/// * `Arc<Mutex<Vec<u8>>>` used as `Arc<SynchronizedWrite>`
///
/// Note that the mutex is [`tokio::sync::Mutex`] rather than `std::sync::Mutex`
/// because we need to hold a lock on the mutex across `await` points.
pub type SynchronizedWrite = Mutex<dyn AsyncWrite + Send + Unpin>;
/// Synchronizes access to a comma separated value (CSV) file and asynchronously
/// writes lines (records) to it. Example:
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::sync::Mutex;
/// use output_writer::{CSVOutputWriter, SynchronizedWrite};
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let writer: Box<SynchronizedWrite> = Box::new(Mutex::new(File::create("out.csv").await?));
/// let writer = CSVOutputWriter::new(writer).build();
/// let task = tokio::spawn(async move {
///     // Output: value1,value2,value3
///     writer.write_line(&["value1", "value2", "value3"]).await?;
///     writer.flush().await
/// });
/// task.await??;
/// # Ok(())
/// # }
/// ```
pub struct CSVOutputWriter<T: Deref<Target = SynchronizedWrite>> {
    /// Pointer to a [`SynchronizedWrite`]: an `AsyncWrite` trait object
    /// guarded by a [`tokio::sync::Mutex`]. For example, either of these,
    /// coerced to `Box<SynchronizedWrite>` / `Arc<SynchronizedWrite>`:
    ///
    /// * `Box<Mutex<tokio::fs::File>>`
    /// * `Arc<Mutex<Vec<u8>>>`
    pub writer: T,
    /// Delimiter placed around each value, e.g. `"` --> `"val"`.
    pub delim: String,
    /// Line terminator, written at the end of each `write_line()`, e.g. `"\n"`.
    pub newline: String,
    /// Separator between values, e.g. `,` --> `val1,val2`.
    pub sep: String,
}
impl<T: Deref<Target = SynchronizedWrite>> CSVOutputWriter<T> {
    /// Consumes the iterator `line` and writes all of its items as a single
    /// record, terminated by `newline`.
    ///
    /// Each item is wrapped in `delim`, and consecutive items are separated by
    /// `sep`. When `delim` is non-empty, every occurrence of `delim` inside an
    /// item is escaped by doubling it (the RFC 4180 rule for `"`), so a quoted
    /// value that contains the quote character stays parseable. No other
    /// escaping is done: with an empty `delim`, an item containing `sep` or
    /// `newline` will not round-trip.
    ///
    /// The record is formatted completely before the lock is taken. It is then
    /// written with a single `write_all` while the writer's mutex is held.
    /// Callers sharing this writer therefore never interleave within a record.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by the underlying writer. If an error
    /// occurs, or the future is dropped before completing, part of the record
    /// may already have been written.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use tokio::fs::File;
    /// # use tokio::sync::Mutex;
    /// # use output_writer::{CSVOutputWriter, SynchronizedWrite};
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let writer: Box<SynchronizedWrite> = Box::new(Mutex::new(File::create("out.csv").await?));
    /// let writer = CSVOutputWriter::new(writer).build();
    /// writer.write_line(&["1", "2", "3"]).await?;
    /// writer.write_line(&["a", "b", "c"]).await?;
    /// writer.flush().await?;
    /// // out.csv contains:
    /// // 1,2,3
    /// // a,b,c
    /// # Ok(())
    /// # }
    /// ```
    pub async fn write_line<U, V>(&self, line: U) -> std::io::Result<()>
    where
        U: IntoIterator<Item = V>,
        V: AsRef<str>,
    {
        // Build the record before locking so the critical section is only I/O.
        let record = self.format_line(line);
        // `tokio::sync::Mutex` does not poison, so acquiring it cannot panic.
        let mut writer = self.writer.lock().await;
        writer.write_all(record.as_bytes()).await
        // The guard is dropped, releasing the lock, when this returns.
    }

    /// Flushes the writer, see [`tokio::io::AsyncWriteExt::flush()`].
    ///
    /// The writer's mutex is held for the duration of the flush.
    pub async fn flush(&self) -> std::io::Result<()> {
        // Lock, flush, automatically unlock.
        self.writer.lock().await.flush().await
    }

    /// Renders one record, `delim`-wrapped items joined by `sep`, followed by
    /// `newline`, into a single freshly allocated `String`.
    fn format_line<U, V>(&self, line: U) -> String
    where
        U: IntoIterator<Item = V>,
        V: AsRef<str>,
    {
        let mut record = String::new();
        for (index, item) in line.into_iter().enumerate() {
            if index > 0 {
                record.push_str(&self.sep);
            }
            record.push_str(&self.delim);
            let item = item.as_ref();
            // Guard on non-empty `delim`: replacing an empty pattern would
            // insert text between every character.
            if !self.delim.is_empty() && item.contains(self.delim.as_str()) {
                record.push_str(&item.replace(self.delim.as_str(), &self.delim.repeat(2)));
            } else {
                record.push_str(item);
            }
            record.push_str(&self.delim);
        }
        record.push_str(&self.newline);
        record
    }
}
/// Builder pattern for constructing a [`CSVOutputWriter`], e.g.
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::sync::Mutex;
/// use output_writer::{CSVOutputWriterBuilder, SynchronizedWrite};
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let writer: Box<SynchronizedWrite> = Box::new(Mutex::new(File::create("out.csv").await?));
/// let writer = CSVOutputWriterBuilder::new(writer).sep(", ").build();
/// # Ok(())
/// # }
/// ```
#[must_use = "a builder does nothing until `build()` is called"]
pub struct CSVOutputWriterBuilder<T: Deref<Target = SynchronizedWrite>> {
    // Configured values, moved into the matching `CSVOutputWriter` fields.
    writer_: T,
    delim_: String,
    newline_: String,
    sep_: String,
}
impl<T> std::convert::From<CSVOutputWriterBuilder<T>>
for CSVOutputWriter<T>
where T: Deref<Target = SynchronizedWrite>
{
fn from(builder: CSVOutputWriterBuilder<T>) -> CSVOutputWriter<T> {
CSVOutputWriter {
writer: builder.writer_,
delim: builder.delim_,
newline: builder.newline_,
sep: builder.sep_,
}
}
}
impl<T: Deref<Target = SynchronizedWrite>> CSVOutputWriterBuilder<T> {
    /// Creates a `CSVOutputWriterBuilder` from the given `writer` using default
    /// options:
    ///
    /// * `delim: ""` (values are not wrapped)
    /// * `newline: "\n"`
    /// * `sep: ","`
    pub fn new(writer: T) -> Self {
        CSVOutputWriterBuilder {
            writer_: writer,        // no default, must provide writer
            delim_: String::new(),  // default: empty string
            newline_: "\n".into(),  // default: newline character
            sep_: ",".into(),       // default: comma
        }
    }

    /// Sets the delimiter placed around each value, e.g. `"\""` turns `val`
    /// into `"val"`. Default `""` (no delimiter).
    pub fn delim<S: Into<String>>(self, delim: S) -> Self {
        Self {
            delim_: delim.into(),
            ..self
        }
    }

    /// Sets the line terminator written after each line, default `"\n"`.
    pub fn newline<S: Into<String>>(self, newline: S) -> Self {
        Self {
            newline_: newline.into(),
            ..self
        }
    }

    /// Sets the separator placed between values, default `","`.
    pub fn sep<S: Into<String>>(self, sep: S) -> Self {
        Self {
            sep_: sep.into(),
            ..self
        }
    }

    /// Converts this builder into a `CSVOutputWriter`.
    pub fn build(self) -> CSVOutputWriter<T> {
        self.into()
    }
}
impl<T> CSVOutputWriter<T>
where
    T: Deref<Target = SynchronizedWrite>,
{
    /// Starts building a writer around `writer` with the default options.
    /// This is shorthand for [`CSVOutputWriterBuilder::new()`]; finish with
    /// `.build()`.
    pub fn new(writer: T) -> CSVOutputWriterBuilder<T> {
        <CSVOutputWriterBuilder<T>>::new(writer)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    // Test manual creation of CSVOutputWriter.
    #[tokio::test]
    async fn test_write() {
        let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
        {
            let writer: CSVOutputWriter<Arc<SynchronizedWrite>> = CSVOutputWriter {
                writer: sink.clone() as Arc<SynchronizedWrite>,
                delim: "".into(),
                newline: "\n".into(),
                sep: ", ".into(),
            };
            writer.write_line(&["1", "2", "3"]).await.unwrap();
        }
        let sink = sink.lock().await;
        let output = std::str::from_utf8(&sink).unwrap();
        assert_eq!(output, "1, 2, 3\n");
    }

    // Test flush function.
    #[tokio::test]
    async fn test_flush() {
        let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
        let writer: CSVOutputWriter<Arc<SynchronizedWrite>> = CSVOutputWriter {
            writer: sink.clone() as Arc<SynchronizedWrite>,
            delim: "\"".into(),
            newline: "\n".into(),
            sep: ",".into(),
        };
        writer.write_line(&["1", "2", "3"]).await.unwrap();
        writer.flush().await.unwrap();
        let sink = sink.lock().await;
        let output = std::str::from_utf8(&sink).unwrap();
        assert_eq!(output, "\"1\",\"2\",\"3\"\n");
    }

    // Test sending writer to another thread.
    #[tokio::test]
    async fn test_write_threaded() {
        let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
        {
            let writer: Arc<CSVOutputWriter<Arc<SynchronizedWrite>>> =
                Arc::new(CSVOutputWriter {
                    writer: sink.clone() as Arc<SynchronizedWrite>,
                    delim: "".into(),
                    newline: "\n".into(),
                    sep: ",".into(),
                });
            // Send writer to another thread/task.
            let task: tokio::task::JoinHandle<std::io::Result<()>> = {
                let writer = writer.clone();
                tokio::spawn(async move {
                    writer.write_line(&["1", "2", "3"]).await?;
                    writer.flush().await
                })
            };
            // Wait for task to finish writing.
            task.await.unwrap().unwrap();
            // Write some more on this thread.
            writer.write_line(&["4", "5", "6"]).await.unwrap();
        }
        let sink = sink.lock().await;
        let output = std::str::from_utf8(&sink).unwrap();
        assert_eq!(output, "1,2,3\n4,5,6\n");
    }

    // Test builder pattern.
    #[tokio::test]
    async fn test_builder() {
        let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
        {
            let sink: Arc<SynchronizedWrite> = sink.clone();
            let writer = CSVOutputWriter::new(sink)
                .delim("'")
                .newline("\n")
                .sep("\t")
                .build();
            writer.write_line(&["1", "2", "3"]).await.unwrap();
        }
        let sink = sink.lock().await;
        let output = std::str::from_utf8(&sink).unwrap();
        assert_eq!(output, "'1'\t'2'\t'3'\n");
    }

    // Test builder pattern with defaults.
    #[tokio::test]
    async fn test_builder_defaults() {
        let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
        {
            let sink: Arc<SynchronizedWrite> = sink.clone();
            let writer = CSVOutputWriter::new(sink).build();
            writer.write_line(&["1", "2", "3"]).await.unwrap();
        }
        let sink = sink.lock().await;
        let output = std::str::from_utf8(&sink).unwrap();
        assert_eq!(output, "1,2,3\n");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment