Skip to content

Instantly share code, notes, and snippets.

@malisetti
Last active September 28, 2018 10:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save malisetti/cd7f93219035588dfe042234a003fae5 to your computer and use it in GitHub Desktop.
Save malisetti/cd7f93219035588dfe042234a003fae5 to your computer and use it in GitHub Desktop.
Pass it a file of RSS feed URLs and the ID of the last posted row.
package main
import (
	"bufio"
	"database/sql"
	"fmt"
	"html"
	"log"
	"net/http"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	medium "github.com/Medium/medium-sdk-go"
	"github.com/badoux/goscraper"
	"github.com/gregjones/httpcache"
	"github.com/gregjones/httpcache/diskcache"
	_ "github.com/mattn/go-sqlite3"
	"github.com/mmcdole/gofeed"
)
// mediumUserID is the Medium account id that posts are created under.
// NOTE(review): both Medium credentials are blank here; they must be
// filled in before postToMedium can succeed.
const mediumUserID string = ""

// mediumIntegrationToken is the Medium self-issued integration token.
const mediumIntegrationToken string = ""

// googleSearchRSS is a Google News RSS search URL template (%s is the
// query), pinned to the India edition. It is not referenced anywhere in
// this file — presumably the seed file contains URLs built from it.
const googleSearchRSS string = "https://news.google.com/news/rss/search/section/q/%s?ned=in&gl=IN&hl=en-IN"
// docPreview is one scraped article as persisted to the links table,
// carrying the row id SQLite assigned so downstream stages can track it.
type docPreview struct {
	ID int64 // links.id returned by LastInsertId after the insert
	goscraper.DocumentPreview
	Content   string // scraped page body (s.Body.String())
	Published int64  // feed item publish time as a Unix timestamp (0 if absent)
}
// row mirrors one record of the links table as read back by the poster
// query (id, url, title, description, published).
type row struct {
	Link, Title, Description string
	ID                       int64
	Published                int64
}
// rows guards the watermark of the last database row handled. Both the
// crawler and the poster goroutines in main lock it before reading or
// updating lastRowID.
type rows struct {
	lastRowID int64
	sync.Mutex
}
// rowsBuffer is the shared crawler/poster watermark (see type rows).
var rowsBuffer rows

// postTags are attached to every Medium post this program creates.
var postTags = []string{"India", "Crime", "Justice", "Humanity", "Society"}

// routines carries +1/-1 deltas; a counter goroutine in main sums them
// and logs how many goroutines are currently live.
var routines = make(chan int)
// main wires together three long-running goroutines:
//  1. a counter that sums +1/-1 deltas from the routines channel and
//     logs how many goroutines are live,
//  2. a crawler loop that re-reads the seed file hourly, fetches every
//     feed, and inserts new items into SQLite,
//  3. a poster loop that, every 24h/100 (~14.4 min), batches up to 30
//     unposted rows into a single Medium draft once at least 10 exist.
//
// Usage: prog <seed-file> <last-posted-id>
func main() {
	defer func() {
		log.Printf("last visited id %d\n", rowsBuffer.lastRowID)
	}()
	defer close(routines)
	// Goroutine bookkeeping: every worker sends +1 on start and -1 on exit.
	go func() {
		total := 0
		for c := range routines {
			total += c
			log.Printf("num routines %d\n", total)
		}
	}()
	routines <- 1 // count main itself
	if len(os.Args) < 3 {
		log.Fatalln("pass the file and id of the last posted id")
	}
	seeds, err := os.Open(os.Args[1])
	if err != nil {
		log.Fatal(err)
	}
	defer seeds.Close()
	lastID, err := strconv.Atoi(os.Args[2])
	if err != nil {
		log.Fatal(err)
	}
	createTableStmt := "CREATE TABLE IF NOT EXISTS `links` ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `url` TEXT UNIQUE, `title` TEXT, `description` TEXT, `content` TEXT, `published` INTEGER )"
	insertStmt := `INSERT INTO links(url, title, description, content, published) values(?,?,?,?,?)`
	existsStmt := `SELECT EXISTS(SELECT 1 FROM links WHERE url=? LIMIT 1)`
	db, err := sql.Open("sqlite3", "./db.sqlite")
	if err != nil {
		log.Fatalln(err)
	}
	defer db.Close()
	// db.SetMaxOpenConns(1)
	// db.Exec("PRAGMA journal_mode=WAL;")
	createStmt, err := db.Prepare(createTableStmt)
	if err == nil {
		createStmt.Exec()
		createStmt.Close()
	} else {
		log.Println(err)
	}
	insertPStmt, err := db.Prepare(insertStmt)
	if err != nil {
		log.Fatalln(err)
	}
	defer insertPStmt.Close()
	existsPStmt, err := db.Prepare(existsStmt)
	if err != nil {
		log.Fatal(err)
	}
	defer existsPStmt.Close()
	var wg sync.WaitGroup
	// Crawler: hourly, rewind the seed file, fetch all feeds, insert new
	// items, and record the smallest freshly-inserted row id as the
	// watermark the poster starts reading from.
	wg.Add(1)
	go func() {
		routines <- 1
		defer func() {
			routines <- -1
		}()
		defer wg.Done()
		for {
			log.Println(time.Now())
			lines, err := rewindAndRead(seeds)
			if err != nil {
				log.Println(err)
				continue
			}
			feed := toFeed(lines)
			docs := insertToDB(feed, existsPStmt, insertPStmt)
			func() {
				rowsBuffer.Lock()
				defer rowsBuffer.Unlock()
				// FIX: guard against an empty docs channel. The original
				// received the zero value (a nil *docPreview) after close
				// and dereferenced d1.ID, panicking whenever a cycle
				// inserted nothing new.
				d1, ok := <-docs
				if !ok {
					return // keep the previous watermark
				}
				minLid := d1.ID
				for d := range docs {
					if minLid > d.ID {
						minLid = d.ID
					}
				}
				rowsBuffer.lastRowID = minLid
			}()
			time.Sleep(1 * time.Hour)
		}
	}()
	// Poster: paced to 24h/100 because Medium permits only a limited
	// number of create-post calls per day (see comment below).
	wg.Add(1)
	go func() {
		routines <- 1
		defer func() {
			routines <- -1
		}()
		defer wg.Done()
		stmt, err := db.Prepare("select id, url, title, description, published from links where id > ? limit 30")
		if err != nil {
			// NOTE: log.Fatal exits the whole process, not just this goroutine.
			log.Fatal(err)
		}
		defer stmt.Close()
		// only 10 create post calls are allowed
		for {
			func() {
				log.Println(time.Now().Format(time.RFC822))
				// fetch these from db from last min insert id
				rowsBuffer.Lock()
				defer rowsBuffer.Unlock()
				lrow := rowsBuffer.lastRowID
				if lrow < int64(lastID) {
					lrow = int64(lastID)
				}
				srows, err := stmt.Query(lrow)
				if err != nil {
					log.Println(err)
					return
				}
				// FIX: release the result set on every exit path; the
				// original leaked it when Scan or a later step returned early.
				defer srows.Close()
				var rs []row
				for srows.Next() {
					var r row
					err = srows.Scan(&r.ID, &r.Link, &r.Title, &r.Description, &r.Published)
					if err != nil {
						log.Println(err)
						return
					}
					rs = append(rs, r)
				}
				err = srows.Err()
				if err != nil {
					log.Println(err)
					return
				}
				lrs := len(rs)
				if lrs < 10 {
					log.Printf("wait for next time, have got only %d records\n", lrs)
					return
				}
				htmlRows := prepareHTML(rs)
				post, err := postToMedium(htmlRows)
				if err == nil {
					log.Printf("posted %s with %d articles\n", post.ID, len(rs))
					// advance the watermark past everything just posted
					for _, r := range rs {
						if r.ID > rowsBuffer.lastRowID {
							rowsBuffer.lastRowID = r.ID
						}
					}
					log.Printf("last visited row is %d\n", rowsBuffer.lastRowID)
				} else {
					log.Println(err)
				}
			}()
			time.Sleep(time.Hour * 24 / 100)
		}
	}()
	wg.Wait()
}
// postToMedium creates a new draft post on Medium containing the given
// pre-rendered HTML list items, titled with the current clock time, and
// returns the created post.
func postToMedium(rows string) (*medium.Post, error) {
	stamp := time.Now().Format(time.Kitchen)
	title := "Crime News Syndicate - " + stamp
	body := fmt.Sprintf("<h2>%s</h2><p><ul>%s</ul></p>", title, rows)
	client := medium.NewClientWithAccessToken(mediumIntegrationToken)
	opts := medium.CreatePostOptions{
		UserID:        mediumUserID,
		Title:         title,
		Content:       body,
		ContentFormat: medium.ContentFormatHTML,
		PublishStatus: medium.PublishStatusDraft,
		Tags:          postTags,
	}
	return client.CreatePost(opts)
}
// prepareHTML renders the given rows as a concatenated string of
// <li><a ...> items for embedding in the Medium post body. For each row
// the longer of Title/Description becomes the visible anchor text and
// the shorter becomes the title (tooltip) attribute.
//
// FIX: all dynamic values are now HTML-escaped; the original
// interpolated raw feed text into attributes and element content, which
// could break the markup or inject HTML (feed content is untrusted).
func prepareHTML(docs []row) string {
	var b strings.Builder
	for _, dp := range docs {
		// pick the longer of the two strings as the link text
		text, tooltip := dp.Description, dp.Title
		if len(dp.Description) <= len(dp.Title) {
			text, tooltip = dp.Title, dp.Description
		}
		fmt.Fprintf(&b, `<li><a href="%s" title="%s" target="_blank">%s</a></li>`,
			html.EscapeString(dp.Link), html.EscapeString(tooltip), html.EscapeString(text))
	}
	return b.String()
}
// insertToDB consumes feed items, skips ones that are stale (updated
// more than an hour ago) or already stored, scrapes each remaining link
// for a preview, inserts it into the links table, and emits the stored
// preview — tagged with its new row id — on the returned channel. The
// channel is closed once feed is drained.
func insertToDB(feed <-chan *gofeed.Item, existsPStmt, insertPStmt *sql.Stmt) <-chan *docPreview {
	out := make(chan *docPreview)
	go func() {
		routines <- 1
		defer func() {
			routines <- -1
		}()
		defer close(out)
		for item := range feed {
			// skip items last updated more than an hour ago
			if item.UpdatedParsed != nil && !item.UpdatedParsed.After(time.Now().Add(-1*time.Hour)) {
				continue
			}
			var exists int
			eerr := existsPStmt.QueryRow(item.Link).Scan(&exists)
			if eerr != nil || exists == 1 {
				continue // already stored, or the existence check failed
			}
			s, err := goscraper.Scrape(item.Link, 1)
			if err != nil {
				log.Println(err)
				continue
			}
			// FIX: PublishedParsed may be nil (feeds without a pubDate);
			// the original dereferenced it unconditionally and panicked.
			var published int64
			if item.PublishedParsed != nil {
				published = item.PublishedParsed.Unix()
			}
			dp := &docPreview{
				DocumentPreview: s.Preview,
				Content:         s.Body.String(),
				Published:       published,
			}
			result, err := insertPStmt.Exec(item.Link, dp.Title, dp.Description, dp.Content, dp.Published)
			if err != nil {
				log.Println(err)
				continue
			}
			lid, _ := result.LastInsertId()
			// send this to prepare content
			dp.ID = lid
			out <- dp
		}
	}()
	return out
}
// toFeed fans four worker goroutines out over the URLs on urls, fetches
// and parses each as an RSS/Atom feed, and streams the items on the
// returned channel, deduplicated by item link across all workers. The
// channel is closed after every worker finishes.
func toFeed(urls <-chan string) <-chan *gofeed.Item {
	links := make(chan *gofeed.Item, 4)
	var wg sync.WaitGroup
	var ulinks sync.Map // set of item links already emitted, shared by all workers
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			routines <- 1
			defer func() {
				routines <- -1
			}()
			// Disk-backed HTTP cache so repeated polls can reuse
			// conditional requests.
			client := http.Client{
				Transport: httpcache.NewTransport(diskcache.New("cache")),
				// FIX: the original had no timeout, so a single hung
				// feed server blocked this worker forever.
				Timeout: 30 * time.Second,
			}
			fp := gofeed.NewParser()
			defer wg.Done()
			for u := range urls {
				func() {
					resp, err := client.Get(u)
					if err != nil {
						log.Println(err)
						return
					}
					defer resp.Body.Close()
					feed, err := fp.Parse(resp.Body)
					if err != nil {
						log.Println(err)
						return
					}
					for _, i := range feed.Items {
						// emit each link at most once across all workers
						_, ok := ulinks.Load(i.Link)
						if ok {
							continue
						}
						ulinks.Store(i.Link, struct{}{})
						links <- i
					}
				}()
			}
		}()
	}
	// closer: wait for all workers, then signal end-of-stream
	go func() {
		routines <- 1
		defer func() {
			routines <- -1
		}()
		wg.Wait()
		close(links)
	}()
	return links
}
// rewindAndRead seeks f back to its beginning and returns a channel
// that streams its lines. The error is non-nil only if the seek fails.
func rewindAndRead(f *os.File) (<-chan string, error) {
	if _, err := f.Seek(0, 0); err != nil {
		return nil, err
	}
	return readLinesFromFile(f), nil
}
// readLinesFromFile streams f line by line on the returned channel and
// closes it when the file is exhausted or a read error occurs.
func readLinesFromFile(f *os.File) <-chan string {
	out := make(chan string)
	go func() {
		routines <- 1
		defer func() {
			routines <- -1
		}()
		defer close(out)
		// FIX: the original used bufio.Reader.ReadLine and discarded its
		// isPrefix result, silently splitting lines longer than the
		// reader's buffer; it also logged the expected io.EOF on every
		// normal completion. bufio.Scanner avoids both problems.
		sc := bufio.NewScanner(f)
		for sc.Scan() {
			out <- sc.Text()
		}
		if err := sc.Err(); err != nil {
			log.Println(err)
		}
	}()
	return out
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment