Skip to content

Instantly share code, notes, and snippets.

@jsvisa
Created October 19, 2017 01:11
Show Gist options
  • Save jsvisa/01b570051373857c2961ba6e493f1372 to your computer and use it in GitHub Desktop.
Save jsvisa/01b570051373857c2961ba6e493f1372 to your computer and use it in GitHub Desktop.
picfetcher.go
package main
import (
"bufio"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"path"
"runtime"
"strconv"
"strings"
"syscall"
"time"
)
// Command-line flags and the shared work queue.
var (
// keyfile is the path of the input list that scanning reads line by line.
keyfile = flag.String("f", "key.file", "source key file")
// debug enables printing of every key seen by scanning.
debug = flag.Bool("d", false, "Debug print or not")
// workers is used in init as the buffer capacity of todos; main still
// starts one goroutine per dequeued job.
workers = flag.Int("w", 100, "the number of workers")
// todos is the queue of pending jobs; it is allocated in init.
todos chan job
)
// Package-level progress state and tuning values.
var (
// interval is the pause, in milliseconds, main takes after each dequeued job.
interval = 10
// lastLine is the resume point loaded by main from the ".process" file;
// scanning skips lines whose number is below it.
lastLine = 0
// currentLine is the 1-based number of the line scanning read most
// recently; keeper persists it periodically.
currentLine = 0
// total and count are only printed by main on exit; nothing in this file
// updates them.
total = 0
count = 0
)
// job is one download task travelling through the todos channel.
type job struct {
// retry counts how many times the job has been re-queued after a failure.
retry int
// base is the directory the downloaded file is written into.
base string
// id is the first field of the input line; the output file is "<id>.jpg".
id string
// key is the source URL as read from the input line.
key string
// escapedKey is the URL actually requested; run fills it in via escapeURI.
escapedKey string
}
// escapeURI maps a key from the input list to the URL that will be requested.
//
// It is currently the identity function. A variant that split the key on "/"
// and applied url.QueryEscape to every segment is intentionally not active,
// so keys are requested exactly as they appear in the key file.
func escapeURI(key string) string {
	return key
}
// newHTTP builds the HTTP client used for every download.
//
// The transport keeps at most 10 idle connections per host and waits up to
// 900 seconds for response headers; the client as a whole gives up on a
// request after 1200 seconds.
func newHTTP() *http.Client {
	const (
		headerWait   = 900 * time.Second
		requestLimit = 1200 * time.Second
	)

	transport := &http.Transport{
		MaxIdleConnsPerHost:   10,
		ResponseHeaderTimeout: headerWait,
	}

	client := new(http.Client)
	client.Transport = transport
	client.Timeout = requestLimit
	return client
}
// run downloads the URL in job.key with client and stores the response body
// as "<job.id>.jpg" inside the directory job.base.
//
// A job whose retry counter already exceeds 3 is dropped. Transport errors,
// non-2xx responses other than 404, and local file errors increment the
// counter and push the job back onto todos. A 404 or a URL that cannot be
// turned into a request is reported and dropped, since retrying cannot help.
func run(client *http.Client, job job) {
	if job.retry > 3 {
		return
	}

	job.escapedKey = escapeURI(job.key)
	req, err := http.NewRequest("GET", job.escapedKey, nil)
	if err != nil {
		// A malformed URL is bad input, not a programmer error: report it
		// and drop the job instead of crashing every other download.
		fmt.Printf("\033[0;31;40mGET %s Error- %v\033[0m\n", job.key, err)
		return
	}

	resp, err := client.Do(req)
	if err != nil {
		fmt.Printf("\033[0;31;40mGET %s Error- %v\033[0m\n", job.key, err)
		job.retry++
		todos <- job
		return
	}
	// Close the body on every path, including the non-2xx early return
	// below; otherwise the underlying connection is leaked.
	defer resp.Body.Close()

	fmt.Printf("GET %s '%d'\n", job.escapedKey, resp.StatusCode)
	if code := resp.StatusCode; code/100 != 2 {
		fmt.Printf("\033[0;31;40mGET %s Error- StatusCode: %d \033[0m\n", job.key, code)
		// The retry bound here is already implied by the guard at the top
		// of the function; it is kept as a second line of defence.
		if code != 404 && job.retry <= 5 {
			job.retry++
			todos <- job
		}
		return
	}

	f := fmt.Sprintf("%s.jpg", job.id)
	fileName := path.Join(job.base, f)
	output, err := os.Create(fileName)
	if err != nil {
		fmt.Println("Error while creating", fileName, "-", err)
		job.retry++
		todos <- job
		return
	}

	_, err = io.Copy(output, resp.Body)
	// Close explicitly so that a failed flush is treated like a failed
	// download instead of being silently ignored.
	if cerr := output.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		fmt.Println("Error while downloading", job.key, "-", err)
		job.retry++
		todos <- job
		return
	}
	// Content-Length is not verified: some sites do not send it reliably.
}
// scanning reads keyfile line by line and queues one job per usable line.
//
// Each line is expected to look like "<id> <url>", separated by a single
// space. Lines numbered below lastLine are skipped so an interrupted run can
// resume; lines without a second field or whose URL does not start with
// "http://" or "https://" are ignored. Downloads are written to a directory
// named after the key file's base name with the first ".txt" replaced by
// ".dir", created relative to the working directory.
//
// When the whole file has been read, syscall.SIGSTOP is sent on sig as an
// in-process "scan finished" message. Any failure to create the directory,
// open the file, or read it terminates the program via log.Fatal.
func scanning(keyfile string, sig chan<- os.Signal) {
	base := path.Base(keyfile)
	base = strings.Replace(base, ".txt", ".dir", 1)
	// os.ModeDir carries no permission bits, so using it as the mode would
	// create a directory nobody can write into. MkdirAll also accepts a
	// directory left over from a previous run.
	if err := os.MkdirAll(base, 0755); err != nil {
		log.Fatalf("create output directory %s: %v", base, err)
	}

	file, err := os.Open(keyfile)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		currentLine++
		if currentLine < lastLine {
			continue
		}

		parts := strings.Split(scanner.Text(), " ")
		if len(parts) < 2 {
			// Blank or malformed line: nothing to download.
			continue
		}
		id, key := parts[0], parts[1]
		if *debug {
			fmt.Printf("Scan: %s\n", key)
		}
		if strings.HasPrefix(key, "http://") || strings.HasPrefix(key, "https://") {
			todos <- job{id: id, key: key, base: base}
		}
	}

	// Tell the main loop that no more jobs will be produced.
	sig <- syscall.SIGSTOP

	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}
// status prints a progress line every ten seconds and never returns. Each
// line shows the current time, the number of queued jobs, the queue
// capacity, the number of live goroutines and the time elapsed since status
// was started.
func status() {
	const period = 10 * time.Second

	began := time.Now()
	for {
		time.Sleep(period)

		now := time.Now()
		queued, capacity := len(todos), cap(todos)
		goroutines := runtime.NumGoroutine()
		fmt.Printf("%v TODO list length is %d, cap is %d, NumGORoutes %d, elapsed with: %v\n",
			now, queued, capacity, goroutines, time.Since(began))
	}
}
func keeper(file string) {
f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
log.Fatalf("open %s error: %v\n", file, err)
}
defer f.Close()
for {
time.Sleep(10 * time.Second)
if _, err := f.WriteString(fmt.Sprintf("%d", currentLine)); err == nil {
f.Seek(0, 0)
} else {
panic(err)
}
}
}
func init() {
flag.Parse()
todos = make(chan job, *workers)
}
// main wires the program together: it restores the resume point from
// "<keyfile>.process", starts the status, keeper and scanning goroutines and
// then dispatches queued jobs, one goroutine per job, pausing interval
// milliseconds after each dequeue.
//
// The sig channel carries both real OS signals (interrupt, SIGUSR1) and two
// in-process messages: scanning sends syscall.SIGSTOP when the key file has
// been read completely, and the drain goroutine started in response sends
// syscall.SIGUSR1 once the queue has stayed empty for a grace period. Any
// value other than SIGSTOP ends the program.
func main() {
	runtime.GOMAXPROCS(2)

	keyProcessFile := fmt.Sprintf("%s.process", *keyfile)
	if buf, err := ioutil.ReadFile(keyProcessFile); err == nil {
		// Tolerate surrounding whitespace such as a trailing newline left
		// by an editor.
		if num, err := strconv.Atoi(strings.TrimSpace(string(buf))); err == nil {
			lastLine = num
		}
	}
	fmt.Printf("**** starting from line: %d\n", lastLine)

	go status()
	go keeper(keyProcessFile)

	sig := make(chan os.Signal, 1)
	// SIGSTOP and SIGKILL cannot be trapped, so they are not registered;
	// SIGSTOP only ever reaches this channel as an in-process message.
	signal.Notify(sig, syscall.SIGUSR1, os.Interrupt)

	// No priming job is sent here: with -w 0 the queue is unbuffered and a
	// send before the receive loop below would block forever.
	go scanning(*keyfile, sig)

	client := newHTTP()
	for {
		select {
		case s := <-sig:
			switch s {
			case syscall.SIGSTOP:
				// Scanning is done: wait for the queue to drain, then ask
				// the loop to exit.
				go func() {
					for {
						if len(todos) > 0 {
							time.Sleep(1 * time.Second)
							continue
						}
						// Grace period for downloads still in flight.
						time.Sleep(time.Duration(10) * time.Second)
						// A retry may have been re-queued meanwhile; only
						// signal completion if the queue is still empty.
						if len(todos) == 0 {
							sig <- syscall.SIGUSR1
							return
						}
					}
				}()
			default:
				fmt.Println("stopping...... got total length: ", total, "count: ", count)
				os.Exit(0)
			}
		case j := <-todos:
			if j.key != "" {
				go run(client, j)
			}
			time.Sleep(time.Duration(interval) * time.Millisecond)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment