Created
January 18, 2018 12:48
-
-
Save malisetti/959861674f8ad43fe21b4424d4c0f1b3 to your computer and use it in GitHub Desktop.
music hunter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// realmain runs the Subscribe example with a real RSS fetcher. | |
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"time" | |
"github.com/mmcdole/gofeed" | |
rss "github.com/jteeuwen/go-pkg-rss" | |
) | |
// STARTITEM OMIT
// An Item is a stripped-down RSS item: just enough to display
// (Title, Channel) and to deduplicate (GUID is the dedupe key).
type Item struct{ Title, Channel, GUID string }
// STOPITEM OMIT
// STARTFETCHER OMIT
// A Fetcher fetches Items and returns the time when the next fetch should be
// attempted. On failure, Fetch returns a non-nil error; callers decide the
// retry policy (the loops below back off 10s).
type Fetcher interface {
	Fetch() (items []Item, next time.Time, err error)
}
// STOPFETCHER OMIT
// STARTSUBSCRIPTION OMIT
// A Subscription delivers Items over a channel. Close cancels the
// subscription, closes the Updates channel, and returns the last fetch error,
// if any. After Close returns, no more sends occur on Updates.
type Subscription interface {
	Updates() <-chan Item
	Close() error
}
// STOPSUBSCRIPTION OMIT
// STARTSUBSCRIBE OMIT | |
// Subscribe returns a new Subscription that uses fetcher to fetch Items. | |
func Subscribe(fetcher Fetcher) Subscription { | |
s := &sub{ | |
fetcher: fetcher, | |
updates: make(chan Item), // for Updates | |
closing: make(chan chan error), // for Close | |
} | |
go s.loop() | |
return s | |
} | |
// STOPSUBSCRIBE OMIT | |
// sub implements the Subscription interface. All of its state lives in the
// loop goroutine; the struct itself holds only the channels used to talk
// to that goroutine, so no mutex is needed.
type sub struct {
	fetcher Fetcher         // fetches items
	updates chan Item       // sends items to the user
	closing chan chan error // for Close: carries the reply channel
}
// STARTUPDATES OMIT | |
func (s *sub) Updates() <-chan Item { | |
return s.updates | |
} | |
// STOPUPDATES OMIT | |
// STARTCLOSE OMIT
// STARTCLOSESIG OMIT
// Close asks the service loop to exit and returns the last fetch error.
func (s *sub) Close() error {
	// STOPCLOSESIG OMIT
	// Hand loop a fresh channel on which it replies with the final error,
	// then block until that reply arrives (request/response over channels).
	errc := make(chan error)
	s.closing <- errc // HLchan
	return <-errc     // HLchan
}
// STOPCLOSE OMIT
// loopCloseOnly is a version of loop that includes only the logic
// that handles Close.
func (s *sub) loopCloseOnly() {
	// STARTCLOSEONLY OMIT
	var err error // set when Fetch fails
	for {
		select {
		case errc := <-s.closing: // HLchan
			errc <- err      // reply to Close with the last fetch error // HLchan
			close(s.updates) // tells receiver we're done
			return
		}
	}
	// STOPCLOSEONLY OMIT
}
// loopFetchOnly is a version of loop that includes only the logic
// that calls Fetch.
func (s *sub) loopFetchOnly() {
	// STARTFETCHONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	var next time.Time // initially January 1, year 0
	var err error
	for {
		var fetchDelay time.Duration // initially 0 (no delay)
		// Honor the fetcher's suggested next-fetch time.
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)
		select {
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch()
			if err != nil {
				// Back off 10s after a failed fetch before retrying.
				next = time.Now().Add(10 * time.Second)
				break
			}
			pending = append(pending, fetched...)
		}
	}
	// STOPFETCHONLY OMIT
}
// loopSendOnly is a version of loop that includes only the logic for
// sending items to s.updates.
func (s *sub) loopSendOnly() {
	// STARTSENDONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	for {
		var first Item
		// updates stays nil (send case disabled: a send on a nil channel
		// blocks forever) unless there is something pending to send.
		var updates chan Item // HLupdates
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case // HLupdates
		}
		select {
		case updates <- first:
			pending = pending[1:]
		}
	}
	// STOPSENDONLY OMIT
}
// mergedLoop is a version of loop that combines loopCloseOnly,
// loopFetchOnly, and loopSendOnly: one select multiplexes Close
// requests, timed fetches, and sends to the subscriber.
func (s *sub) mergedLoop() {
	// STARTFETCHVARS OMIT
	var pending []Item
	var next time.Time
	var err error
	// STOPFETCHVARS OMIT
	for {
		// STARTNOCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)
		// STOPNOCAP OMIT
		// Send case is enabled (non-nil channel) only when pending
		// items exist; otherwise updates is nil and the case blocks.
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		// STARTSELECT OMIT
		select {
		case errc := <-s.closing: // HLcases
			errc <- err
			close(s.updates)
			return
		// STARTFETCHCASE OMIT
		case <-startFetch: // HLcases
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break // break select only; the for loop continues
			}
			pending = append(pending, fetched...) // HLfetch
		// STOPFETCHCASE OMIT
		case updates <- first: // HLcases
			pending = pending[1:]
		}
		// STOPSELECT OMIT
	}
}
// dedupeLoop extends mergedLoop with deduping of fetched items and a cap
// on pending items: fetching is disabled while the backlog is full.
func (s *sub) dedupeLoop() {
	const maxPending = 10
	// STARTSEEN OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool) // set of item.GUIDs // HLseen
	// STOPSEEN OMIT
	for {
		// STARTCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		// startFetch stays nil (fetch case disabled) while the backlog
		// is at capacity, providing back-pressure on the fetcher.
		var startFetch <-chan time.Time // HLcap
		if len(pending) < maxPending { // HLcap
			startFetch = time.After(fetchDelay) // enable fetch case // HLcap
		} // HLcap
		// STOPCAP OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		select {
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		// STARTDEDUPE OMIT
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			// Keep only items whose GUID has not been seen before.
			for _, item := range fetched {
				if !seen[item.GUID] { // HLdupe
					pending = append(pending, item) // HLdupe
					seen[item.GUID] = true          // HLdupe
				} // HLdupe
			}
		// STOPDEDUPE OMIT
		case updates <- first:
			pending = pending[1:]
		}
	}
}
// loop periodically fetches Items, sends them on s.updates, and exits
// when Close is called. It extends dedupeLoop with logic to run
// Fetch asynchronously, so a slow fetch cannot block Close or sends.
func (s *sub) loop() {
	const maxPending = 10
	type fetchResult struct {
		fetched []Item
		next    time.Time
		err     error
	}
	// STARTFETCHDONE OMIT
	var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
	// STOPFETCHDONE OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool)
	for {
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		// STARTFETCHIF OMIT
		// A new fetch may start only if none is in flight and there is
		// room in the backlog.
		var startFetch <-chan time.Time
		if fetchDone == nil && len(pending) < maxPending { // HLfetch
			startFetch = time.After(fetchDelay) // enable fetch case
		}
		// STOPFETCHIF OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		// STARTFETCHASYNC OMIT
		select {
		case <-startFetch: // HLfetch
			// Buffer of 1 lets the goroutine deliver its result and exit
			// even if loop has already returned.
			fetchDone = make(chan fetchResult, 1) // HLfetch
			go func() {
				fetched, next, err := s.fetcher.Fetch()
				fetchDone <- fetchResult{fetched, next, err}
			}()
		case result := <-fetchDone: // HLfetch
			fetchDone = nil // HLfetch
			// Use result.fetched, result.next, result.err
			// STOPFETCHASYNC OMIT
			fetched := result.fetched
			next, err = result.next, result.err
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if id := item.GUID; !seen[id] { // HLdupe
					pending = append(pending, item)
					seen[id] = true // HLdupe
				}
			}
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		case updates <- first:
			pending = pending[1:]
		}
	}
}
// naiveMerge is a version of Merge that doesn't quite work right. In
// particular, the goroutines it starts may block forever on m.updates
// if the receiver stops receiving.
type naiveMerge struct {
	subs    []Subscription // merged input subscriptions
	updates chan Item      // fan-in channel shared by all forwarders
}
// STARTNAIVEMERGE OMIT
// NaiveMerge fans subs into a single Subscription by starting one
// forwarding goroutine per input. See the naiveMerge comment for why
// this version can leak goroutines.
func NaiveMerge(subs ...Subscription) Subscription {
	m := &naiveMerge{
		subs:    subs,
		updates: make(chan Item),
	}
	// STARTNAIVEMERGELOOP OMIT
	for _, sub := range subs {
		// sub is passed as an argument so each goroutine gets its own
		// copy rather than sharing the loop variable.
		go func(s Subscription) {
			for it := range s.Updates() {
				m.updates <- it // HL
			}
		}(sub)
	}
	// STOPNAIVEMERGELOOP OMIT
	return m
}
// STOPNAIVEMERGE OMIT
// STARTNAIVEMERGECLOSE OMIT
// Close closes every input subscription and then the merged channel,
// returning the last non-nil error from the inputs, if any.
func (m *naiveMerge) Close() (err error) {
	for _, sub := range m.subs {
		// Keep the first error encountered; still close the rest.
		if e := sub.Close(); err == nil && e != nil {
			err = e
		}
	}
	close(m.updates) // HL
	return
}
// STOPNAIVEMERGECLOSE OMIT
func (m *naiveMerge) Updates() <-chan Item { | |
return m.updates | |
} | |
// merge implements a leak-free Subscription fan-in (see Merge).
type merge struct {
	subs    []Subscription // merged input subscriptions
	updates chan Item      // fan-in channel shared by all forwarders
	quit    chan struct{}  // closed by Close to stop the forwarders
	errs    chan error     // one Close error per forwarder goroutine
}
// STARTMERGESIG OMIT
// Merge returns a Subscription that merges the item streams from subs.
// Closing the merged subscription closes subs.
func Merge(subs ...Subscription) Subscription {
	// STOPMERGESIG OMIT
	m := &merge{
		subs:    subs,
		updates: make(chan Item),
		quit:    make(chan struct{}),
		errs:    make(chan error),
	}
	// STARTMERGE OMIT
	for _, sub := range subs {
		// sub is passed by value so each goroutine gets its own copy.
		go func(s Subscription) {
			for {
				var it Item
				// Each potentially-blocking operation (receive, then
				// send) also selects on quit so the goroutine can always
				// exit; it reports its Close error exactly once on m.errs.
				select {
				case it = <-s.Updates():
				case <-m.quit: // HL
					m.errs <- s.Close() // HL
					return              // HL
				}
				select {
				case m.updates <- it:
				case <-m.quit: // HL
					m.errs <- s.Close() // HL
					return              // HL
				}
			}
		}(sub)
	}
	// STOPMERGE OMIT
	return m
}
func (m *merge) Updates() <-chan Item { | |
return m.updates | |
} | |
// STARTMERGECLOSE OMIT | |
func (m *merge) Close() (err error) { | |
close(m.quit) // HL | |
for _ = range m.subs { | |
if e := <-m.errs; e != nil { // HL | |
err = e | |
} | |
} | |
close(m.updates) // HL | |
return | |
} | |
// STOPMERGECLOSE OMIT | |
// NaiveDedupe converts a stream of Items that may contain duplicates
// into one that doesn't. Items are deduplicated by GUID; the output
// channel is closed when the input is exhausted.
func NaiveDedupe(in <-chan Item) <-chan Item {
	out := make(chan Item)
	go func() {
		seen := make(map[string]bool)
		for it := range in {
			if !seen[it.GUID] {
				// BUG: this send blocks if the
				// receiver closes the Subscription
				// and stops receiving.
				out <- it // HL
				seen[it.GUID] = true
			}
		}
		close(out)
	}()
	return out
}
// deduper wraps an inner Subscription and filters duplicate GUIDs
// (see Dedupe). All state is owned by its loop goroutine.
type deduper struct {
	s       Subscription    // the wrapped, possibly-duplicating source
	updates chan Item       // deduplicated output stream
	closing chan chan error // carries Close requests to loop
}
// Dedupe converts a Subscription that may send duplicate Items into | |
// one that doesn't. | |
func Dedupe(s Subscription) Subscription { | |
d := &deduper{ | |
s: s, | |
updates: make(chan Item), | |
closing: make(chan chan error), | |
} | |
go d.loop() | |
return d | |
} | |
// loop is the deduper's service goroutine. It alternates between
// receiving from the inner subscription and sending the held item,
// enabling/disabling each select case by nil-ing the channel, so at
// most one item is buffered at a time.
func (d *deduper) loop() {
	in := d.s.Updates() // enable receive
	var pending Item
	var out chan Item // disable send
	seen := make(map[string]bool)
	for {
		select {
		case it := <-in:
			if !seen[it.GUID] {
				pending = it
				in = nil        // disable receive until pending is sent
				out = d.updates // enable send
				seen[it.GUID] = true
			}
		case out <- pending:
			in = d.s.Updates() // enable receive
			out = nil          // disable send
		case errc := <-d.closing:
			// Close the inner subscription first, then report its error.
			err := d.s.Close()
			errc <- err
			close(d.updates)
			return
		}
	}
}
func (d *deduper) Close() error { | |
errc := make(chan error) | |
d.closing <- errc | |
return <-errc | |
} | |
func (d *deduper) Updates() <-chan Item { | |
return d.updates | |
} | |
// FakeFetch causes Fetch to use a fake fetcher instead of the real
// one.
var FakeFetch bool

// Fetch returns a Fetcher for Items from domain, fake or real
// depending on FakeFetch.
func Fetch(domain string) Fetcher {
	if FakeFetch {
		return fakeFetch(domain)
	}
	return realFetch(domain)
}
func fakeFetch(domain string) Fetcher { | |
return &fakeFetcher{channel: domain} | |
} | |
// fakeFetcher fabricates Items locally for testing and demos.
type fakeFetcher struct {
	channel string // channel name reported on fabricated Items
	items   []Item // every item fabricated so far
}

// FakeDuplicates causes the fake fetcher to return duplicate items.
var FakeDuplicates bool
func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) { | |
now := time.Now() | |
next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond) | |
item := Item{ | |
Channel: f.channel, | |
Title: fmt.Sprintf("Item %d", len(f.items)), | |
} | |
item.GUID = item.Channel + "/" + item.Title | |
f.items = append(f.items, item) | |
if FakeDuplicates { | |
items = f.items | |
} else { | |
items = []Item{item} | |
} | |
return | |
} | |
// realFetch returns a fetcher for the specified blogger domain. | |
func realFetch(domain string) Fetcher { | |
return NewFetcher(fmt.Sprintf("http://%s/feeds/posts/default?alt=rss", domain)) | |
} | |
// fetcher adapts the go-pkg-rss client to the Fetcher interface.
// Items parsed by the rss callbacks accumulate in items until the next
// Fetch drains them.
type fetcher struct {
	uri   string    // feed URL to poll
	feed  *rss.Feed // underlying RSS client
	items []Item    // items accumulated by the rss callbacks
}

// ytfetcher adapts the gofeed parser to the Fetcher interface for
// YouTube channel feeds.
type ytfetcher struct {
	uri   string         // feed URL to poll
	feed  *gofeed.Parser // underlying feed parser
	items []Item         // last batch of parsed items
}
// NewFetcher returns a Fetcher for uri. The rss client invokes the
// handlers below as it parses; parsed items are converted to Items and
// buffered on f.items for the next Fetch call to return.
func NewFetcher(uri string) Fetcher {
	f := &fetcher{
		uri: uri,
	}
	// No-op channel handler; only items are of interest here.
	newChans := func(feed *rss.Feed, chans []*rss.Channel) {}
	newItems := func(feed *rss.Feed, ch *rss.Channel, items []*rss.Item) {
		for _, it := range items {
			i := Item{
				Channel: ch.Title,
				Title:   it.Title,
			}
			// Guid may be absent in some feeds; leave GUID empty then.
			if it.Guid != nil {
				i.GUID = *it.Guid
			}
			f.items = append(f.items, i)
		}
	}
	f.feed = rss.New(1 /*minimum interval in minutes*/, true /*respect limit*/, newChans, newItems)
	return f
}
// NewYTFetcher returns a Fetcher for uri. | |
func NewYTFetcher(uri string) (Fetcher, error) { | |
// https://www.youtube.com/feeds/videos.xml?channel_id=UCw8ZhLPdQ0u_Y-TLKd61hGA | |
f := &ytfetcher{ | |
uri: uri, | |
} | |
fp := gofeed.NewParser() | |
f.feed = fp | |
return f, nil | |
} | |
// Fetch polls the RSS feed at f.uri. The rss client's callbacks (see
// NewFetcher) append parsed items to f.items, which are drained and
// returned here; next comes from the feed's self-reported update
// interval.
func (f *fetcher) Fetch() (items []Item, next time.Time, err error) {
	fmt.Println("fetching", f.uri)
	if err = f.feed.Fetch(f.uri, nil); err != nil {
		return
	}
	items = f.items
	f.items = nil // reset the accumulator for the next poll
	next = time.Now().Add(time.Duration(f.feed.SecondsTillUpdate()) * time.Second)
	return
}
func (f *ytfetcher) Fetch() (items []Item, next time.Time, err error) { | |
fmt.Println("fetching", f.uri) | |
feed, err := f.feed.ParseURL(f.uri) | |
for _, it := range feed.Items { | |
i := Item{ | |
Title: it.Title, | |
Channel: it.Link, | |
GUID: it.GUID, | |
} | |
items = append(items, i) | |
} | |
f.items = items | |
f.items = nil | |
next = time.Now().Add(time.Duration(1000) * time.Second) | |
return | |
} | |
// TODO: in a longer talk: move the Subscribe function onto a Reader type, to
// support dynamically adding and removing Subscriptions. Reader should dedupe.
// TODO: in a longer talk: make successive Subscribe calls for the same uri
// share the same underlying Subscription, but provide duplicate streams.

// init seeds the global math/rand source so the fake fetcher's
// intervals vary between runs.
func init() {
	rand.Seed(time.Now().UnixNano())
}
// STARTMAIN OMIT
// main subscribes to several feeds, merges their streams, prints items
// for 3 seconds, then closes everything down.
func main() {
	// STARTMERGECALL OMIT
	// Subscribe to some feeds, and create a merged update stream.
	f, err := NewYTFetcher("https://www.youtube.com/feeds/videos.xml?channel_id=UCw8ZhLPdQ0u_Y-TLKd61hGA")
	// var subscriptions []Subscription
	subscriptions := []Subscription{
		Subscribe(Fetch("blog.golang.org")),
		Subscribe(Fetch("googleblog.blogspot.com")),
		Subscribe(Fetch("googledevelopers.blogspot.com")),
	}
	// The YouTube feed is included only if its fetcher was constructed
	// without error.
	if err == nil {
		subscriptions = append(subscriptions, Subscribe(f))
	}
	merged := Merge(subscriptions...)
	// STOPMERGECALL OMIT
	// Close the subscriptions after some time.
	time.AfterFunc(3*time.Second, func() {
		fmt.Println("subscriptions closed:", merged.Close())
	})
	// Print the stream; the range ends when Close closes Updates.
	for it := range merged.Updates() {
		fmt.Println(it.Channel, it.Title)
	}
	// Uncomment the panic below to dump the stack traces. This
	// will show several stacks for persistent HTTP connections
	// created by the real RSS client. To clean these up, we'll
	// need to extend Fetcher with a Close method and plumb this
	// through the RSS client implementation.
	//
	// panic("show me the stacks")
}
// STOPMAIN OMIT
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment