-
-
Save pasdoy/66421b4147c7dd15f4d15d7ab0c6a31a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"github.com/goware/urlx" | |
"github.com/satori/go.uuid" | |
"github.com/streadway/amqp" | |
"go.uber.org/zap" | |
"golang.org/x/net/html" | |
"gopkg.in/alecthomas/kingpin.v2" | |
"io" | |
"net/http" | |
"net/url" | |
"os" | |
"os/signal" | |
"strings" | |
"syscall" | |
"time" | |
) | |
// Command-line flags; each can also be set through the matching
// environment variable.
var (
	// MQURL is the AMQP broker address (env MQ_URL).
	MQURL = kingpin.Flag("mq-url", "Message queue URL").Default("amqp://guest:guest@localhost:5672/").OverrideDefaultFromEnvar("MQ_URL").String()
	// NbWorker is the number of concurrent crawl workers (env WORKERS).
	NbWorker = kingpin.Flag("workers", "Number of workers").Default("20").OverrideDefaultFromEnvar("WORKERS").Int()
	// MQMsgCount is the AMQP prefetch/batch size (env MQ_MSG_COUNT).
	MQMsgCount = kingpin.Flag("mq-msg-count", "Message prefetch count").Default("100").OverrideDefaultFromEnvar("MQ_MSG_COUNT").Int()
	// Timeout is the per-request HTTP timeout in seconds (env TIMEOUT).
	Timeout = kingpin.Flag("timeout", "Timeout connection second").Default("5").OverrideDefaultFromEnvar("TIMEOUT").Int()
)
// Process-wide shared state, initialized in main before any goroutine starts.
var client *http.Client // HTTP client shared by all workers
var mq *amqp.Connection // AMQP connection the fetcher consumes from
var queue chan string   // in-process buffer between fetcher and workers
var log *zap.Logger     // structured logger writing to info.log
func NewLogger() *zap.Logger { | |
//logger, _ := zap.NewProduction() | |
cfg := zap.NewProductionConfig() | |
cfg.OutputPaths = []string{"info.log"} | |
cfg.DisableCaller = true | |
cfg.EncoderConfig.TimeKey = "" | |
cfg.EncoderConfig.LevelKey = "" | |
logger, _ := cfg.Build() | |
return logger | |
} | |
// Helper function to pull the href attribute from a Token | |
func getHref(t html.Token) (ok bool, href, rel string) { | |
for _, a := range t.Attr { | |
if a.Key == "href" { | |
href = strings.TrimSpace(a.Val) | |
if strings.HasPrefix(href, "mailto:") || | |
strings.HasPrefix(href, "javascript:") || | |
strings.HasPrefix(href, "fax:") || | |
strings.HasPrefix(href, "modem:") { | |
return | |
} | |
ok = true | |
} | |
if a.Key == "rel" { | |
rel = a.Val | |
} | |
} | |
return | |
} | |
func cleanURL(s string, responseURL *url.URL) (string, error) { | |
uPrsed, err := url.Parse(s) | |
if err != nil { | |
return "", err | |
} | |
if uPrsed.Host == "" { | |
uPrsed.Host = responseURL.Hostname() | |
uPrsed.Scheme = responseURL.Scheme | |
} | |
urlFmt, err := urlx.Normalize(uPrsed) | |
if err != nil { | |
return "", err | |
} | |
urlFmt = strings.Split(urlFmt, "#")[0] | |
return urlFmt, nil | |
} | |
type CrawledURL struct { | |
OnPage string | |
OriginalLink string | |
CleanedLink string | |
LinkRel string | |
Anchor string | |
DiscoveredAT time.Time | |
} | |
func (c *CrawledURL) Unmarshal() string { | |
o, _ := json.Marshal(c) | |
return string(o) | |
} | |
func (c *CrawledURL) PrintCSV() { | |
//log.Info("CrawledURL|\t|%s|\t|%s|\t|%s", c.OnPage, c.OriginalLink, c.CleanedLink) | |
log.Info("CrawledURL", | |
zap.String("OnPage", c.OnPage), | |
zap.String("OriginalLink", c.OriginalLink), | |
zap.String("CleanedLink", c.CleanedLink), | |
zap.String("LinkRel", c.LinkRel), | |
zap.String("Anchor", c.Anchor), | |
) | |
} | |
type CrawledURLs []CrawledURL | |
func parseLinks(b io.Reader, responseURL *url.URL, workerID string) (urls CrawledURLs, totalLinks int) { | |
z := html.NewTokenizer(b) | |
duplicateSet := map[string]bool{} | |
for { | |
tt := z.Next() | |
switch { | |
case tt == html.ErrorToken: | |
// End of the document, we're done | |
return | |
case tt == html.StartTagToken: | |
t := z.Token() | |
// Check if the token is an <a> tag | |
if t.Data != "a" { | |
continue | |
} | |
// Extract the href value, if there is one | |
ok, u, rel := getHref(t) | |
if !ok { | |
continue | |
} | |
cleanedURL, err := cleanURL(u, responseURL) | |
if err != nil { | |
continue | |
} | |
totalLinks += 1 | |
//handle same url on page | |
_, seen := duplicateSet[cleanedURL] | |
if seen { | |
continue | |
} | |
duplicateSet[cleanedURL] = true | |
//get anchor | |
z.Next() | |
text := z.Token() | |
cURL := CrawledURL{ | |
OnPage: responseURL.String(), | |
OriginalLink: u, | |
CleanedLink: cleanedURL, | |
LinkRel: rel, | |
Anchor: strings.TrimSpace(text.Data), | |
DiscoveredAT: time.Now(), | |
} | |
cURL.PrintCSV() | |
urls = append(urls, cURL) | |
} | |
} | |
return | |
} | |
type SeenURL struct { | |
OriginalURL string | |
ResultURL string | |
CrawledAt time.Time | |
NbUniqueLinks int | |
NbValidLinks int | |
ReqDuration time.Duration | |
HTTPStatusCode int | |
} | |
func (s *SeenURL) Unmarshal() string { | |
o, _ := json.Marshal(s) | |
return string(o) | |
} | |
func (s *SeenURL) PrintCSV() { | |
//str := "SeenURL|\t|" + s.OriginalURL + "|\t|" + s.ResultURL + "|\t|" + strconv.Itoa(s.HTTPStatusCode) | |
log.Info("SeenURL", | |
zap.String("OriginalURL", s.OriginalURL), | |
zap.String("ResultURL", s.ResultURL), | |
zap.Int("HTTPStatusCode", s.HTTPStatusCode), | |
) | |
} | |
func parse(entryURL, workerID string) error { | |
beforeReq := time.Now() | |
req, _ := http.NewRequest("GET", entryURL, nil) | |
req.Header.Set("User-Agent", "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)") | |
resp, err := client.Do(req) | |
if err != nil { | |
//log.Error(err) | |
seenURL := SeenURL{ | |
OriginalURL: entryURL, | |
ResultURL: entryURL, | |
CrawledAt: time.Now(), | |
} | |
seenURL.PrintCSV() | |
return nil | |
} | |
durationReq := time.Since(beforeReq) | |
b := resp.Body | |
defer b.Close() | |
urls, totalLinks := CrawledURLs{}, 0 | |
resultURL := resp.Request.URL.String() | |
if resp.StatusCode/100 == 3 { | |
//redirect | |
resultURL, _ = cleanURL(resp.Header.Get("Location"), resp.Request.URL) | |
} else { | |
urls, totalLinks = parseLinks(b, resp.Request.URL, workerID) | |
} | |
//inser seen link | |
seenURL := SeenURL{ | |
OriginalURL: entryURL, | |
ResultURL: resultURL, | |
CrawledAt: time.Now(), | |
NbUniqueLinks: len(urls), | |
NbValidLinks: totalLinks, | |
ReqDuration: durationReq, | |
HTTPStatusCode: resp.StatusCode, | |
} | |
seenURL.PrintCSV() | |
return nil | |
} | |
func main() { | |
log = NewLogger() | |
kingpin.Parse() | |
c := make(chan os.Signal, 2) | |
signal.Notify(c, os.Interrupt, syscall.SIGTERM) | |
go func() { | |
<-c | |
log.Sync() | |
os.Exit(1) | |
}() | |
queue = make(chan string, 1000) | |
tr := &http.Transport{ | |
MaxIdleConns: 20, | |
IdleConnTimeout: 30 * time.Second, | |
DisableCompression: false, | |
} | |
client = &http.Client{ | |
Transport: tr, | |
Timeout: time.Duration(time.Duration(*Timeout) * time.Second), | |
CheckRedirect: func(req *http.Request, via []*http.Request) error { | |
return http.ErrUseLastResponse | |
}, | |
} | |
db.LogMode(true) | |
parse("https://www.pinterest.com/explore/mystery-books/", "1") | |
return | |
conn, err := amqp.Dial(*MQURL) | |
failOnError(err) | |
mq = conn | |
for i := 0; i < *NbWorker; i++ { | |
go worker() | |
} | |
go fetcher() | |
//lambda.Start(HandleRequest) | |
forever := make(chan bool) | |
<-forever | |
} | |
func worker() { | |
workerID := uuid.NewV4().String() | |
for link := range queue { | |
parse(link, workerID) | |
} | |
} | |
func fetcher() { | |
ch, err := mq.Channel() | |
failOnError(err) | |
err = ch.Qos( | |
*MQMsgCount, // prefetch count | |
0, // prefetch size | |
false, // global | |
) | |
failOnError(err) | |
msgs, err := ch.Consume( | |
"tocrawl", // queue | |
uuid.NewV4().String(), // consumer | |
false, // auto-ack | |
false, // exclusive | |
false, // no-local | |
false, // no-wait | |
nil, // args | |
) | |
failOnError(err) | |
for { | |
nbMessages := 0 | |
for d := range msgs { | |
queue <- string(d.Body) | |
nbMessages += 1 | |
if nbMessages >= *MQMsgCount { | |
d.Ack(true) | |
break | |
} | |
} | |
//wait for queue to empty | |
for len(queue) > 10 { | |
time.Sleep(50 * time.Millisecond) | |
} | |
} | |
} | |
func failOnError(err error) { | |
if err != nil { | |
log.Fatal(err.Error()) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment