Skip to content

Instantly share code, notes, and snippets.

@wthorp
Created July 6, 2022 03:53
Show Gist options
  • Save wthorp/efbf8026d1455ba78d5e6abe4cef742f to your computer and use it in GitHub Desktop.
Save wthorp/efbf8026d1455ba78d5e6abe4cef742f to your computer and use it in GitHub Desktop.
parallel downloader suitable for piping
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
)
// Command-line settings, filled in by init from the flags and the single
// positional argument.
var (
	url         string // URL of the object to download
	concurrency int    // number of ranges downloaded at once (-concurrency)
	rangeSize   int64  // bytes requested per range (-range-size)
)

// result is the outcome of downloading one range; workers send it to main.
type result struct {
	id     int    // range number: bytes [id*rangeSize, (id+1)*rangeSize-1]
	data   []byte // response body for the range
	status int    // HTTP status code (500 when downloadRange got no response)
}
// init parses the command line into the package-level settings: the
// -concurrency and -range-size flags, followed by exactly one positional
// argument, the URL to download. Flags must precede the URL, because the flag
// package stops parsing at the first non-flag argument.
//
// On a usage error it prints the problem and the usage text to stderr and
// exits with status 1. Nothing is written to stdout, which carries the
// downloaded bytes when the program is used in a pipe.
func init() {
	flag.IntVar(&concurrency, "concurrency", 4, "number of ranges to download at once")
	flag.Int64Var(&rangeSize, "range-size", 64*1000*1000, "bytes to download per range")
	flag.Parse()
	url = flag.Arg(0)

	var problem string
	switch {
	case url == "":
		// No URL given: print only the usage text.
	case flag.NArg() > 1:
		problem = fmt.Sprintf("unexpected arguments after URL: %q (flags must come before the URL)", flag.Args()[1:])
	case concurrency < 1:
		// 0 would leave main waiting forever; negative values panic in make.
		problem = fmt.Sprintf("-concurrency must be at least 1, got %d", concurrency)
	case rangeSize < 1:
		problem = fmt.Sprintf("-range-size must be at least 1, got %d", rangeSize)
	default:
		return
	}

	if problem != "" {
		fmt.Fprintln(os.Stderr, problem)
	}
	// Prefer the resolved executable path, but fall back to argv[0] rather
	// than printing "." when it cannot be determined.
	programName := os.Args[0]
	if exe, err := os.Executable(); err == nil {
		programName = exe
	}
	fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS] URL\n", filepath.Base(programName))
	flag.PrintDefaults() // writes to stderr by default
	os.Exit(1)
}
// main downloads url as consecutive byte ranges of rangeSize bytes using
// concurrency workers. It writes the ranges to stdout strictly in order, so
// the output can be piped.
//
// Range ids are requested in increasing order. A new range is requested each
// time a 206 Partial Content response arrives. Results that arrive ahead of
// the next range to write are parked until their turn. The download ends at
// the first range, in id order, that is not a 206:
//   - 416 Range Not Satisfiable: the previous range was the end of the object,
//     and main returns normally;
//   - 200 OK for range 0: the server ignored the Range header and sent the
//     whole object, which is written once;
//   - anything else, including the 500 reported for transport errors: the
//     output is incomplete, so the failure is reported on stderr and the
//     process exits with status 1.
//
// A failed write to stdout also exits with status 1.
func main() {
	jobs := make(chan int, concurrency)
	results := make(chan result, concurrency)
	for w := 0; w < concurrency; w++ {
		go worker(w, jobs, results)
	}

	// Prime every worker with one range. inFlight counts ranges requested but
	// not yet received. It never exceeds concurrency, so the sends on the
	// buffered jobs channel below cannot block.
	nextJob := 0
	for ; nextJob < concurrency; nextJob++ {
		jobs <- nextJob
	}
	inFlight := nextJob
	requesting := true

	// emit writes one range to stdout. A failed write would leave the output
	// silently truncated, so it is fatal.
	emit := func(r result) {
		fmt.Fprintf(os.Stderr, "writing %d bytes from job %d index %d\n", len(r.data), r.id, r.id)
		if _, err := os.Stdout.Write(r.data); err != nil {
			fmt.Fprintf(os.Stderr, "writing range %d to stdout: %v\n", r.id, err)
			os.Exit(1)
		}
	}

	// pending holds results received but not yet written, keyed by range id.
	pending := make(map[int]result, concurrency)
	// nextWrite is the id of the next range to write to stdout.
	nextWrite := 0
	for inFlight > 0 {
		r := <-results
		inFlight--
		pending[r.id] = r

		if requesting && r.status == http.StatusPartialContent {
			jobs <- nextJob
			nextJob++
			inFlight++
		} else {
			// A non-206 means nothing useful lies beyond this range (end of
			// object, Range ignored, or an error). Stop requesting, but keep
			// collecting the ranges already in flight.
			requesting = false
		}

		// Flush every result that is now contiguous with what was written.
		for {
			ready, ok := pending[nextWrite]
			if !ok {
				break
			}
			delete(pending, nextWrite)
			switch {
			case ready.status == http.StatusPartialContent:
				emit(ready)
				nextWrite++
			case ready.status == http.StatusOK && ready.id == 0:
				// The body is the entire object; later ranges would repeat it.
				emit(ready)
				return
			case ready.status == http.StatusRequestedRangeNotSatisfiable:
				return
			default:
				fmt.Fprintf(os.Stderr, "download incomplete: range %d failed with status %d\n", ready.id, ready.status)
				os.Exit(1)
			}
		}
	}
	// Only reachable when no range was ever requested (concurrency < 1).
	fmt.Fprintf(os.Stderr, "download incomplete: range %d was never received\n", nextWrite)
	os.Exit(1)
}
// worker downloads each range id received on jobs and sends the outcome on
// results, tagged with the range id so main can restore ordering. id only
// identifies this worker in log messages. worker returns when jobs is closed.
//
// When downloadRange reports an error, the result carries status 500
// (http.StatusInternalServerError, the same value downloadRange uses for
// failed requests) and no data, whatever HTTP status was received. A body
// that failed mid-read must never be passed on as a complete 206 range.
func worker(id int, jobs <-chan int, results chan<- result) {
	for j := range jobs {
		status, data, err := downloadRange(int64(j))
		if err != nil {
			fmt.Fprintf(os.Stderr, "worker %d: job %d failed: %v\n", id, j, err)
			status, data = http.StatusInternalServerError, nil
		}
		fmt.Fprintf(os.Stderr, "worker %d finished job %d with %d bytes\n", id, j, len(data))
		results <- result{id: j, data: data, status: status}
	}
}
// downloadRange fetches range number rangeNum of url with an HTTP Range
// request for bytes [rangeNum*rangeSize, (rangeNum+1)*rangeSize-1].
//
// It returns the response's HTTP status code and body with a nil error.
// If the request cannot be built or sent, or the body cannot be read
// completely, it returns status 500 (http.StatusInternalServerError), no
// data, and the error. A truncated body is therefore never reported
// alongside the server's success status.
func downloadRange(rangeNum int64) (int, []byte, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return http.StatusInternalServerError, nil, err
	}
	start := rangeNum * rangeSize
	end := start + rangeSize - 1 // Range bounds are inclusive
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return http.StatusInternalServerError, nil, err
	}
	defer resp.Body.Close()
	fmt.Fprintf(os.Stderr, "job %d [%d-%d] status %d\n", rangeNum, start, end, resp.StatusCode)

	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return http.StatusInternalServerError, nil, fmt.Errorf("reading body of range %d: %w", rangeNum, err)
	}
	return resp.StatusCode, data, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment