Skip to content

Instantly share code, notes, and snippets.

@target-san
Created October 16, 2017 06:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save target-san/509a50ce1a42aee519c20e0a6a736165 to your computer and use it in GitHub Desktop.
Save target-san/509a50ce1a42aee519c20e0a6a736165 to your computer and use it in GitHub Desktop.

Execute cargo new --bin httpdl

Cargo.toml

[package]
name    = "httpdl"
version = "0.1.0"
authors = ["Igor Baidiuk <ibaidiuk@amcbridge.com>"]

[dependencies]

main.rs

/// Placeholder entry point generated by `cargo new`.
fn main() {
    let greeting = "Hello, world!";
    println!("{}", greeting);
}

Cargo.toml

[package]
name        = "httpdl"
version     = "0.1.0"
authors     = ["Igor Baidiuk <ibaidiuk@amcbridge.com>"]
description = "Simplistic CLI batch file downloader over HTTP protocol"

[dependencies]

Execute cargo run

Add clap

[dependencies]
# Pin the major version instead of "*": the code uses the clap 2.x builder API
# (`Arg::with_name`, `app_from_crate!`, `get_matches_safe`), which later major
# releases changed or removed. crates.io also rejects "*" requirements.
clap = "2"

Add crate to main

#[macro_use]
extern crate clap;

/// Still the `cargo new` placeholder; clap is linked but not used yet.
fn main() {
    let greeting = "Hello, world!";
    println!("{}", greeting);
}

Compile, watch progress

Add Args struct

/// Placeholder entry point; argument parsing is wired in a later step.
fn main() {
    let greeting = "Hello, world!";
    println!("{}", greeting);
}

/// Options collected from the command line.
#[derive(Debug)]
struct Args {
    /// Directory where downloaded files are stored (`-o`).
    dest_dir: String,
    /// File listing the URLs to download and their local names (`-f`).
    list_file: String,
    /// Number of threads to use (`-n`).
    threads_num: usize,
    /// Speed cap in bytes per second; `0` means no limit (`-l`).
    speed_limit: usize,
}

Add parse_args

fn parse_args() -> Args {
    use clap::Arg;

    let args = app_from_crate!()
        .arg(Arg::with_name("dest_dir")
            .help("Directory where to store downloaded files")
            .short("o")
            .takes_value(true)
            .required(true)
        )
        .arg(Arg::with_name("list_file")
            .help("File which contains list of all URLs to download and local names for them")
            .short("f")
            .takes_value(true)
            .required(true)
        )
        .arg(Arg::with_name("threads_num")
            .help("Number of threads to use")
            .short("n")
            .default_value("1")
        )
        .arg(Arg::with_name("speed_limit")
            .help("Limit speed to N bytes per second; '0' means no limit
Suffixes supported:
    k, K - kilobytes (1024 bytes)
    m, M - megabytes (1024*1024 bytes)
")
            .short("l")
            .default_value("0")
        )
        .get_matches();

    Args {
        dest_dir:       args.value_of("dest_dir").unwrap().to_owned(),
        list_file:      args.value_of("list_file").unwrap().to_owned(),
        threads_num:    args.value_of("threads_num").unwrap().parse().unwrap(),
        speed_limit:    args.value_of("speed_limit").unwrap().parse().unwrap(),
    }
}

Integrate into main

/// Parses the command line and echoes the resulting options.
fn main() {
    println!("Arguments: {:?}", parse_args());
}

cargo run -- -o dest_dir

cargo run -- -o dest_dir -f files.lst

cargo run -- -o dest_dir -f files.llst -n 9

cargo run -- -o dest_dir -f files.llst -n 9 -l 15k

Intermezzo: Rust error handling

Cargo.toml: add error-chain

[dependencies]
# Pin versions instead of "*": the code depends on the clap 2.x builder API and
# on the error-chain 0.x macros, and wildcards would pull incompatible releases.
clap        = "2"
error-chain = "0.12"

Start with minimal common Error

#[macro_use]
extern crate clap;
// `#[macro_use]` brings in this crate's macros, including `error_chain!` below.
#[macro_use]
extern crate error_chain;

// Crate-wide error types generated by `error_chain!`. The `Result<T>` alias used
// by `run`/`parse_args` is pulled in from here via `use errors::*`.
// No custom error kinds or foreign links are declared yet.
mod errors {
    error_chain! {

    }
}

use errors::*;

Move to quick_main

quick_main!(run);

fn main()
fn run() -> Result<()> {
    let args = parse_args()?;
    println!("Arguments: {:?}", args);
    Ok(())
}

parse_args to Result

fn parse_args() -> Result<Args> {
    use clap::Arg;

And final return

    Ok(Args {
        dest_dir:       args.value_of("dest_dir").unwrap().to_owned(),
        list_file:      args.value_of("list_file").unwrap().to_owned(),
        threads_num:    args.value_of("threads_num").unwrap().parse().unwrap(),
        speed_limit:    args.value_of("speed_limit").unwrap().parse().unwrap(),
    })

cargo run -- -o dest_dir -f files.llst -n 9 -l 15k

Switch args to safe return

        )
        .get_matches_safe()?;

Add clap's error to foreign links

// Crate error types. The `foreign_links` entry lets `?` convert a
// `clap::Error` (e.g. from `get_matches_safe()`) into this crate's `Error`.
mod errors {
    error_chain! {
        foreign_links {
            Args(::clap::Error);
        }
    }
}

Switch parsers to ?

        threads_num:    args.value_of("threads_num").unwrap().parse()?,
        speed_limit:    args.value_of("speed_limit").unwrap().parse()?,

Add parse int error link

        foreign_links {
            Args(::clap::Error);
            ParseInt(::std::num::ParseIntError);
        }

cargo run -- -o dest_dir -f files.llst -n 9 -l 15k

parse_arg helper

    /// Fetches the value of option `name` and converts it with `parse`,
    /// attaching an "Invalid argument <name>" context to any failure.
    ///
    /// Panics if the option has no value. Every option declared in this CLI
    /// is either required or has a default.
    fn parse_arg<F, R>(args: &clap::ArgMatches, name: &str, parse: F) -> Result<R>
    where F: Fn(&str) -> Result<R>
    {
        let raw = args.value_of(name).unwrap();
        let parsed = parse(raw);
        parsed.chain_err(|| format!("Invalid argument <{}>", name))
    }

Move Args return

    return Ok(Args {
        dest_dir:       parse_arg(&args, "dest_dir",    |s| Ok(s.to_owned()) )?,
        list_file:      parse_arg(&args, "list_file",   |s| Ok(s.to_owned()) )?,
        threads_num:    parse_arg(&args, "threads_num", |s| Ok(s.parse()?) )?,
        speed_limit:    parse_arg(&args, "speed_limit", |s| Ok(s.parse()?) )?,
    });

cargo run -- -o dest_dir -f files.llst -n 9 -l 15k

Add std::fs

extern crate error_chain;

use std::fs;

Add io::Error foreign link

        foreign_links {
            Args(::clap::Error);
            ParseInt(::std::num::ParseIntError);
            IO(::std::io::Error);
        }

Implement parse_dir

    /// Accepts `path` only if it exists and is a directory.
    ///
    /// Fails with the I/O error when the path cannot be inspected.
    fn parse_dir(path: &str) -> Result<String> {
        let metadata = fs::metadata(path)?;
        if metadata.is_dir() {
            return Ok(path.to_owned());
        }
        bail!("{}: expected directory", path);
    }
        dest_dir:       parse_arg(&args, "dest_dir",    parse_dir )?,

Implement parse_file

    /// Accepts `path` only if it exists and is a regular file.
    ///
    /// Fails with the I/O error when the path cannot be inspected.
    fn parse_file(path: &str) -> Result<String> {
        let is_regular_file = fs::metadata(path)?.is_file();
        if is_regular_file {
            return Ok(path.to_owned());
        }
        bail!("{}: expected file", path);
    }
        list_file:      parse_arg(&args, "list_file",   parse_file )?,

Implement parse_threads_num

    /// Parses the thread count; a non-number or zero is rejected.
    fn parse_threads_num(value: &str) -> Result<usize> {
        let count: usize = value.parse()?;
        if count == 0 {
            bail!("cannot be zero");
        }
        Ok(count)
    }
        threads_num:    parse_arg(&args, "threads_num", parse_threads_num )?,

Implement parse_speed_limit

    fn parse_speed_limit(value: &str) -> Result<usize> {
        match value.char_indices().last() {
            None => Ok(0),
            Some((last_i, last_ch)) => {
                let multiplier: usize = match last_ch {
                    'k' | 'K' => 1024,
                    'm' | 'M' => 1024*1024,
                    _ => 1,
                };
                let value = if multiplier == 1 { value } else { &value[..last_i] };
                Ok(value.parse::<usize>()? * multiplier)
            }
        }
    }
        speed_limit:    parse_arg(&args, "speed_limit", parse_speed_limit )?,

Read text from file

    // Read the whole URL list into memory. Any open or read failure is reported
    // together with the offending path.
    let list_text = fs::File::open(&args.list_file)
        .and_then(|mut file| {
            let mut contents = String::new();
            file.read_to_string(&mut contents).map(|_| contents)
        })
        .chain_err(|| format!("Failed to read list file {}", &args.list_file))?;
use std::fs;
use std::io::Read;

Parse into list of URLs

    // Each usable line holds "<url> <local name>" separated by spaces, tabs or
    // CR/LF. Lines with fewer than two fields are skipped.
    let urls = list_text
        .lines()
        .filter_map(|line| {
            let mut fields = line
                .split(|c| " \r\n\t".contains(c))
                .filter(|field| !field.is_empty());
            let url = fields.next();
            let fname = fields.next();
            match (url, fname) {
                (Some(url), Some(fname)) => Some((url, fname)),
                _ => None,
            }
        });

    for (url, fname) in urls {
        println!("{} => {}", url, fname);
    }

Add Hyper

[dependencies]
# Pin versions instead of "*": the code depends on the clap 2.x builder API and
# on the error-chain 0.x macros, and wildcards would pull incompatible releases.
clap        = "2"
error-chain = "0.12"
hyper       = "0.10"
#[macro_use]
extern crate clap;
#[macro_use]
extern crate error_chain;
extern crate hyper;
        foreign_links {
            Args(::clap::Error);
            ParseInt(::std::num::ParseIntError);
            IO(::std::io::Error);
            Http(::hyper::Error);
        }

Minimal file download function

use std::fs;
use std::io::{self, Read};
use std::path::Path;

/// Downloads `url` over HTTP and stores the response body as `dir/fname`.
///
/// # Errors
/// Fails in any of these cases:
/// - the request cannot be sent;
/// - the server answers with a non-success status;
/// - the destination file cannot be created;
/// - copying the body fails.
fn download_file(url: &str, fname: &str, dir: &str) -> Result<()> {
    let mut response = hyper::Client::new().get(url).send()?;
    if !response.status.is_success() {
        bail!("HTTP request failed: {}", response.status);
    }
    // NOTE(review): `fname` comes from the list file; an absolute path or ".."
    // components would place the file outside `dir`.
    let mut file = fs::File::create(Path::new(dir).join(fname))?;
    // Propagate copy errors. Discarding them would report a truncated
    // download as a success.
    io::copy(&mut response, &mut file)?;
    Ok(())
}
    for (url, fname) in urls {
        println!("Downloading {} => {}", url, fname);
        let error = match download_file(url, fname, &args.dest_dir) {
            Ok(()) => continue,
            Err(error) => error,
        };
        eprintln!("Failed to download {} => {}!\nError: {}", url, fname, error);
        // skip(1): the top-level error was already printed on the line above.
        for cause in error.iter().skip(1) {
            eprintln!("Caused by: {}", cause);
        }
    }

python -m http.server 8000 --bind 127.0.0.1

cargo run -- -o ../dl-here -f ../files.lst

Add crossbeam for multithreading

[dependencies]
# Pin versions instead of "*". Besides the clap 2.x / error-chain 0.x APIs, the
# code uses crossbeam 0.3's scoped threads (`scope.spawn(move || ..)` with no
# scope argument); later crossbeam releases changed that signature.
clap        = "2"
error-chain = "0.12"
hyper       = "0.10"
crossbeam   = "0.3"
extern crate hyper;
extern crate crossbeam;

Move loop to download_files

/// Worker loop: keeps pulling `(url, fname)` pairs from `next` until it
/// returns `None`, downloading each one into `dir`.
///
/// A failed download is reported on stderr together with its cause chain and
/// does not stop the loop.
fn download_files<'a, F>(next: F, dir: &str)
where F: Fn() -> Option<(&'a str, &'a str)>
{
    loop {
        let (url, fname) = match next() {
            Some(pair) => pair,
            None => break,
        };
        println!("Downloading {} => {}", url, fname);
        if let Err(error) = download_file(url, fname, dir) {
            eprintln!("Failed to download {} => {}!\nError: {}", url, fname, error);
            for cause in error.iter().skip(1) {
                eprintln!("Caused by: {}", cause);
            }
        }
    }
}

Multithreading, with mutexes

use std::fs;
use std::io::{self, Read};
use std::path::Path;
use std::sync::Mutex;
    // Share one URL iterator between threads. Each call hands out the next pair.
    let urls = Mutex::new(urls);
    let next_url = move || urls.lock().unwrap().next();

    // The current thread also works, so only `threads_num - 1` threads are spawned.
    crossbeam::scope(|scope| {
        for _ in 1..args.threads_num {
            let (next_url, dest_dir) = (&next_url, &args.dest_dir);
            scope.spawn(move || download_files(&next_url, dest_dir));
        }
        download_files(&next_url, &args.dest_dir);
    });

cargo run -- -o ../dl-here -f ../files.lst -n 3

Add thread number to messages

fn download_files<'a, F>(tid: usize, next: F, dir: &str)
where F: Fn() -> Option<(&'a str, &'a str)>
{
    while let Some((url, fname)) = next() {
        println!("#{} Downloading {} => {}", tid, url, fname);
        if let Err(error) = download_file(url, fname, dir) {
            eprintln!("!{} Failed to download {} => {}!\n!{0} Error: {}", tid, url, fname, error);
            for err in error.iter().skip(1) {
                eprintln!("!{} Caused by: {}", tid, err);
            }
        }
    }
}
    crossbeam::scope(|scope| {
        // Spawned workers get ids 1..threads_num; the current thread works as #0.
        for tid in 1..args.threads_num {
            let (next_url, dest_dir) = (&next_url, &args.dest_dir);
            scope.spawn(move || download_files(tid, &next_url, dest_dir));
        }
        download_files(0, &next_url, &args.dest_dir);
    });

Add one-thread mode

use std::sync::Mutex;
use std::cell::RefCell;
    if args.threads_num > 1 {
        let urls = Mutex::new(urls);
    ...
        });
    }
    else {
        let urls = RefCell::new(urls);
        let next_url = move || urls.borrow_mut().next();
        download_files(0, next_url, &args.dest_dir);
    }

cargo run -- -o ../dl-here -f ../files.lst

Add TokenBucket

use std::cell::RefCell;
use std::time::Instant;
/// Token-bucket rate limiter state.
#[derive(Debug)]
struct TokenBucket {
    /// Refill rate in tokens per second; `0` disables limiting.
    rate: usize,
    /// Maximum number of tokens the bucket can hold.
    capacity: usize,
    /// Tokens currently available (may be fractional between refills).
    remaining: f64,
    /// Moment of the last refill.
    timestamp: Instant,
}

impl TokenBucket {
    /// Creates an empty bucket whose capacity equals its refill `rate`.
    fn new(rate: usize) -> TokenBucket {
        Self::with_capacity(rate, rate)
    }

    /// Creates an empty bucket that is refilled at `rate` tokens per second
    /// and holds at most `capacity` tokens.
    fn with_capacity(rate: usize, capacity: usize) -> TokenBucket {
        let timestamp = Instant::now();
        TokenBucket { rate, capacity, remaining: 0.0, timestamp }
    }
}

Add take method

    /// Withdraws up to `amount` tokens and returns how many were granted.
    ///
    /// The bucket is first refilled with `rate` tokens per elapsed second,
    /// capped at `capacity`. A `rate` of zero disables limiting and always
    /// grants the full `amount`.
    fn take(&mut self, amount: usize) -> usize {
        if self.rate == 0 {
            return amount;
        }

        let now = Instant::now();
        let elapsed = now - std::mem::replace(&mut self.timestamp, now);
        let elapsed_secs = (elapsed.as_secs() as f64) + f64::from(elapsed.subsec_nanos()) / 1e9;

        let refilled = self.remaining + elapsed_secs * (self.rate as f64);
        self.remaining = refilled.min(self.capacity as f64);

        let granted = std::cmp::min(self.remaining.floor() as usize, amount);
        self.remaining = (self.remaining - granted as f64).max(0.0);
        granted
    }

Implement copy_limited

use std::io::{self, Read, Write};
/// Copies everything from `reader` to `writer`. Before each read it asks
/// `limit` how many bytes may be transferred.
///
/// `limit` receives the largest chunk size (the internal buffer length) and
/// returns the number of bytes allowed right now; `0` means "wait". A grant
/// larger than the buffer is clamped to the buffer length.
///
/// Returns the total number of bytes written.
///
/// # Errors
/// Returns any write error, and any read error except `Interrupted`, which is
/// retried.
fn copy_limited<R, W, F>(reader: &mut R, writer: &mut W, limit: F) -> io::Result<u64>
where
    R: Read + ?Sized,
    W: Write + ?Sized,
    F: Fn(usize) -> usize
{
    let mut buf = [0; 64 * 1024];
    let mut written = 0;
    loop {
        // Clamp so a limiter that over-grants cannot make the slice below panic.
        let allowed = std::cmp::min(limit(buf.len()), buf.len());
        if allowed == 0 {
            // Throttled: sleep briefly instead of spinning on `yield_now`, which
            // kept a CPU core busy for the whole throttled period.
            std::thread::sleep(std::time::Duration::from_millis(1));
            continue;
        }
        // NOTE(review): granted tokens not used by a short read are not returned
        // to the limiter, so the effective rate can be slightly below the cap.
        let part = &mut buf[..allowed];
        let len = match reader.read(part) {
            Ok(0) => return Ok(written),
            Ok(len) => len,
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        writer.write_all(&part[..len])?;
        written += len as u64;
    }
}

Migrate download_file

/// Downloads `url` over HTTP into `dir/fname`, pacing the body copy with the
/// byte limiter `limit` (see `copy_limited`).
///
/// # Errors
/// Fails in any of these cases:
/// - the request cannot be sent;
/// - the server answers with a non-success status;
/// - the destination file cannot be created;
/// - copying the body fails.
fn download_file<L>(url: &str, fname: &str, dir: &str, limit: L) -> Result<()>
where L: Fn(usize) -> usize
{
    let mut response = hyper::Client::new().get(url).send()?;
    if !response.status.is_success() {
        bail!("HTTP request failed: {}", response.status);
    }
    // NOTE(review): `fname` comes from the list file; an absolute path or ".."
    // components would place the file outside `dir`.
    let mut file = fs::File::create(Path::new(dir).join(fname))?;
    // Propagate copy errors. Discarding them would report a truncated
    // download as a success.
    copy_limited(&mut response, &mut file, limit)?;
    Ok(())
}

Migrate download_files

fn download_files<'a, F, L>(tid: usize, next: F, dir: &str, limit: L)
where
    F: Fn() -> Option<(&'a str, &'a str)>,
    L: Fn(usize) -> usize
{
    while let Some((url, fname)) = next() {
        println!("#{} Downloading {} => {}", tid, url, fname);
        if let Err(error) = download_file(url, fname, dir, &limit) {
            eprintln!("!{} Failed to download {} => {}!\n!{0} Error: {}", tid, url, fname, error);
            for err in error.iter().skip(1) {
                eprintln!("!{} Caused by: {}", tid, err);
            }
        }
    }
}

Migrate one-thread mode

        // Single-threaded: a RefCell lets the `Fn` limiter mutate the bucket.
        let local_bucket = RefCell::new(TokenBucket::new(args.speed_limit));
        let limit = move |amount| local_bucket.borrow_mut().take(amount);
        download_files(0, next_url, &args.dest_dir, limit);

Migrate multithread mode

        // One bucket behind a Mutex is shared by every worker, so the limit
        // applies to the combined download rate.
        let shared_bucket = Mutex::new(TokenBucket::new(args.speed_limit));
        let limit = move |amount| shared_bucket.lock().unwrap().take(amount);

        crossbeam::scope(|scope| {
            for tid in 1..args.threads_num {
                let (next_url, limit, dest_dir) = (&next_url, &limit, &args.dest_dir);
                scope.spawn(move || download_files(tid, next_url, dest_dir, limit));
            }
            download_files(0, &next_url, &args.dest_dir, &limit);
        });

Add total time counter

    let timer = Instant::now();

    if args.threads_num > 1 {
    ...
    }

    // Zero-pad the fractional part to nine digits. Printing `subsec_nanos()`
    // raw would show 1.005 s as "1.5000000".
    let timer = Instant::now() - timer;
    println!("Took {}.{:09} seconds", timer.as_secs(), timer.subsec_nanos());

cargo run -- -o ../dl-here -f ../files.lst -n 3 -l 15k

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment