Created
April 29, 2019 07:03
-
-
Save Ironlenny/b9c8e1dc13b21e048146d446e2e6de8e to your computer and use it in GitHub Desktop.
Trouble Calling `Arc::new()` In Rayon Thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is an implementation of | |
// [Parity Volume Set Specification 2.0]( | |
// http://parchive.sourceforge.net/docs/specifications/parity-volume-spec/article-spec.html) | |
#![allow(dead_code, unused_must_use)] | |
use crc32fast::Hasher; | |
use exitfailure::ExitFailure; | |
use failure::ResultExt; | |
use itertools::Itertools; | |
use md5::{Digest, Md5}; | |
use rayon::{join, spawn}; | |
use std::fs::metadata; | |
use std::fs::File; | |
use std::io::prelude::*; | |
use std::mem::drop; | |
use std::path::PathBuf; | |
use std::sync::mpsc::{channel, Receiver, Sender}; | |
use std::sync::{Arc, RwLock}; | |
/// Lets a packet body report its packet type, so the type can be sent along
/// to the header channel.
trait Type {
    /// Returns the 16-byte packet-type tag for this body.
    fn is_type(&self) -> Box<&[u8; 16]>;
}
/// Header shared by all packets, generic over the packet body `T`.
///
/// Hashes are 16 bytes (`[u8; 16]`).
struct Header<'a, T> {
    /// ASCII magic string, 8 bytes.
    magic: Arc<&'a [u8; 8]>,
    /// Packet length, counted from the first byte of `rec_set_id`. 8 bytes.
    length: u64,
    /// Packet hash, 16 bytes.
    pkt_hash: [u8; 16],
    /// Recovery set id, 16 bytes; `None` until it is known.
    rec_set_id: Arc<Option<[u8; 16]>>,
    /// Packet type, a 16-byte ASCII string.
    pkt_type: [u8; 16],
    /// The packet body that follows the header.
    body: T,
}
/// Main packet body.
struct Main {
    /// Block size used for the set.
    block_size: u64,
    /// Number of files. 4 bytes.
    num_files: u32,
    /// 16-byte file ids.
    file_ids: Box<Vec<[u8; 16]>>,
    /// A second list of 16-byte file ids.
    // NOTE(review): presumably the ids kept apart from `file_ids` per the
    // PAR2 main-packet layout -- confirm which set this is meant to hold.
    rec_file_ids: Box<Vec<[u8; 16]>>,
}
impl Type for Main { | |
fn is_type(&self) -> Box<&[u8; 16]> { | |
let string: Box<&[u8; 16]> = Box::new(b"PAR 2.0\0Main\0\0\0\0"); | |
string | |
} | |
} | |
/// File Description packet body.
struct FileDescription<'a> {
    /// 16-byte id of the described file.
    file_id: Arc<[u8; 16]>,
    /// 16-byte hash.
    // NOTE(review): presumably the hash of the whole file -- confirm.
    hash: [u8; 16],
    /// Hash of the first 16k of the file.
    hash_16k: [u8; 16],
    /// Length of the file.
    length: u64,
    /// File name, an ASCII string.
    name: &'a str,
}
impl<'a> Type for FileDescription<'a> { | |
fn is_type(&self) -> Box<&[u8; 16]> { | |
let string: Box<&[u8; 16]> = Box::new(b"PAR 2.0\0FileDesc"); | |
string | |
} | |
} | |
/// Input File Block Checksum packet body.
struct Input {
    /// 16-byte id of the file the checksums belong to.
    file_id: Arc<[u8; 16]>,
    /// One `(hash, CRC32)` tuple per block.
    block_checksums: Vec<([u8; 16], u32)>,
}
impl Type for Input { | |
fn is_type(&self) -> Box<&[u8; 16]> { | |
let string: Box<&[u8; 16]> = Box::new(b"par 2.0\0ifsc\0\0\0\0"); | |
string | |
} | |
} | |
/// Recovery Block packet body.
struct Recovery {
    /// Exponent associated with this recovery block.
    exponent: u32,
    /// Recovery data, stored as 32-bit values.
    blocks: Vec<u32>,
}
impl Type for Recovery { | |
fn is_type(&self) -> Box<&[u8; 16]> { | |
let string: Box<&[u8; 16]> = Box::new(b"PAR 2.0\0RecvSlic"); | |
string | |
} | |
} | |
/// Creator packet body.
struct Creator {
    /// Creator id, a 16-byte ASCII string.
    id: [u8; 16],
}
impl Type for Creator { | |
fn is_type(&self) -> Box<&[u8; 16]> { | |
let string: Box<&[u8; 16]> = Box::new(b"PAR 2.0\0Creator\0"); | |
string | |
} | |
} | |
enum Body<'a> { | |
Main(Main), | |
FileDescription(FileDescription<'a>), | |
Input(Input), | |
Recovery(Recovery), | |
Creator(Creator), | |
} | |
/// Block convenience struct.
///
/// The spec references slices; slices and blocks are the same thing. A block
/// is an array of 16-bit values.
// NOTE(review): `data` is stored as bytes (`Vec<u8>`), not as 16-bit values --
// confirm how consumers are expected to interpret it.
struct Block {
    /// 16-byte id of the file this block was read from.
    file_id: Arc<[u8; 16]>,
    /// Position of the block within its file.
    index: usize,
    /// The block's bytes, shared between consumers.
    data: Arc<Vec<u8>>,
    /// Length recorded for the block.
    vec_length: usize,
}
/// File creation pipeline.
///
/// Each stage owns both ends of a channel; the type parameters are the
/// message types carried by those channels.
pub struct CreatorPipeline<A, B, C, D, E, F> {
    /// Magic string placed in packet headers.
    magic: Arc<&'static [u8; 8]>,
    /// Recovery set id; `None` until it has been set.
    rec_set_id: Arc<RwLock<Option<&'static [u8; 16]>>>,
    /// A: Header<Body>
    writes: (Sender<A>, Receiver<A>),
    /// B: Arc<[u8; 16]>
    main: (Sender<B>, Receiver<B>),
    /// C: (Arc<[u8; 16]>, File)
    input: (Sender<C>, Receiver<C>),
    /// F: Body<'a>
    body: (Sender<F>, Receiver<F>),
    /// D: (Arc<[u8; 16]>, u64, Vec<u8>)
    file_description: (Sender<D>, Receiver<D>),
    /// E: (usize, &[u8])
    recovery: (Sender<E>, Receiver<E>),
}
// Creation pipeline methods | |
impl<'a> | |
CreatorPipeline< | |
Header<'a, Body<'a>>, // writes Packet | |
Arc<[u8; 16]>, // main file_id | |
(Arc<[u8; 16]>, PathBuf, u64), // input (file_id, file, length) | |
(Vec<u8>, u64, Vec<u8>), // file_description (name, length, hash_16k) | |
Block, // recovery Block | |
Body<'a>, // packet Body | |
> | |
{ | |
pub fn new() -> Self { | |
CreatorPipeline { | |
magic: Arc::new(b"PAR2\0PKT"), | |
rec_set_id: Arc::new(RwLock::new(None)), | |
writes: channel(), | |
main: channel(), | |
input: channel(), | |
body: channel(), | |
file_description: channel(), | |
recovery: channel(), | |
} | |
} | |
fn convert_to_byte_array(&self, mut vec: Vec<u8>) -> [u8; 16] { | |
let mut temp: [u8; 16] = [0; 16]; | |
for i in 0..15 { | |
temp[i] = vec.pop().unwrap(); | |
} | |
temp | |
} | |
// First Stage: Create file ids and partial bodies for FileDescription. Send | |
// file ids, partial bodies and file readers to the correct channels. | |
fn create_file_id(&self, files: Vec<PathBuf>) -> Result<(), ExitFailure> { | |
let (tx_main, _) = &self.main; // sender for create_main() | |
let (tx_input, _) = &self.input; // sender for create_input() | |
let (tx_fd, _) = &self.file_description; // sender for create_fd() | |
for file in files { | |
let tx_main = tx_main.clone(); | |
let tx_input = tx_input.clone(); | |
let tx_fd = tx_fd.clone(); | |
// Spawn thread | |
spawn(move || { | |
let mut reader = File::open(&file) | |
.with_context(|_| format!("Could not open file {}", file.display())) | |
.unwrap(); | |
// Get filename from path | |
let name = file | |
.file_stem() | |
.unwrap() | |
.to_string_lossy() | |
.into_owned() | |
.into_bytes(); | |
let length = { | |
let metadata = metadata(&file).unwrap(); | |
metadata.len() | |
}; | |
// Hash first 16k of the file | |
let hash_16k = { | |
let mut hasher_16k = Md5::new(); | |
let mut buffer = [0; 16384]; | |
reader.read(&mut buffer).unwrap(); | |
for byte in buffer.iter() { | |
hasher_16k.input([byte.clone()]); | |
} | |
let result = hasher_16k.result(); | |
let hash_16k = result.as_slice().to_owned(); | |
hash_16k | |
}; | |
// Generate File ID | |
let file_id = { | |
let mut hasher_file_id = Md5::new(); | |
hasher_file_id.input(&hash_16k); | |
hasher_file_id.input(&length.to_le_bytes()); | |
hasher_file_id.input(&name); | |
let file_id = hasher_file_id.result().to_vec(); | |
let file_id = self.convert_to_byte_array(file_id); | |
Arc::new(file_id) | |
}; | |
// Partial FileDescription (name, length, hash_16k) | |
let partial_body = (name, length, hash_16k); | |
// sender for channels | |
tx_main.send(Arc::clone(&file_id)).unwrap(); | |
tx_input.send((Arc::clone(&file_id), file, length)).unwrap(); | |
tx_fd.send(partial_body).unwrap(); | |
}); | |
} | |
Ok(()) | |
} | |
// Second Stage: Take file ids file readers, and block_size; and create an input body | |
// containing file id and block checksums. Iterate through the file reader calculating | |
// block hashs. Put hashs in a Vec. Send complete body to create_packet(). Bock size is bytes | |
fn create_input_body(&self, block_size: usize) { | |
let (_, rx) = &self.input; // Receive file ids | |
let (tx_body, _) = &self.body; // Send input bodies to packet | |
let (tx_rec, _) = &self.recovery; // Send blocks to recovery | |
let blocks_channel: ( | |
Sender<(usize, Vec<u8>, u32)>, | |
Receiver<(usize, Vec<u8>, u32)>, | |
); | |
blocks_channel = channel(); | |
let (tx_block, rx_block) = blocks_channel; | |
for received in rx.recv() { | |
let (file_id, file, length) = received; | |
let body = Body::Input(Input { | |
file_id: Arc::clone(&file_id), | |
block_checksums: { | |
let reader = File::open(file).unwrap(); | |
// Pre-allocate block_checksums vector to eliminate the need for sorting | |
let num_blocks: usize = length as usize / (16 * block_size); | |
let mut block_checksums: Vec<([u8; 16], u32)> = vec![([0; 16], 0); num_blocks]; | |
// Iterate through file a byte at a time and collect | |
// block_size bytes as chunks. Innumerate each chunk | |
for (i, chunk) in reader.bytes().chunks(block_size).into_iter().enumerate() { | |
let block: Arc<Vec<u8>> = Arc::new(chunk.map(|x| x.unwrap()).collect()); | |
tx_rec.send(Block { | |
file_id: Arc::clone(&file_id), | |
index: i.clone(), | |
data: Arc::clone(&block), | |
vec_length: block_size, | |
}); | |
let tx_block = tx_block.clone(); | |
let mut md5_sum = Vec::new(); | |
let mut crc_sum = 0; | |
spawn(move || { | |
let md5_block = Arc::clone(&block); | |
let crc_block = Arc::clone(&block); | |
join( | |
|| { | |
let block = md5_block; | |
let mut hasher_md5 = Md5::new(); | |
hasher_md5.input(&*block); | |
md5_sum = hasher_md5.result().to_vec(); | |
}, | |
|| { | |
let block = crc_block; | |
let mut hasher_crc32 = Hasher::new(); | |
hasher_crc32.update(&*block); | |
crc_sum = hasher_crc32.finalize(); | |
}, | |
); | |
let result = (i, md5_sum, crc_sum); | |
tx_block.send(result); | |
}); | |
drop(tx_block); | |
} | |
for block in rx_block { | |
let (index, md5, crc) = block; | |
block_checksums[index] = (self.convert_to_byte_array(md5), crc); | |
} | |
block_checksums | |
}, | |
}); | |
tx_body.send(body).unwrap(); | |
} | |
} | |
fn create_creator(&self) { | |
let (tx_body, _) = &self.body; | |
let body = Body::Creator(Creator { | |
id: b"Rust\0Parity\0Tool".clone(), | |
}); | |
tx_body.send(body).unwrap(); | |
} | |
// fn create_rec_body(&self, parity: usize) { | |
// let (_, rx) = &self.recovery; | |
// let (tx_packet, _) = &self.packet; | |
// for received in rx.into_iter() { | |
// let (index, block, length) = received; | |
// } | |
// } | |
// fn create_packet(&self, pkt_type: Type, body: T,) -> Header<T> { | |
// let header = Header { | |
// magic: &self.magic, | |
// rec_set_id: &self.rec_set_id, | |
// length: { 64 + size_of(body) }, // 64 is the size of header in bytes | |
// pkt_hash: | |
// } | |
// } | |
} | |
#[cfg(test)]
mod test {
    use super::*;
    use crc32fast::Hasher;
    use hex_literal::hex;
    use md5::{Digest, Md5};
    use rayon::join;
    use std::fs::File;
    use std::io::prelude::*;
    use std::path::{Path, PathBuf};
    use typename::TypeName;

    // Fixture file the tests read; the expected length is 1048576 bytes.
    static PATH: &str = "/home/jacob/projects/rupart/test_file";

    /// A pipeline can be constructed.
    #[test]
    fn pipeline_creation() {
        let _pipeline = CreatorPipeline::new();
    }

    /// The creator stage sends a creator body with the expected id and type.
    #[test]
    fn creator_body_creation() {
        let pipeline = CreatorPipeline::new();
        let (_, rx_body) = &pipeline.body;
        pipeline.create_creator();

        // Test body channel
        let result = match rx_body.recv().unwrap() {
            Body::Creator(creator) => creator,
            _ => panic!("Received something other than a creator body"),
        };
        // Check that id hasn't changed
        assert_eq!(b"Rust\0Parity\0Tool", &result.id);
        // Check that type hasn't changed
        assert_eq!(b"PAR 2.0\0Creator\0", *result.is_type());
    }

    /// The first stage sends the file id, path, length, name and 16k hash to
    /// the input, main and file description channels.
    #[test]
    fn file_id_creation() {
        let path_vec = vec![PathBuf::from(PATH)];
        let file_count = path_vec.len();
        let pipeline = CreatorPipeline::new();
        let (_, rx_input) = &pipeline.input;
        let (_, rx_main) = &pipeline.main;
        let (_, rx_fd) = &pipeline.file_description;

        pipeline.create_file_id(path_vec).unwrap();

        // The pipeline keeps its senders alive, so the channels never close;
        // take exactly one message per submitted file instead of iterating
        // until disconnect, which would block forever.

        // Test input channel
        for received in rx_input.iter().take(file_count) {
            // Test received
            assert_eq!(
                received.type_name_of(),
                "(std::sync::Arc<[u8; 16]>, std::path::PathBuf, u64)",
                "received type is wrong"
            );
            // Test file open
            assert!(File::open(&received.1).is_ok(), "Cannot open file");
            // Test file_id hash
            assert_eq!(
                received.0[..],
                hex!("7a9b4bacc05e6c2eb59f25c687f900c4"), // file id hash
                "File ID hash is wrong"
            );
            // Test length
            assert_eq!(1048576, received.2);
        }

        // Test main channel
        for received in rx_main.iter().take(file_count) {
            // Test file_id hash; dereference the Arc to compare the arrays.
            assert_eq!(
                *received,
                hex!("7a9b4bacc05e6c2eb59f25c687f900c4"), // file id hash
                "File ID hash is wrong"
            );
        }

        // Test file description channel
        for received in rx_fd.iter().take(file_count) {
            // Test partial body
            assert_eq!(
                received.type_name_of(),
                "(std::vec::Vec<u8>, u64, std::vec::Vec<u8>)",
                "partial body is wrong type"
            );
            // Test name
            assert_eq!(*b"test_file", received.0[..], "File name is wrong");
            // Test first 16k hash
            assert_eq!(
                hex!("54e39774f15c24a19b8553e3a2408af1"),
                received.2[..],
                "Hash of first 16k is wrong"
            );
            // Test length
            assert_eq!(1048576, received.1, "length is wrong");
        }
    }

    /// The second stage sends an input body whose per-block checksums match
    /// checksums computed independently from the same file.
    #[test]
    fn input_body_creation() {
        // Test setup
        let block_size = 65536;
        let path = PathBuf::from(PATH);
        let pipeline = CreatorPipeline::new();
        let (_, rx_body) = &pipeline.body;

        // Reference (MD5, CRC32) pair for every block of the file, in order.
        let contents = std::fs::read(&path).unwrap();
        let expected: Vec<(Vec<u8>, u32)> = contents
            .chunks(block_size)
            .map(|block| {
                join(
                    || Md5::digest(block).to_vec(),
                    || {
                        let mut hasher = Hasher::new();
                        hasher.update(block);
                        hasher.finalize()
                    },
                )
            })
            .collect();

        pipeline.create_file_id(vec![path]).unwrap();
        pipeline.create_input_body(block_size);
        // End test setup

        // Test body channel; a missing body fails the test instead of being
        // skipped silently.
        let received = rx_body.recv().unwrap();
        let input = match &received {
            Body::Input(input) => input,
            _ => panic!("Got something other than a input body"),
        };
        // Test body type
        assert_eq!(
            *input.is_type(),
            b"par 2.0\0ifsc\0\0\0\0",
            "Wrong body type"
        );
        // Test file_id; dereference the Arc to compare the arrays.
        assert_eq!(
            *input.file_id,
            hex!("7a9b4bacc05e6c2eb59f25c687f900c4"),
            "Wrong file id"
        );
        // Test hashs
        assert_eq!(
            input.block_checksums.len(),
            expected.len(),
            "wrong number of block checksums"
        );
        for (hash, (md5, crc)) in input.block_checksums.iter().zip(expected) {
            assert_eq!(
                hash,
                &(pipeline.convert_to_byte_array(md5), crc),
                "input hashes don't match"
            );
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment