Skip to content

Instantly share code, notes, and snippets.

@dignifiedquire
Created April 14, 2020 21:41
Show Gist options
  • Save dignifiedquire/fc98b855b2d470cc5f5d595e0f4e14ca to your computer and use it in GitHub Desktop.
Save dignifiedquire/fc98b855b2d470cc5f5d595e0f4e14ca to your computer and use it in GitHub Desktop.
use async_std::task::Poll;
use byte_pool::{Block, BytePool};
use byteorder::{BigEndian, ByteOrder};
use jetscii::bytes;
use std::io::{Cursor, Result};
use std::pin::Pin;
use std::task::Context;
use subslice::SubsliceExt;
/// Demonstrates the synchronous [`ChunksExt`] / [`PipeExt`] helpers.
///
/// 1. Streams a 1 MiB buffer through 1 KiB pooled chunks, framing every chunk with a
///    leading `0` byte and a trailing `255` byte, and pipes the result into a `Vec`.
/// 2. Re-reads the same bytes in 4-byte chunks and decodes each one as a big-endian `u32`.
fn main() {
    const CHUNK_SIZE: usize = 1024;

    let pool = BytePool::<Vec<u8>>::new();
    let source = vec![1u8; 1024 * 1024];

    // every chunk gains exactly two framing bytes
    let framed_len = source.len() + (source.len() / CHUNK_SIZE) * 2;
    let mut dest: Vec<u8> = Vec::with_capacity(framed_len);
    Cursor::new(&source)
        .chunks(&pool, CHUNK_SIZE)
        .map(|mut chunk| {
            // wrap the chunk into something
            chunk.insert(0, 0);
            chunk.push(255);
            chunk
        })
        .pipe(&mut dest)
        .unwrap();
    assert_eq!(dest.len(), framed_len);

    // Parse incoming data to `u32`
    let u32s = Cursor::new(&source)
        .chunks(&pool, 4)
        .map(|chunk| BigEndian::read_u32(&chunk))
        .collect::<Vec<u32>>();
    assert_eq!(u32s.len(), source.len() / 4);
    // decoded with `BigEndian`, so compare against the big-endian interpretation
    assert_eq!(u32s[0], u32::from_be_bytes([1, 1, 1, 1]));
}
/// Decodes an HTTP/1.1 chunked body with [`BlockBuffer`].
///
/// Each chunk-size line is read with `read_until`, each payload with `take`, and the
/// trailer section with `split_at`.
#[test]
fn test_async() {
    use async_std::{io::Cursor, stream::StreamExt, task};

    let pool = BytePool::<Vec<u8>>::new();
    task::block_on(async move {
        let mut incoming_bytes = BlockBuffer::new(
            &pool,
            Cursor::new(
                "7\r\n\
                 Mozilla\r\n\
                 9\r\n\
                 Developer\r\n\
                 7\r\n\
                 Network\r\n\
                 0\r\n\
                 Expires: Wed, 21 Oct 2015 07:28:00 GMT\r\n\
                 \r\n"
                    .as_bytes(),
            ),
        );

        let mut result = Vec::new();
        loop {
            // read the length of the chunk
            let hex_block: Block = incoming_bytes.read_until(b"\r\n").await.unwrap();
            let len = parse_chunk_size(&hex_block[..]);
            if len == 0 {
                // the zero-size chunk ends the body; trailers follow
                break;
            }
            // read the actual data
            let data: Block = incoming_bytes.take(len).await.unwrap();
            result.extend_from_slice(&data[..]);
            // every chunk payload is followed by a CRLF
            let crlf = incoming_bytes.take(2).await.unwrap();
            assert_eq!(&crlf[..], "\r\n".as_bytes());
        }

        // Parse trailers. The trailer section is terminated by an empty line, so stop there
        // instead of trying to split that line into a key/value pair.
        let trailers: Vec<(String, String)> = incoming_bytes
            .split_at(b"\r\n") // create chunks splitting at each crlf boundary
            .take_while(|line| line.as_ref().map_or(true, |block| !block.is_empty()))
            .map(|chunk: Result<Block>| {
                let chunk = chunk?;
                // split at the first occurrence of ": "
                let i = bytes!(':', ' ')
                    .find(&chunk[..])
                    .expect("trailer line without `: ` separator");
                let (key, value) = chunk.split_at(i);
                Ok((
                    String::from_utf8(key.into()).unwrap(),
                    String::from_utf8(value[2..].into()).unwrap(),
                ))
            })
            .collect::<Result<_>>()
            .await
            .unwrap();

        assert_eq!(result, "MozillaDeveloperNetwork".as_bytes());
        assert_eq!(
            trailers,
            vec![(
                "Expires".to_string(),
                "Wed, 21 Oct 2015 07:28:00 GMT".to_string()
            )]
        );
    });
}
/// Parses the hexadecimal size line of an HTTP/1.1 chunk, given without its trailing CRLF.
///
/// `httparse::parse_chunk_size` needs the line terminator to be present, so a CRLF is
/// appended to a copy of `data` before parsing.
///
/// # Panics
///
/// Panics if `data` is not a complete, valid chunk-size line, or if the parsed size does
/// not fit in `usize` (possible on 32-bit targets, where an `as` cast would silently
/// truncate a peer-supplied value).
fn parse_chunk_size(data: &[u8]) -> usize {
    use std::convert::TryFrom;

    let mut line = Vec::with_capacity(data.len() + 2);
    line.extend_from_slice(data);
    line.extend_from_slice(b"\r\n");
    let (_, size) = httparse::parse_chunk_size(&line)
        .expect("invalid chunk size line")
        .unwrap();
    usize::try_from(size).expect("chunk size does not fit in usize")
}
/// Async read buffer that hands out pooled [`Block`]s.
///
/// Bytes pulled from `source` are staged in `buffer`, and `pos` marks where the staged
/// bytes end. The combinators created by [`BlockBuffer::read_until`],
/// [`BlockBuffer::take`] and [`BlockBuffer::split_at`] carve blocks out of that staging
/// area and read more from `source` on demand.
pub struct BlockBuffer<'a, T> {
    /// Pool that staging blocks and handed-out blocks are allocated from.
    pool: &'a BytePool<Vec<u8>>,
    /// Underlying async reader.
    source: T,
    /// Staging block that reads land in.
    buffer: Block<'a, Vec<u8>>,
    /// End of the staged bytes within `buffer`.
    pos: usize,
    /// Set by `SplitAt` after a zero-byte read; `read_until` and `take` then fail.
    done: bool,
}

impl<'a, T: async_std::io::Read + Unpin> BlockBuffer<'a, T> {
    /// Wraps `source`, starting with a staging block requested from `pool` with size 1024.
    pub fn new(pool: &'a BytePool<Vec<u8>>, source: T) -> Self {
        let buffer = pool.alloc(1024);
        BlockBuffer {
            pool,
            source,
            buffer,
            pos: 0,
            done: false,
        }
    }

    /// Grows the staging block so the next read has room to land.
    ///
    /// The block is reallocated to `max(1024, 2 * pos)` bytes when it is shorter than
    /// 1024 bytes, or when at most five bytes remain past `pos` and the doubled size
    /// would stay within 10 MiB.
    fn make_room(&mut self) {
        const MIN_LEN: usize = 1024;
        const MAX_LEN: usize = 10 * 1024 * 1024;

        let len = self.buffer.len();
        let must_grow = len < MIN_LEN || (self.pos + 5 >= len && self.pos * 2 <= MAX_LEN);
        if must_grow {
            self.buffer.realloc(MIN_LEN.max(self.pos * 2));
        }
    }

    /// Returns a stream of blocks delimited by `predicate`; the delimiter is not included.
    pub fn split_at<'b, 'c>(&'b mut self, predicate: &'c [u8]) -> SplitAt<'a, 'b, 'c, T> {
        SplitAt {
            predicate,
            first: true,
            done: false,
            pos: 0,
            source: self,
        }
    }

    /// Returns a future resolving to the bytes before the next `predicate`.
    ///
    /// The delimiter itself is consumed and dropped.
    pub fn read_until<'b, 'c>(&'b mut self, predicate: &'c [u8]) -> ReadUntil<'a, 'b, 'c, T> {
        ReadUntil {
            predicate,
            first: true,
            source: self,
        }
    }

    /// Returns a future resolving to exactly the next `n` bytes.
    pub fn take<'b>(&'b mut self, n: usize) -> Take<'a, 'b, T> {
        Take { n, source: self }
    }
}
/// Stream returned by [`BlockBuffer::split_at`].
///
/// Each `Ok` item holds the bytes before the next occurrence of `predicate`; the delimiter
/// itself is not included. A read error is yielded once as `Err` and ends the stream. When a
/// read after the first poll returns 0 bytes, both this stream and the parent `BlockBuffer`
/// are marked done; `read_until`/`take` on that buffer then fail immediately.
///
/// NOTE(review): bytes staged after the last delimiter when input ends are never yielded.
pub struct SplitAt<'a, 'b, 'c, T: async_std::io::Read> {
source: &'b mut BlockBuffer<'a, T>,
// delimiter to split on
predicate: &'c [u8],
// set once this stream has ended (zero-byte read or read error)
done: bool,
// true until the first poll; if bytes are already staged, the first poll searches them
// without reading from the source
first: bool,
// offset at which the next delimiter search starts (see NOTE(review) in `poll_next`)
pos: usize,
}
impl<'a, 'b, 'c, T: async_std::io::Read + Unpin> async_std::stream::Stream
for SplitAt<'a, 'b, 'c, T>
{
type Item = Result<Block<'a, Vec<u8>>>;
/// Yields the next delimited block, reading from the source when needed.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.done || self.source.done {
return Poll::Ready(None);
}
// end of the bytes currently staged in the parent buffer
let pos = self.source.pos;
// make sure we have enough room in the buffer
self.source.make_room();
// swap a fresh 1024-byte block into the parent and work on the staged block locally
let mut new_buffer = self.source.pool.alloc(1024);
new_buffer.resize(1024, 0);
let mut buffer = std::mem::replace(&mut self.source.buffer, new_buffer);
// fill buffer (skipped on the first poll when bytes are already staged)
let read = if !self.first || self.source.pos == 0 {
match Pin::new(&mut self.source.source).poll_read(cx, &mut buffer[pos..]) {
Poll::Pending => {
// put buffer back
self.source.buffer = buffer;
return Poll::Pending;
}
Poll::Ready(Err(err)) => {
// put buffer back
self.source.buffer = buffer;
self.done = true;
return Poll::Ready(Some(Err(err)));
}
Poll::Ready(Ok(r)) => r,
}
} else {
0
};
// a zero-byte read after the first poll is treated as end of input
if read == 0 && !self.first {
self.done = true;
self.source.done = true;
}
self.first = false;
// buffer filled, use it
if let Some(i) = (&buffer[self.pos..pos + read]).find(self.predicate) {
let i = self.pos + i;
// success, found one
let read_end = read + pos;
let end = i + self.predicate.len();
// copy the unneeded parts over into our current buffer
// NOTE(review): the fresh block is only 1024 bytes long (see `resize` above), so this
// panics when more than 1024 bytes follow the delimiter.
let rest_len = read_end - end;
self.source.buffer[..rest_len].copy_from_slice(&buffer[end..read_end]);
// NOTE(review): the leftover bytes now start at index 0 of the fresh block, yet the
// parent's `pos` is set to `pos + end` (ReadUntil/Take set it to `rest_len`), and
// `self.pos` keeps advancing although the searched block is handed out below. This
// looks inconsistent - confirm before relying on more than one item per stream.
self.source.pos = pos + end;
self.pos += end;
// resize buffer to return
buffer.truncate(i);
return Poll::Ready(Some(Ok(buffer)));
}
// nothing found, next round
self.source.pos = pos + read;
self.source.buffer = buffer;
if self.done || self.source.done {
Poll::Ready(None)
} else {
// NOTE(review): the last `poll_read` (if any) returned `Ready`, so no waker has been
// registered for this `Pending`; an executor may never poll this stream again.
Poll::Pending
}
}
}
/// Future returned by [`BlockBuffer::read_until`].
///
/// Resolves to a block holding the staged bytes before the first occurrence of
/// `predicate`. The delimiter is consumed and dropped; bytes after it stay staged in the
/// parent [`BlockBuffer`] for the next operation.
pub struct ReadUntil<'a, 'b, 'c, T: async_std::io::Read> {
    source: &'b mut BlockBuffer<'a, T>,
    /// Delimiter to search for.
    predicate: &'c [u8],
    /// `true` until the first search; lets already-staged bytes be searched before more
    /// are read from the source.
    first: bool,
}

impl<'a, 'b, 'c, T: async_std::io::Read + Unpin> async_std::future::Future
    for ReadUntil<'a, 'b, 'c, T>
{
    type Output = Result<Block<'a, Vec<u8>>>;

    /// Searches the staged bytes for the delimiter, reading from the source until it is
    /// found.
    ///
    /// `Poll::Pending` is only returned when the source itself returned it (and therefore
    /// registered the waker).
    ///
    /// # Errors
    ///
    /// * `ErrorKind::Other` if the parent buffer is already exhausted, or if the staging
    ///   buffer reached its size limit without containing the delimiter.
    /// * `ErrorKind::UnexpectedEof` if the source ends before the delimiter appears.
    /// * Any error returned by the source.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = &mut *self;
        if this.source.done {
            return Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "source exhausted",
            )));
        }
        let predicate = this.predicate;
        loop {
            let pos = this.source.pos;
            // make sure we have enough room in the buffer
            this.source.make_room();

            // On the first poll, bytes left over from a previous operation are searched
            // before anything new is read.
            let at_eof = if !this.first || pos == 0 {
                let source = &mut *this.source;
                if pos >= source.buffer.len() {
                    // `make_room` refused to grow any further
                    return Poll::Ready(Err(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        "delimiter not found within the read buffer limit",
                    )));
                }
                let read =
                    match Pin::new(&mut source.source).poll_read(cx, &mut source.buffer[pos..]) {
                        // the source registered the waker, so returning Pending is sound
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                        Poll::Ready(Ok(read)) => read,
                    };
                source.pos = pos + read;
                read == 0
            } else {
                false
            };
            this.first = false;

            let filled = this.source.pos;
            if let Some(i) = (&this.source.buffer[..filled]).find(predicate) {
                let end = i + predicate.len();
                let rest_len = filled - end;
                // carry the bytes after the delimiter over into a fresh block large enough
                // to hold all of them
                let fresh_len = std::cmp::max(1024, rest_len);
                let mut fresh = this.source.pool.alloc(fresh_len);
                fresh.resize(fresh_len, 0);
                fresh[..rest_len].copy_from_slice(&this.source.buffer[end..filled]);
                let mut found = std::mem::replace(&mut this.source.buffer, fresh);
                this.source.pos = rest_len;
                found.truncate(i);
                return Poll::Ready(Ok(found));
            }

            if at_eof {
                return Poll::Ready(Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "end of input reached before delimiter was found",
                )));
            }
            // not found yet: loop and read more instead of parking without a waker
        }
    }
}
/// Future returned by [`BlockBuffer::take`].
///
/// Resolves to a block holding exactly the next `n` staged bytes; any further staged
/// bytes remain in the parent [`BlockBuffer`] for the next operation.
pub struct Take<'a, 'b, T: async_std::io::Read> {
    source: &'b mut BlockBuffer<'a, T>,
    /// Exact number of bytes to hand out.
    n: usize,
}

impl<'a, 'b, T: async_std::io::Read + Unpin> async_std::future::Future for Take<'a, 'b, T> {
    type Output = Result<Block<'a, Vec<u8>>>;

    /// Reads from the source until at least `n` bytes are staged, then splits them off.
    ///
    /// `Poll::Pending` is only returned when the source itself returned it (and therefore
    /// registered the waker). Memory use stays bounded by the staging buffer's growth
    /// limit rather than by `n`, so an oversized `n` yields an error instead of a huge
    /// allocation.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::Other` if the parent buffer is already exhausted, or if `n` does not
    ///   fit in the staging buffer's size limit.
    /// * `ErrorKind::UnexpectedEof` if the source ends before `n` bytes arrived.
    /// * Any error returned by the source.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = &mut *self;
        if this.source.done {
            return Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "source exhausted",
            )));
        }
        let n = this.n;
        loop {
            let pos = this.source.pos;
            if pos >= n {
                // enough staged: carry the surplus over into a fresh block large enough
                // to hold all of it
                let rest_len = pos - n;
                let fresh_len = std::cmp::max(1024, rest_len);
                let mut fresh = this.source.pool.alloc(fresh_len);
                fresh.resize(fresh_len, 0);
                fresh[..rest_len].copy_from_slice(&this.source.buffer[n..pos]);
                let mut taken = std::mem::replace(&mut this.source.buffer, fresh);
                this.source.pos = rest_len;
                taken.truncate(n);
                return Poll::Ready(Ok(taken));
            }

            // make sure we have enough room in the buffer
            this.source.make_room();
            let source = &mut *this.source;
            if pos >= source.buffer.len() {
                // `make_room` refused to grow any further
                return Poll::Ready(Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "requested length exceeds the read buffer limit",
                )));
            }
            let read = match Pin::new(&mut source.source).poll_read(cx, &mut source.buffer[pos..])
            {
                // the source registered the waker, so returning Pending is sound
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                Poll::Ready(Ok(read)) => read,
            };
            if read == 0 {
                return Poll::Ready(Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "end of input reached before enough bytes were read",
                )));
            }
            source.pos = pos + read;
            // loop: either enough bytes are staged now, or read again
        }
    }
}
/// Extension trait that splits any blocking reader into pooled chunks.
pub trait ChunksExt: std::io::Read + Sized {
    /// Consumes the reader and returns an iterator of blocks of up to `chunk_size` bytes,
    /// each allocated from `pool`.
    fn chunks<'p>(self, pool: &'p BytePool<Vec<u8>>, chunk_size: usize) -> Chunks<'p, Self> {
        Chunks::new(pool, self, chunk_size)
    }
}

// Every `std::io::Read` implementor gets `chunks` for free.
impl<R> ChunksExt for R where R: std::io::Read {}
/// Extension trait for iterators of pooled blocks: writes every block to a sink.
pub trait PipeExt<'a>: Iterator<Item = Block<'a, Vec<u8>>> + Sized {
    /// Writes each block yielded by the iterator to `dest`, in order.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error from `write_all`; blocks after the failing
    /// one are not pulled from the iterator.
    fn pipe<T: std::io::Write>(mut self, mut dest: T) -> Result<()> {
        self.try_for_each(|block| dest.write_all(&block))
    }
}

// Blanket impl: any iterator over pooled blocks can be piped.
impl<'a, I> PipeExt<'a> for I where I: Iterator<Item = Block<'a, Vec<u8>>> {}
/// Iterator that reads a blocking source into pooled blocks of up to `chunk_size` bytes.
///
/// Usually created through [`ChunksExt::chunks`].
#[derive(Debug)]
pub struct Chunks<'a, T: std::io::Read> {
    /// Pool every chunk is allocated from.
    pool: &'a BytePool<Vec<u8>>,
    /// The reader chunks are filled from.
    source: T,
    /// Target length of each chunk.
    chunk_size: usize,
    /// Set once `source` reported end of input; no further chunks are produced.
    done: bool,
}

impl<'a, T: std::io::Read> Chunks<'a, T> {
    /// Creates a chunking iterator over `source`, allocating chunks from `pool`.
    pub fn new(pool: &'a BytePool<Vec<u8>>, source: T, chunk_size: usize) -> Self {
        Self {
            pool,
            source,
            chunk_size,
            done: false,
        }
    }
}
impl<'a, T: std::io::Read> Iterator for Chunks<'a, T> {
    type Item = Block<'a, Vec<u8>>;

    /// Reads the next chunk of up to `chunk_size` bytes.
    ///
    /// Keeps reading until the chunk is full or the source reports end of input, so a
    /// short chunk only appears at the end of the stream and holds exactly the bytes that
    /// were read. Returns `None` once the source is exhausted.
    ///
    /// # Panics
    ///
    /// Panics if the source fails with an I/O error other than `Interrupted`; the `Item`
    /// type has no way to carry the error.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let mut buf = self.pool.alloc(self.chunk_size);
        // make the whole chunk addressable, whatever length the pooled block came with
        buf.resize(self.chunk_size, 0);

        let mut filled = 0;
        while filled < self.chunk_size {
            match self.source.read(&mut buf[filled..]) {
                Ok(0) => {
                    self.done = true;
                    break;
                }
                Ok(read) => filled += read,
                // `Interrupted` is transient per the `std::io::Read` contract: retry
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {}
                Err(err) => panic!("failed to read chunk from source: {}", err),
            }
        }

        if filled == 0 {
            return None;
        }
        if filled < buf.len() {
            // shrink to exactly the bytes read; no stale byte is left at the end
            buf.realloc(filled);
        }
        Some(buf)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment