Created
July 5, 2013 13:14
-
-
Save meddulla/5934457 to your computer and use it in GitHub Desktop.
A simple web scraper written in Go: it accepts crawl jobs over HTTP, fetches each URL, extracts OpenGraph meta tags, and stores them in MongoDB.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
//TODO swagger dump json at /docs | |
import ( | |
"crypto/md5" | |
"encoding/json" | |
"fmt" | |
"ibtx/crawler/types" //TODO | |
"io" | |
"io/ioutil" | |
"labix.org/v2/mgo" | |
"labix.org/v2/mgo/bson" | |
"log" | |
"net/http" | |
"net/url" | |
"os" | |
"regexp" | |
"runtime" | |
"strconv" | |
"strings" | |
"time" | |
) | |
// Shared package state. One worker per CPU, minus one left for the HTTP
// server / runtime.
// NOTE(review): on a single-CPU host workers is 0, making the jobs/done
// channels unbuffered and GOMAXPROCS(0) a query-only no-op — confirm the
// deployment targets multi-core machines.
var workers = runtime.NumCPU() - 1
var jobs = make(chan Job, workers)      // NOTE(review): declared but never read or written in this file
var results = make(chan types.Result)   // every finished job delivers exactly one Result here
var done = make(chan bool, workers)     // one signal per completed job, consumed by waitAndProcessResults
var conf = &types.Config{}              // populated from Cfg/conf.json in init()

// Regexes for pulling OpenGraph metadata out of raw HTML, compiled once at
// package load (MustCompile panics on a bad pattern, which is fine here).
var metaOgTagsPattern = regexp.MustCompile(`<meta\s+property\s*=\s*[\"]og:[^>]+>`)
var metaPropertyPattern = regexp.MustCompile(`property\s*=\s*[\"]og:([^\"]+)`)
var metaContentPattern = regexp.MustCompile(`content\s*=\s*[\"]([^\"]+)`)
var metaModifiedTimePattern = regexp.MustCompile(`<meta\s+property\s*=\s*"\S*article(?:#|:)modified_time"\scontent="(\S*)"\s*\/>`)

// db is the mongo collection opened in init() and written by saveTags.
var db *mgo.Collection
// Job is a single crawl task as submitted via POST /jobs/add.
// The json tags map the incoming lowercase keys onto the exported fields.
// NOTE: the tag on the unexported pageId field has no effect —
// encoding/json ignores unexported fields entirely.
type Job struct {
	Url          string `json:"url"`
	LastModified uint64 `json:"lastModified"` // Unix milliseconds, 0 means "not sent"
	pageId       string `json:"pageId"`       // cached MD5 of Url; see PageId()
	results      chan<- types.Result          // where Do()/saveTags deliver the outcome
}

// Jobs is the request payload shape: a JSON array of Job objects.
type Jobs []Job
func (r Job) PageId() (s string) { | |
if r.pageId == "" { | |
h := md5.New() | |
io.WriteString(h, r.Url) | |
r.pageId = fmt.Sprintf("%x", h.Sum(nil)) | |
} | |
return r.pageId | |
} | |
func (job Job) Do() { | |
log.Println("Doing job " + job.Url) | |
r, err := http.Get(job.Url) //automatically follows redirects up to 10 | |
defer r.Body.Close() | |
result := types.Result{} | |
result.Url = job.Url | |
result.LastModified = job.LastModified | |
result.StatusCode = r.StatusCode | |
if r.StatusCode != http.StatusOK || err != nil { | |
log.Printf("%d code for url: %s ", r.StatusCode, result.Url) | |
result.Err = r.Status | |
job.results <- result | |
return | |
} | |
contents, err := ioutil.ReadAll(r.Body) | |
if err != nil { | |
log.Println(err) | |
result.Err = "Failed getting body" | |
job.results <- result | |
return | |
} | |
log.Printf("%d code for url: %s ", r.StatusCode, result.Url) | |
result.Html = string(contents) | |
metaTags := extractTags(&result) | |
result.Html = "" //clear this big string from memory | |
if len(metaTags) == 0 { | |
job.results <- result | |
return | |
} | |
if strModTime, ok := metaTags["meta.modified_time"]; ok && job.LastModified > 0 { | |
modTime, _ := strconv.ParseUint(strModTime, 10, 64) | |
if modTime > job.LastModified { | |
go saveTags(metaTags, result, job) | |
return | |
} | |
log.Printf("modified_time: %d , lastModified: %d ", modTime, job.LastModified) | |
} | |
//Always save if lastModified was not sent | |
if job.LastModified == 0 { | |
go saveTags(metaTags, result, job) | |
return | |
} | |
log.Println("Not saving tags of " + result.Url) | |
job.results <- result | |
return | |
} | |
func extractTags(result *types.Result) (metaTags map[string]string) { | |
//get tags | |
metaTags = map[string]string{} | |
matchOgTags := metaOgTagsPattern.FindAllString(result.Html, -1) | |
if len(matchOgTags) == 0 { | |
result.Err = "No tags defined" | |
return metaTags | |
} | |
fmt.Println("got match for " + result.Url) | |
//capture | |
for _, m := range matchOgTags { | |
prop := metaPropertyPattern.FindString(m) | |
prop = strings.TrimSpace(strings.Replace(prop, `property="og:`, "", -1)) | |
content := metaContentPattern.FindString(m) | |
content = strings.TrimSpace(strings.Replace(content, `content="`, "", -1)) | |
if prop != "" && content != "" { | |
metaTags["meta."+prop] = content | |
} | |
} | |
if len(metaTags) == 0 { | |
result.Err = "No tags defined" | |
return metaTags | |
} | |
_, hasHostname := metaTags["meta.hostname"] | |
if hasHostname == false { | |
u, err := url.Parse(result.Url) | |
if err == nil { | |
metaTags["meta.hostname"] = u.Host | |
} | |
} | |
matchModifiedTime := metaModifiedTimePattern.FindStringSubmatch(result.Html) | |
if len(matchModifiedTime) == 2 { | |
metaTags["meta.modified_time"] = matchModifiedTime[1] | |
} | |
//Unix is the nr of seconds (not milliseconds) | |
metaTags["meta.updatedAt"] = fmt.Sprintf("%d", time.Now().Unix()*1000) | |
fmt.Println(metaTags) | |
return metaTags | |
} | |
func saveTags(metaTags map[string]string, result types.Result, job Job) { | |
log.Println("Saving tags of " + result.Url) | |
pageId := job.PageId() | |
colQuerier := bson.M{"pageId": pageId} | |
change := bson.M{"$set": metaTags} | |
err := db.Update(colQuerier, change) | |
if err != nil { | |
log.Fatalln(err) | |
result.Err = "Failed to update db" | |
} | |
job.results <- result | |
return | |
} | |
func addJob(job Job) { | |
log.Println("adding job " + job.Url) | |
job.results = results | |
job.Do() | |
done <- true | |
} | |
func waitAndProcessResults(done <-chan bool, results <-chan types.Result) { | |
counter := 0 | |
for { | |
select { | |
case result := <-results: | |
if result.Err != "" { | |
log.Printf("Result: Error: %s \n", result.Err) | |
} else if result.Html != "" { | |
log.Printf("Result: OK for %s\n", result.Url) | |
} else { | |
log.Println("Result: Not an error but no html") | |
} | |
case <-done: | |
counter++ | |
fmt.Printf("%d jobs done \n", counter) | |
} | |
} | |
} | |
func addJobsHandler(rw http.ResponseWriter, req *http.Request) { | |
var jobs Jobs | |
decoder := json.NewDecoder(req.Body) | |
resp := types.Response{"status": 200, "msg": "Added jobs to queue"} | |
err := decoder.Decode(&jobs) | |
if err != nil { | |
log.Println(err) | |
resp = types.Response{"status": 400, "msg": "Failed validation. Please send tasks as a list of url and lastModified timestamp"} | |
} else { | |
for _, job := range jobs { | |
go addJob(job) | |
} | |
} | |
rw.Header().Set("Content-Type", "application/json") | |
fmt.Fprint(rw, resp) | |
} | |
func init() { | |
//sets the maximum number of CPUs that can be executing simultaneously | |
runtime.GOMAXPROCS(workers) | |
file, err := os.Open("Cfg/conf.json") | |
if err != nil { | |
panic(err) | |
} | |
decoder := json.NewDecoder(file) | |
decoder.Decode(&conf) | |
defer func() { | |
file.Close() | |
}() | |
//Connect to mongo | |
session, err := mgo.Dial(conf.Db.Host) | |
if err != nil { | |
panic(err) | |
} | |
db = session.DB(conf.Db.DbName).C(conf.Db.Collection) | |
} | |
func main() { | |
//job := Job{Url: "http://google.com", LastModified: 1372693030271} | |
//go addJob(job) | |
go waitAndProcessResults(done, results) | |
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { | |
fmt.Fprintf(w, "Nothing to see here.") | |
}) | |
http.HandleFunc("/jobs/add", addJobsHandler) | |
log.Println("Serving at port " + conf.Port) | |
log.Fatal(http.ListenAndServe(":"+conf.Port, nil)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment