Skip to content

Instantly share code, notes, and snippets.

@dgllghr
Last active January 22, 2019 20:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dgllghr/2b93d28a124f65d26d5b500a21789151 to your computer and use it in GitHub Desktop.
Save dgllghr/2b93d28a124f65d26d5b500a21789151 to your computer and use it in GitHub Desktop.
Async stream reader using csv-core and tokio
use bytes::Buf;
use std::io;
use tokio::prelude::*;
pub struct ReadStream<B> {
max_record_size: usize,
inbuf: Vec<u8>,
inbuf_offset: usize,
outbuf: Vec<u8>,
outbuf_offset: usize,
endsbuf: Vec<usize>,
endsbuf_offset: usize,
reader: csv_core::Reader,
inner: stream::Fuse<Box<Stream<Item = B, Error = io::Error> + Send>>,
}
impl<B> ReadStream<B>
where
B: Buf,
{
pub fn new(
stream: Box<Stream<Item = B, Error = io::Error> + Send>,
reader: csv_core::Reader,
max_record_size: usize,
) -> Self {
let outbuf = vec![0; 256];
let endsbuf = vec![0; 16];
ReadStream {
max_record_size,
inbuf: Vec::new(),
inbuf_offset: 0,
outbuf,
outbuf_offset: 0,
endsbuf,
endsbuf_offset: 0,
reader: reader,
inner: stream.fuse(),
}
}
fn read_in<T>(&mut self) -> Result<Option<Async<Option<T>>>, io::Error> {
match self.inner.poll()? {
Async::NotReady => {
return Ok(Some(Async::NotReady));
}
Async::Ready(None) => {
// The input stream is exhausted but this stream is not
// complete until the csv_core::Reader is complete
return Ok(None);
}
Async::Ready(Some(buffer)) => {
if self.inbuf.capacity() == 0 {
self.inbuf.reserve_exact(buffer.remaining() * 4);
}
if buffer.remaining() + self.inbuf.len() > self.inbuf.capacity() {
self.truncate_inbuf();
}
self.inbuf.extend_from_slice(buffer.bytes());
return Ok(None);
}
}
}
#[inline]
fn inbuf_empty(&self) -> bool {
self.inbuf.is_empty() || self.inbuf.len() - self.inbuf_offset <= 1
}
#[inline]
fn truncate_inbuf(&mut self) {
self.inbuf_offset = 0;
self.inbuf.truncate(0);
}
#[inline]
fn read_record(&mut self) -> csv_core::ReadRecordResult {
let inbuf = &self.inbuf[self.inbuf_offset..];
let outbuf = &mut self.outbuf[self.outbuf_offset..];
let endsbuf = &mut self.endsbuf[self.endsbuf_offset..];
let (res, r, w, ne) = self.reader.read_record(inbuf, outbuf, endsbuf);
self.inbuf_offset = self.inbuf_offset + r;
self.outbuf_offset = self.outbuf_offset + w;
self.endsbuf_offset = self.endsbuf_offset + ne;
res
}
}
impl<B> Stream for ReadStream<B>
where
    B: Buf,
{
    type Item = Record;
    type Error = io::Error;

    /// Drives the parser until it yields a record, reaches the end, or needs
    /// input that the inner stream cannot supply yet.
    ///
    /// # Errors
    ///
    /// Propagates errors from the inner stream, and returns an
    /// `io::ErrorKind::InvalidData` error when a record does not fit in an
    /// output buffer of `max_record_size` bytes.
    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        loop {
            if self.inbuf_empty() {
                if let Some(outcome) = self.read_in()? {
                    return Ok(outcome);
                }
            }

            match self.read_record() {
                csv_core::ReadRecordResult::InputEmpty => {
                    if let Some(outcome) = self.read_in()? {
                        return Ok(outcome);
                    }
                }
                csv_core::ReadRecordResult::OutputFull => {
                    // Double the output buffer, but never past the limit.
                    let grown =
                        std::cmp::min(self.max_record_size, self.outbuf.len().saturating_mul(2));
                    if grown <= self.outbuf.len() {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            "record too large",
                        ));
                    }
                    self.outbuf.resize(grown, 0);
                }
                csv_core::ReadRecordResult::OutputEndsFull => {
                    let doubled = self.endsbuf.len().saturating_mul(2);
                    self.endsbuf.resize(doubled, 0);
                }
                csv_core::ReadRecordResult::End => {
                    return Ok(Async::Ready(None));
                }
                csv_core::ReadRecordResult::Record => {
                    // Only the first `outbuf_offset` bytes were written for
                    // this record (the offset is advanced by each parse step
                    // and reset below), so copy just that prefix instead of
                    // the whole scratch buffer.
                    let record = Record::new(
                        &self.outbuf[..self.outbuf_offset],
                        &self.endsbuf,
                        self.endsbuf_offset,
                    );
                    self.outbuf_offset = 0;
                    self.endsbuf_offset = 0;
                    return Ok(Async::Ready(Some(record)));
                }
            }
        }
    }
}
/// One parsed record: the bytes of all its fields stored back to back,
/// plus the end offset of each field within those bytes.
#[derive(Debug, Clone)]
pub struct Record {
    /// Field data, concatenated with no separators.
    buf: Vec<u8>,
    /// Exclusive end offset into `buf` of each field, in field order. A
    /// field starts where the previous one ended (the first starts at 0).
    endsbuf: Vec<usize>,
}

impl Record {
    /// Builds a record from field bytes and the first `num_ends` entries of
    /// `endsbuf`.
    ///
    /// Only the part of `buf` that the selected end offsets can refer to is
    /// copied, so callers may pass an oversized scratch buffer without the
    /// unused tail being stored in the record.
    ///
    /// # Panics
    ///
    /// Panics if `num_ends` is greater than `endsbuf.len()`.
    pub fn new(buf: &[u8], endsbuf: &[usize], num_ends: usize) -> Self {
        let ends = &endsbuf[..num_ends];
        // Use the largest end (not merely the last) so that every slice that
        // was in bounds of `buf` stays in bounds of the trimmed copy.
        let used = ends.iter().max().map_or(0, |&end| end.min(buf.len()));
        Record {
            buf: buf[..used].to_vec(),
            endsbuf: ends.to_vec(),
        }
    }

    /// Returns an iterator over the record's fields as byte slices.
    pub fn iter<'a>(&'a self) -> RecordIter<'a> {
        RecordIter {
            rec: self,
            offset: 0,
            end_offset: 0,
        }
    }
}

/// Iterator over the fields of a [`Record`], created by [`Record::iter`].
#[derive(Debug, Clone)]
pub struct RecordIter<'a> {
    /// The record being iterated.
    rec: &'a Record,
    /// Start offset in the record's bytes of the next field to yield.
    offset: usize,
    /// Index of the next entry in the record's end offsets.
    end_offset: usize,
}

impl<'a> Iterator for RecordIter<'a> {
    type Item = &'a [u8];

    /// Yields the next field, or `None` once every end offset has been used.
    ///
    /// # Panics
    ///
    /// Panics if the record's end offsets are not non-decreasing or point
    /// past the stored bytes.
    fn next(&mut self) -> Option<Self::Item> {
        let end = *self.rec.endsbuf.get(self.end_offset)?;
        let start = self.offset;
        self.offset = end;
        self.end_offset += 1;
        Some(&self.rec.buf[start..end])
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.rec.endsbuf.len().saturating_sub(self.end_offset);
        (remaining, Some(remaining))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment