package main

import (
    "encoding/json"
    "io"
    "net/http"
    "net/url"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"

    "github.com/goware/urlx"
    "github.com/satori/go.uuid"
    "github.com/streadway/amqp"
    "go.uber.org/zap"
    "golang.org/x/net/html"
    "gopkg.in/alecthomas/kingpin.v2"
)
var (
    MQURL      = kingpin.Flag("mq-url", "Message queue URL").Default("amqp://guest:guest@localhost:5672/").OverrideDefaultFromEnvar("MQ_URL").String()
    NbWorker   = kingpin.Flag("workers", "Number of workers").Default("20").OverrideDefaultFromEnvar("WORKERS").Int()
    MQMsgCount = kingpin.Flag("mq-msg-count", "Message prefetch count").Default("100").OverrideDefaultFromEnvar("MQ_MSG_COUNT").Int()
    Timeout    = kingpin.Flag("timeout", "Connection timeout in seconds").Default("5").OverrideDefaultFromEnvar("TIMEOUT").Int()
)

var client *http.Client
var mq *amqp.Connection
var queue chan string
var log *zap.Logger
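// NewLogger builds a zap production logger that writes JSON records to
// info.log, with caller, timestamp, and level fields stripped to keep each
// record compact.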
func NewLogger() *zap.Logger {
    cfg := zap.NewProductionConfig()
    cfg.OutputPaths = []string{"info.log"}
    cfg.DisableCaller = true
    cfg.EncoderConfig.TimeKey = ""
    cfg.EncoderConfig.LevelKey = ""
    logger, _ := cfg.Build()
    return logger
}
// getHref pulls the href and rel attributes from an anchor token, rejecting
// non-HTTP schemes such as mailto: and javascript:.
func getHref(t html.Token) (ok bool, href, rel string) {
    for _, a := range t.Attr {
        if a.Key == "href" {
            href = strings.TrimSpace(a.Val)
            if strings.HasPrefix(href, "mailto:") ||
                strings.HasPrefix(href, "javascript:") ||
                strings.HasPrefix(href, "fax:") ||
                strings.HasPrefix(href, "modem:") {
                return
            }
            ok = true
        }
        if a.Key == "rel" {
            rel = a.Val
        }
    }
    return
}
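// cleanURL resolves s against the URL of the page it was found on, normalizes
// it with urlx, and strips any #fragment.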
func cleanURL(s string, responseURL *url.URL) (string, error) {
    uParsed, err := url.Parse(s)
    if err != nil {
        return "", err
    }
    // Resolve relative links (e.g. "page2.html" or "/foo") against the page
    // they were found on; absolute URLs pass through unchanged.
    uParsed = responseURL.ResolveReference(uParsed)
    urlFmt, err := urlx.Normalize(uParsed)
    if err != nil {
        return "", err
    }
    // Drop the #fragment; it never reaches the server.
    urlFmt = strings.Split(urlFmt, "#")[0]
    return urlFmt, nil
}
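// CrawledURL describes a single outbound link discovered on a page.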
type CrawledURL struct {
    OnPage       string
    OriginalLink string
    CleanedLink  string
    LinkRel      string
    Anchor       string
    DiscoveredAt time.Time
}

// Marshal serializes the record to JSON.
func (c *CrawledURL) Marshal() string {
    o, _ := json.Marshal(c)
    return string(o)
}

// PrintCSV writes the record to the structured log (despite the name, the
// output is zap's JSON encoding, not CSV).
func (c *CrawledURL) PrintCSV() {
    log.Info("CrawledURL",
        zap.String("OnPage", c.OnPage),
        zap.String("OriginalLink", c.OriginalLink),
        zap.String("CleanedLink", c.CleanedLink),
        zap.String("LinkRel", c.LinkRel),
        zap.String("Anchor", c.Anchor),
    )
}
type CrawledURLs []CrawledURL
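// parseLinks tokenizes an HTML body and extracts every <a href> together with
// its rel attribute and anchor text. It returns the deduplicated links plus
// the total number of valid links seen, duplicates included.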
func parseLinks(b io.Reader, responseURL *url.URL, workerID string) (urls CrawledURLs, totalLinks int) {
    z := html.NewTokenizer(b)
    duplicateSet := map[string]bool{}
    for {
        tt := z.Next()
        switch {
        case tt == html.ErrorToken:
            // End of the document; we're done.
            return
        case tt == html.StartTagToken:
            t := z.Token()
            // Only <a> tags are of interest.
            if t.Data != "a" {
                continue
            }
            // Extract the href value, if there is one.
            ok, u, rel := getHref(t)
            if !ok {
                continue
            }
            cleanedURL, err := cleanURL(u, responseURL)
            if err != nil {
                continue
            }
            totalLinks++
            // Skip URLs already seen on this page.
            if duplicateSet[cleanedURL] {
                continue
            }
            duplicateSet[cleanedURL] = true
            // The next token is assumed to be the anchor text.
            z.Next()
            text := z.Token()
            cURL := CrawledURL{
                OnPage:       responseURL.String(),
                OriginalLink: u,
                CleanedLink:  cleanedURL,
                LinkRel:      rel,
                Anchor:       strings.TrimSpace(text.Data),
                DiscoveredAt: time.Now(),
            }
            cURL.PrintCSV()
            urls = append(urls, cURL)
        }
    }
}
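// SeenURL records the outcome of fetching one URL: where it ended up, how
// long the request took, and how many links were found.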
type SeenURL struct {
    OriginalURL    string
    ResultURL      string
    CrawledAt      time.Time
    NbUniqueLinks  int
    NbValidLinks   int
    ReqDuration    time.Duration
    HTTPStatusCode int
}

// Marshal serializes the record to JSON.
func (s *SeenURL) Marshal() string {
    o, _ := json.Marshal(s)
    return string(o)
}

// PrintCSV writes the record to the structured log.
func (s *SeenURL) PrintCSV() {
    log.Info("SeenURL",
        zap.String("OriginalURL", s.OriginalURL),
        zap.String("ResultURL", s.ResultURL),
        zap.Int("HTTPStatusCode", s.HTTPStatusCode),
    )
}
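// parse fetches entryURL, handles a single redirect hop manually (the HTTP
// client is configured not to follow redirects), extracts links from HTML
// responses, and logs a SeenURL record for the fetch.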
func parse(entryURL, workerID string) error {
    beforeReq := time.Now()
    req, err := http.NewRequest("GET", entryURL, nil)
    if err != nil {
        return err
    }
    req.Header.Set("User-Agent", "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)")
    resp, err := client.Do(req)
    if err != nil {
        // Record the failed fetch with a zero status code.
        seenURL := SeenURL{
            OriginalURL: entryURL,
            ResultURL:   entryURL,
            CrawledAt:   time.Now(),
        }
        seenURL.PrintCSV()
        return nil
    }
    durationReq := time.Since(beforeReq)
    b := resp.Body
    defer b.Close()
    urls, totalLinks := CrawledURLs{}, 0
    resultURL := resp.Request.URL.String()
    if resp.StatusCode/100 == 3 {
        // Redirect: record the target instead of parsing a body.
        resultURL, _ = cleanURL(resp.Header.Get("Location"), resp.Request.URL)
    } else {
        urls, totalLinks = parseLinks(b, resp.Request.URL, workerID)
    }
    // Record the fetch result.
    seenURL := SeenURL{
        OriginalURL:    entryURL,
        ResultURL:      resultURL,
        CrawledAt:      time.Now(),
        NbUniqueLinks:  len(urls),
        NbValidLinks:   totalLinks,
        ReqDuration:    durationReq,
        HTTPStatusCode: resp.StatusCode,
    }
    seenURL.PrintCSV()
    return nil
}
func main() {
    log = NewLogger()
    kingpin.Parse()

    // Flush the log on Ctrl-C / SIGTERM before exiting.
    c := make(chan os.Signal, 2)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-c
        log.Sync()
        os.Exit(1)
    }()

    queue = make(chan string, 1000)
    tr := &http.Transport{
        MaxIdleConns:       20,
        IdleConnTimeout:    30 * time.Second,
        DisableCompression: false,
    }
    client = &http.Client{
        Transport: tr,
        Timeout:   time.Duration(*Timeout) * time.Second,
        // Do not follow redirects; parse handles them manually.
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            return http.ErrUseLastResponse
        },
    }

    // Uncomment to test a single URL without the message queue:
    //parse("https://www.pinterest.com/explore/mystery-books/", "1")

    conn, err := amqp.Dial(*MQURL)
    failOnError(err)
    mq = conn

    for i := 0; i < *NbWorker; i++ {
        go worker()
    }
    go fetcher()
    //lambda.Start(HandleRequest)

    forever := make(chan bool)
    <-forever
}
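// worker consumes links from the in-process queue and crawls each one under a
// per-worker UUID.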
func worker() {
    workerID := uuid.NewV4().String()
    for link := range queue {
        parse(link, workerID)
    }
}
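// fetcher consumes batches of URLs from the "tocrawl" RabbitMQ queue, feeds
// them to the workers, acks each batch, and waits for the in-process queue to
// drain before taking more.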
func fetcher() {
    ch, err := mq.Channel()
    failOnError(err)

    err = ch.Qos(
        *MQMsgCount, // prefetch count
        0,           // prefetch size
        false,       // global
    )
    failOnError(err)

    msgs, err := ch.Consume(
        "tocrawl",             // queue
        uuid.NewV4().String(), // consumer
        false,                 // auto-ack
        false,                 // exclusive
        false,                 // no-local
        false,                 // no-wait
        nil,                   // args
    )
    failOnError(err)

    for {
        nbMessages := 0
        for d := range msgs {
            queue <- string(d.Body)
            nbMessages++
            if nbMessages >= *MQMsgCount {
                // Ack(true) acknowledges every outstanding message up to and
                // including this one, i.e. the whole batch.
                d.Ack(true)
                break
            }
        }
        // Wait for the in-process queue to (mostly) empty before taking the
        // next batch.
        for len(queue) > 10 {
            time.Sleep(50 * time.Millisecond)
        }
    }
}
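// failOnError logs the error and exits the process.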
func failOnError(err error) {
    if err != nil {
        log.Fatal(err.Error())
    }
}
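// ----------------------------------------------------------------------------
// Usage sketch (not part of the original gist): the crawler consumes from a
// "tocrawl" queue, so something must publish URLs into it. A minimal,
// hypothetical publisher using the same streadway/amqp library might look
// like this, run as a separate program; queue properties (durable, etc.) must
// match however "tocrawl" was originally declared:
//
//	package main
//
//	import "github.com/streadway/amqp"
//
//	func main() {
//		conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
//		if err != nil {
//			panic(err)
//		}
//		defer conn.Close()
//		ch, err := conn.Channel()
//		if err != nil {
//			panic(err)
//		}
//		// Declare the queue the crawler reads from (idempotent).
//		if _, err := ch.QueueDeclare("tocrawl", true, false, false, false, nil); err != nil {
//			panic(err)
//		}
//		// Publish one seed URL to the default exchange.
//		err = ch.Publish("", "tocrawl", false, false, amqp.Publishing{
//			ContentType: "text/plain",
//			Body:        []byte("https://example.com/"),
//		})
//		if err != nil {
//			panic(err)
//		}
//	}
// ----------------------------------------------------------------------------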