Skip to content

Instantly share code, notes, and snippets.

@Ironlenny
Created April 29, 2019 07:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Ironlenny/b9c8e1dc13b21e048146d446e2e6de8e to your computer and use it in GitHub Desktop.
Save Ironlenny/b9c8e1dc13b21e048146d446e2e6de8e to your computer and use it in GitHub Desktop.
Trouble Calling `Arc::new()` In Rayon Thread
// This is an implementation of
// [Parity Volume Set Specification 2.0](
// http://parchive.sourceforge.net/docs/specifications/parity-volume-spec/article-spec.html)
#![allow(dead_code, unused_must_use)]
use crc32fast::Hasher;
use exitfailure::ExitFailure;
use failure::ResultExt;
use itertools::Itertools;
use md5::{Digest, Md5};
use rayon::{join, spawn};
use std::fs::metadata;
use std::fs::File;
use std::io::prelude::*;
use std::mem::drop;
use std::path::PathBuf;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};
// Type trait for sending body types to header channel.
// Each packet body reports its 16-byte ASCII packet-type signature.
trait Type {
// Returns the 16-byte ASCII packet-type string for this body kind.
// NOTE(review): Box<&[u8; 16]> heap-allocates a reference to a static array;
// a plain &'static [u8; 16] would avoid the allocation entirely (interface
// change, so only flagged here).
fn is_type(&self) -> Box<&[u8; 16]>;
}
// Hashes are 16 bytes ([u8; 16]) throughout — the MD5 output size.
// Common header prepended to every PAR2 packet.
struct Header<'a, T> {
magic: Arc<&'a [u8; 8]>, // ASCII string; the constant "PAR2\0PKT" magic shared by all packets
length: u64, // Packet length starting at first byte of rec_set_id. 8 bytes
pkt_hash: [u8; 16], // 16 bytes; packet hash (not yet computed anywhere in this gist)
rec_set_id: Arc<Option<[u8; 16]>>, // 16 bytes; identifies the recovery set this packet belongs to
pkt_type: [u8; 16], // ASCII string; value produced by Type::is_type()
body: T, // the type-specific packet body
}
// Main packet body
struct Main {
block_size: u64, // size of each block/slice in bytes
num_files: u32, // 4 bytes
// NOTE(review): Box<Vec<_>> is a redundant double indirection — Vec is
// already heap-allocated; a plain Vec<[u8; 16]> would do (interface change,
// so only flagged).
file_ids: Box<Vec<[u8; 16]>>,
rec_file_ids: Box<Vec<[u8; 16]>>,
}
impl Type for Main {
    /// Packet-type signature identifying a Main packet body.
    fn is_type(&self) -> Box<&[u8; 16]> {
        Box::new(b"PAR 2.0\0Main\0\0\0\0")
    }
}
// File Description packet body
struct FileDescription<'a> {
file_id: Arc<[u8; 16]>, // shared with the Main and Input packets for the same file
hash: [u8; 16], // presumably MD5 of the whole file — not populated anywhere in this gist; confirm
hash_16k: [u8; 16], // Hash of first 16k of file
length: u64, // Length of file
name: &'a str, // ASCII string
}
impl<'a> Type for FileDescription<'a> {
    /// Packet-type signature identifying a File Description packet body.
    fn is_type(&self) -> Box<&[u8; 16]> {
        Box::new(b"PAR 2.0\0FileDesc")
    }
}
// Input File Block Checksum packet body
struct Input {
file_id: Arc<[u8; 16]>, // id of the file these checksums belong to
block_checksums: Vec<([u8; 16], u32)>, // Hash and CRC32 tuple, one per block, in file order
}
impl Type for Input {
    /// Packet-type signature identifying an Input File Slice Checksum body.
    // NOTE(review): the spec's type string is upper-case ("PAR 2.0\0IFSC...");
    // this lower-case value is preserved byte-for-byte because the test module
    // asserts it — confirm against the spec before changing both together.
    fn is_type(&self) -> Box<&[u8; 16]> {
        Box::new(b"par 2.0\0ifsc\0\0\0\0")
    }
}
// Recovery Block packet body
struct Recovery {
exponent: u32, // exponent used when generating this recovery block
blocks: Vec<u32>, // recovery data; element semantics not established in this gist — confirm
}
impl Type for Recovery {
    /// Packet-type signature identifying a Recovery Slice packet body.
    fn is_type(&self) -> Box<&[u8; 16]> {
        Box::new(b"PAR 2.0\0RecvSlic")
    }
}
// Creator packet body — identifies the tool that produced the file.
struct Creator {
id: [u8; 16], // ASCII string; set to b"Rust\0Parity\0Tool" by create_creator()
}
impl Type for Creator {
    /// Packet-type signature identifying a Creator packet body.
    fn is_type(&self) -> Box<&[u8; 16]> {
        Box::new(b"PAR 2.0\0Creator\0")
    }
}
// Tagged union of every packet-body kind that flows through the pipeline's
// `body` channel toward packet assembly.
enum Body<'a> {
Main(Main),
FileDescription(FileDescription<'a>),
Input(Input),
Recovery(Recovery),
Creator(Creator),
}
// Block Convenience struct. The spec references slices. Slices and blocks are
// the same thing. A block is an array of 16-bit values
struct Block {
file_id: Arc<[u8; 16]>, // id of the file this block came from
index: usize, // position of this block within the file
data: Arc<Vec<u8>>, // raw block bytes, shared with the hashing tasks
vec_length: usize, // block size in bytes, as passed to create_input_body()
}
// File creation pipeline. Each stage communicates over an mpsc channel; the
// generic parameters A-F are the per-channel message types, concretely
// instantiated by the impl block below (comments here match that impl).
pub struct CreatorPipeline<A, B, C, D, E, F> {
magic: Arc<&'static [u8; 8]>, // constant packet magic "PAR2\0PKT"
rec_set_id: Arc<RwLock<Option<&'static [u8; 16]>>>, // None until computed
writes: (Sender<A>, Receiver<A>), // A: Header<Body> — finished packets
main: (Sender<B>, Receiver<B>), // B: Arc<[u8; 16]> — file_id
input: (Sender<C>, Receiver<C>), // C: (Arc<[u8; 16]>, PathBuf, u64) — (file_id, file, length)
body: (Sender<F>, Receiver<F>), // F: Body<'a> — completed packet bodies
file_description: (Sender<D>, Receiver<D>), // D: (Vec<u8>, u64, Vec<u8>) — (name, length, hash_16k)
recovery: (Sender<E>, Receiver<E>), // E: Block — blocks awaiting recovery computation
}
// Creation pipeline methods
impl<'a>
CreatorPipeline<
Header<'a, Body<'a>>, // writes Packet
Arc<[u8; 16]>, // main file_id
(Arc<[u8; 16]>, PathBuf, u64), // input (file_id, file, length)
(Vec<u8>, u64, Vec<u8>), // file_description (name, length, hash_16k)
Block, // recovery Block
Body<'a>, // packet Body
>
{
// Build a pipeline: constant packet magic, recovery-set id not yet computed,
// and one fresh mpsc channel per pipeline stage.
pub fn new() -> Self {
CreatorPipeline {
magic: Arc::new(b"PAR2\0PKT"),
rec_set_id: Arc::new(RwLock::new(None)),
writes: channel(),
main: channel(),
input: channel(),
body: channel(),
file_description: channel(),
recovery: channel(),
}
}
// Convert a 16-byte Vec (e.g. an MD5 digest) into a fixed [u8; 16].
// FIXME(review): `0..15` copies only 15 of the 16 bytes (temp[15] is left 0),
// and `vec.pop()` removes from the END of the Vec, so the output bytes are
// also reversed. A forward copy over all 16 bytes looks like the intent —
// confirm against the expected file-id constants in the tests before fixing.
fn convert_to_byte_array(&self, mut vec: Vec<u8>) -> [u8; 16] {
let mut temp: [u8; 16] = [0; 16];
for i in 0..15 {
temp[i] = vec.pop().unwrap();
}
temp
}
// First Stage: Create file ids and partial bodies for FileDescription. Send
// file ids, partial bodies and file readers to the correct channels.
fn create_file_id(&self, files: Vec<PathBuf>) -> Result<(), ExitFailure> {
let (tx_main, _) = &self.main; // sender for create_main()
let (tx_input, _) = &self.input; // sender for create_input()
let (tx_fd, _) = &self.file_description; // sender for create_fd()
for file in files {
let tx_main = tx_main.clone();
let tx_input = tx_input.clone();
let tx_fd = tx_fd.clone();
// Spawn thread
// NOTE(review): rayon::spawn requires a 'static closure, but this `move`
// closure captures `&self` (via `self.convert_to_byte_array` below), so
// this does not compile as written — the titular "trouble" of this gist.
// Making the helper a free function, or sharing `self` via Arc, would
// remove the borrow.
spawn(move || {
let mut reader = File::open(&file)
.with_context(|_| format!("Could not open file {}", file.display()))
.unwrap();
// Get filename from path
let name = file
.file_stem()
.unwrap()
.to_string_lossy()
.into_owned()
.into_bytes();
// File length in bytes, from filesystem metadata.
let length = {
let metadata = metadata(&file).unwrap();
metadata.len()
};
// Hash first 16k of the file
// NOTE(review): if the file is shorter than 16384 bytes, the zero-filled
// remainder of `buffer` is still hashed, and `read` may return fewer
// bytes than requested — the short-read count is never checked. Verify
// this matches the spec's "first 16k" definition. Feeding the hasher one
// byte at a time also works but hashing the slice once would be simpler.
let hash_16k = {
let mut hasher_16k = Md5::new();
let mut buffer = [0; 16384];
reader.read(&mut buffer).unwrap();
for byte in buffer.iter() {
hasher_16k.input([byte.clone()]);
}
let result = hasher_16k.result();
let hash_16k = result.as_slice().to_owned();
hash_16k
};
// Generate File ID: MD5 over (hash_16k, little-endian length, name).
let file_id = {
let mut hasher_file_id = Md5::new();
hasher_file_id.input(&hash_16k);
hasher_file_id.input(&length.to_le_bytes());
hasher_file_id.input(&name);
let file_id = hasher_file_id.result().to_vec();
let file_id = self.convert_to_byte_array(file_id);
Arc::new(file_id)
};
// Partial FileDescription (name, length, hash_16k)
let partial_body = (name, length, hash_16k);
// sender for channels
tx_main.send(Arc::clone(&file_id)).unwrap();
tx_input.send((Arc::clone(&file_id), file, length)).unwrap();
tx_fd.send(partial_body).unwrap();
});
}
Ok(())
}
// Second Stage: Take file ids file readers, and block_size; and create an input body
// containing file id and block checksums. Iterate through the file reader calculating
// block hashs. Put hashs in a Vec. Send complete body to create_packet(). Bock size is bytes
fn create_input_body(&self, block_size: usize) {
let (_, rx) = &self.input; // Receive file ids
let (tx_body, _) = &self.body; // Send input bodies to packet
let (tx_rec, _) = &self.recovery; // Send blocks to recovery
// Internal channel carrying (block index, md5 bytes, crc32) results back
// from the hashing tasks.
let blocks_channel: (
Sender<(usize, Vec<u8>, u32)>,
Receiver<(usize, Vec<u8>, u32)>,
);
blocks_channel = channel();
let (tx_block, rx_block) = blocks_channel;
// NOTE(review): `rx.recv()` returns a Result, so iterating it yields at
// most ONE message — only the first file sent by create_file_id is ever
// processed. `for received in rx.iter()` is presumably the intent; confirm.
for received in rx.recv() {
let (file_id, file, length) = received;
let body = Body::Input(Input {
file_id: Arc::clone(&file_id),
block_checksums: {
let reader = File::open(file).unwrap();
// Pre-allocate block_checksums vector to eliminate the need for sorting
// NOTE(review): blocks are `block_size` bytes each, so the expected
// count is ceil(length / block_size); dividing by `16 * block_size`
// makes this Vec far too short and the indexed write below can panic.
// Confirm the intended formula (the `16` may be a leftover from
// thinking in 16-bit values).
let num_blocks: usize = length as usize / (16 * block_size);
let mut block_checksums: Vec<([u8; 16], u32)> = vec![([0; 16], 0); num_blocks];
// Iterate through file a byte at a time and collect
// block_size bytes as chunks. Innumerate each chunk
for (i, chunk) in reader.bytes().chunks(block_size).into_iter().enumerate() {
let block: Arc<Vec<u8>> = Arc::new(chunk.map(|x| x.unwrap()).collect());
// Forward the raw block to the recovery stage (send result ignored).
tx_rec.send(Block {
file_id: Arc::clone(&file_id),
index: i.clone(),
data: Arc::clone(&block),
vec_length: block_size,
});
let tx_block = tx_block.clone();
let mut md5_sum = Vec::new();
let mut crc_sum = 0;
// Hash the block's MD5 and CRC32 in parallel, then report
// (index, md5, crc) back on the internal blocks channel.
spawn(move || {
let md5_block = Arc::clone(&block);
let crc_block = Arc::clone(&block);
join(
|| {
let block = md5_block;
let mut hasher_md5 = Md5::new();
hasher_md5.input(&*block);
md5_sum = hasher_md5.result().to_vec();
},
|| {
let block = crc_block;
let mut hasher_crc32 = Hasher::new();
hasher_crc32.update(&*block);
crc_sum = hasher_crc32.finalize();
},
);
let result = (i, md5_sum, crc_sum);
tx_block.send(result);
});
// NOTE(review): this drops the ORIGINAL `tx_block` on the first chunk
// iteration, so `tx_block.clone()` above is a use-after-move on the
// next pass — a compile error. The drop likely belongs after this
// chunk loop, so the `rx_block` iteration below can terminate once
// every spawned task's clone is also dropped.
drop(tx_block);
}
// Collect hashed blocks into their preallocated, index-addressed slots.
// This loop only ends once every Sender clone has been dropped.
for block in rx_block {
let (index, md5, crc) = block;
block_checksums[index] = (self.convert_to_byte_array(md5), crc);
}
block_checksums
},
});
tx_body.send(body).unwrap();
}
}
// Build the Creator packet body identifying this tool and send it to the
// body channel.
fn create_creator(&self) {
let (tx_body, _) = &self.body;
let body = Body::Creator(Creator {
id: b"Rust\0Parity\0Tool".clone(),
});
tx_body.send(body).unwrap();
}
// fn create_rec_body(&self, parity: usize) {
// let (_, rx) = &self.recovery;
// let (tx_packet, _) = &self.packet;
// for received in rx.into_iter() {
// let (index, block, length) = received;
// }
// }
// fn create_packet(&self, pkt_type: Type, body: T,) -> Header<T> {
// let header = Header {
// magic: &self.magic,
// rec_set_id: &self.rec_set_id,
// length: { 64 + size_of(body) }, // 64 is the size of header in bytes
// pkt_hash:
// }
// }
}
#[cfg(test)]
mod test {
use super::*;
use crc32fast::Hasher;
use hex_literal::hex;
use md5::{Digest, Md5};
use rayon::join;
use std::fs::File;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use typename::TypeName;
// NOTE(review): hard-coded absolute path — these tests only run on the
// author's machine; generating a fixture file in a temp dir would be portable.
static PATH: &str = "/home/jacob/projects/rupart/test_file";
// Smoke test: the pipeline constructor runs without panicking.
#[test]
fn pipeline_creation() {
let pipeline = CreatorPipeline::new();
}
// Checks the Creator body's id and type strings against their constants.
#[test]
fn creator_body_creation() {
let pipeline = CreatorPipeline::new();
let (_, rx_body) = &pipeline.body;
pipeline.create_creator();
// Test body channel
let result = match rx_body.recv().unwrap() {
Body::Creator(creator) => creator,
_ => panic!("Received something other than a creator body"),
};
// Check that id hasn't changed
assert_eq!(b"Rust\0Parity\0Tool", &result.id);
// Check that type hasn't changed
assert_eq!(b"PAR 2.0\0Creator\0", *result.is_type());
}
// Exercises create_file_id and inspects all three downstream channels.
#[test]
fn file_id_creation() {
let mut path_vec = Vec::new();
let path = PathBuf::from(&PATH);
let pipeline = CreatorPipeline::new();
let (_, rx_input) = &pipeline.input;
let (_, rx_main) = &pipeline.main;
let (_, rx_fd) = &pipeline.file_description;
for _ in 0..1 {
path_vec.push(path.to_owned());
}
pipeline.create_file_id(path_vec);
// Test input channel
// NOTE(review): `pipeline` still owns the Sender half of each channel, so
// these Receiver iterators never see a disconnect — once the queued
// messages are drained, this loop (and the two below) blocks forever.
for received in rx_input {
// Test received
assert_eq!(
received.type_name_of(),
"(std::sync::Arc<[u8; 16]>, std::path::PathBuf, u64)",
"received type is wrong"
);
// Test file open
assert!(
{
match File::open(&received.1) {
Ok(_) => true,
Err(_) => false,
}
},
"Cannot open file"
);
// Test file_id hash
// NOTE(review): this constant encodes whatever convert_to_byte_array
// currently produces — it must be regenerated if that function's
// byte-order/off-by-one issues are fixed.
assert_eq!(
received.0[..],
hex!("7a9b4bacc05e6c2eb59f25c687f900c4"), // file id hash
"File ID hash is wrong"
);
// Test length
assert_eq!(1048576, received.2);
}
// Test main channel
for received in rx_main {
// Test file_id hash
assert_eq!(
received,
hex!("7a9b4bacc05e6c2eb59f25c687f900c4"), // file id hash
"File ID hash is wrong"
);
// println!("{:?}", received);
}
// Test file description channel
for received in rx_fd {
// Test partial body
assert_eq!(
received.type_name_of(),
"(std::vec::Vec<u8>, u64, std::vec::Vec<u8>)",
"partial body is wrong type"
);
// Test name
assert_eq!(*b"test_file", received.0[..], "File name is wrong");
// Test first 16k hash
assert_eq!(
hex!("54e39774f15c24a19b8553e3a2408af1"),
received.2[..],
"Hash of first 16k is wrong"
);
// Test length
assert_eq!(1048576, received.1, "length is wrong");
}
}
// Recomputes per-block MD5/CRC32 checksums independently and compares them
// against what create_input_body produced.
#[test]
fn input_body_creation() {
// Test setup
let block_size = 65536;
let mut path_vec = Vec::new();
let path = PathBuf::from(&PATH);
let pipeline = CreatorPipeline::new();
let (_, rx_recovery) = &pipeline.recovery;
let (_, rx_body) = &pipeline.body;
let mut hashs_md5 = Vec::new();
let mut hashs_crc = Vec::new();
let reader = File::open(&path).unwrap();
// Reference checksums computed inline, block by block.
for chunk in reader.bytes().chunks(block_size).into_iter() {
let block: Vec<u8> = chunk.map(|x| x.unwrap()).collect();
join(
|| {
let hash = Md5::digest(&block.clone());
let hash = hash.as_slice().to_vec();
&hashs_md5.push(hash);
},
|| {
let mut hasher = Hasher::new();
hasher.update(&block.clone());
&hashs_crc.push(hasher.finalize());
},
);
}
let mut hashs = hashs_md5.into_iter().zip(hashs_crc.into_iter());
for _ in 0..1 {
path_vec.push(path.to_owned());
}
pipeline.create_file_id(path_vec);
pipeline.create_input_body(block_size);
// End test setup
// Test body channel
// NOTE(review): `rx_body.recv()` yields at most one message (it is a
// Result, not an iterator over the channel) — only the first body is
// checked.
for received in rx_body.recv() {
let input = match &received {
Body::Input(input) => input,
_ => panic!("Got something other than a input body"),
};
// Test body type
assert_eq!(
*input.is_type(),
b"par 2.0\0ifsc\0\0\0\0",
"Wrong body type"
);
// Test file_id
assert_eq!(
input.file_id,
hex!("7a9b4bacc05e6c2eb59f25c687f900c4"),
"Wrong file id"
);
// Test hashs
for hash in &input.block_checksums {
let (md5, crc) = hashs.next().unwrap();
assert_eq!(
hash,
&(pipeline.convert_to_byte_array(md5), crc),
"input hashes don't match"
);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment