Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
A script for pulling my public GitHub events for 2013 out of the githubarchive data.
// Process github event data exports.
//
// Go here for more info: http://www.githubarchive.org/
package main
import (
"compress/gzip"
"encoding/csv"
"encoding/json"
"io"
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/couchbaselabs/cbfs/client"
"github.com/dustin/go-humanize"
)
// totalRead counts events decoded across all worker goroutines. Workers
// add to it with atomic.AddInt64; main reads it only after wg.Wait().
var totalRead = int64(0)
// maybeFatal is a no-op for a nil error; otherwise it panics via
// log.Panicf, appending any extra context strings to the message.
func maybeFatal(err error, msg ...string) {
	if err == nil {
		return
	}
	if len(msg) == 0 {
		log.Panicf("Error: %v", err)
	}
	log.Panicf("Error: %v (%v)", err, msg)
}
type record struct {
Type string
Created time.Time `json:"created_at"`
Actor string
Payload struct {
Shas [][]interface{}
}
Repository struct {
Owner, Name string
}
ActorInfo struct {
Name string
} `json:"actor_attribues"`
}
// stringOf returns a as a string, or the empty string when a holds
// anything else (including nil).
func stringOf(a interface{}) string {
	s, ok := a.(string)
	if !ok {
		return ""
	}
	return s
}
// isMyAddr reports whether s is my commit email address.
func isMyAddr(s string) bool {
	switch s {
	case "dustin@spy.net":
		return true
	}
	return false
}
// isMine reports whether this event was produced by me: either the
// actor is "dustin", or one of the pushed commits carries my email
// address in its sha tuple.
func (r record) isMine() bool {
	// Malformed records can panic during inspection; report the
	// offending record rather than dying with a bare stack trace.
	defer func() {
		if err, ok := recover().(error); ok {
			log.Fatalf("Error dealing with %v: %v", r, err)
		}
	}()
	if r.Actor == "dustin" {
		return true
	}
	for _, s := range r.Payload.Shas {
		// Entries are expected to be [sha, email, ...]; guard against
		// short tuples instead of panicking on s[1] (which would
		// abort the entire run via the Fatalf above).
		if len(s) > 1 && isMyAddr(stringOf(s[1])) {
			return true
		}
	}
	return false
}
// dofile fetches one gzip-compressed JSON event file from cbfs, decodes
// every record in it, and forwards those that are mine to chout. The
// per-file event count is folded into totalRead.
func dofile(cbfs *cbfsclient.Client, fn string, chout chan<- record) {
	var d *json.Decoder
	var lastErr error
	// Retry the fetch a couple of times; every once in a while the
	// stream starts badly. Stop as soon as a decoder is ready — the
	// original loop kept re-fetching after success, wasting work and
	// letting a later failure set lastErr after a good attempt.
	for i := 0; i < 3 && d == nil; i++ {
		lastErr = nil
		f, err := cbfs.Get(fn)
		if err != nil {
			log.Printf("Error getting %v: %v", fn, err)
			lastErr = err
			continue
		}
		defer f.Close()
		gz, err := gzip.NewReader(f)
		if err != nil {
			log.Printf("Error uncompressing %v: %v", fn, err)
			lastErr = err
			continue
		}
		defer gz.Close()
		d = json.NewDecoder(gz)
	}
	maybeFatal(lastErr)
	eventcount := 0
	for {
		thing := record{}
		if err := d.Decode(&thing); err != nil {
			// io.EOF is the normal end of the stream; anything else
			// is logged, but either way we finish this file.
			if err != io.EOF {
				log.Printf("Error decoding %v: %v", fn, err)
			}
			log.Printf("Processed %s events from %v",
				humanize.Comma(int64(eventcount)), filepath.Base(fn))
			atomic.AddInt64(&totalRead, int64(eventcount))
			break
		}
		eventcount++
		if thing.isMine() {
			chout <- thing
		}
	}
}
// processor is one worker: it consumes file names from ch until the
// channel is closed, processing each via dofile, then marks wg done.
func processor(cbfs *cbfsclient.Client, ch <-chan string, chout chan<- record, wg *sync.WaitGroup) {
	defer wg.Done()
	for name := range ch {
		dofile(cbfs, name, chout)
	}
}
// reporter drains matching records from chout and writes one CSV row to
// stuff.csv for each commit authored with my address.
func reporter(chout <-chan record, wg *sync.WaitGroup) {
	defer wg.Done()
	fout, err := os.Create("stuff.csv")
	maybeFatal(err)
	defer fout.Close()
	c := csv.NewWriter(fout)
	// Flush on exit and surface any buffered write error; the original
	// ignored both Write errors and c.Error().
	defer func() {
		c.Flush()
		maybeFatal(c.Error())
	}()
	maybeFatal(c.Write([]string{"date", "type", "actor", "committer", "repo", "commit"}))
	for r := range chout {
		log.Printf("got %#v", r)
		// Per-record values; no need to recompute them for every sha.
		ts := r.Created.UTC().Format(time.RFC3339)
		repo := r.Repository.Owner + "/" + r.Repository.Name
		for _, s := range r.Payload.Shas {
			// Guard short tuples before indexing s[0]/s[1].
			if len(s) > 1 && isMyAddr(stringOf(s[1])) {
				maybeFatal(c.Write([]string{ts, r.Type, r.Actor, stringOf(s[1]), repo, stringOf(s[0])}))
			}
		}
	}
}
// 2013-01-25-10.json.gz
// fnames sorts archive file names like "2013-01-25-10.json.gz" in
// chronological order, so that a single-digit hour ("-2") sorts before
// a two-digit one ("-10"), which plain string ordering would not do.
type fnames []string

// Len implements sort.Interface.
func (f fnames) Len() int {
	return len(f)
}

// Swap implements sort.Interface.
func (f fnames) Swap(i, j int) {
	f[i], f[j] = f[j], f[i]
}

// Less implements sort.Interface by comparing names numerically.
func (f fnames) Less(i, j int) bool {
	// asnum folds the leading numeric fields (year, month, day, hour)
	// of a name like "2013-01-25-10.json.gz" into one comparable int;
	// non-numeric fields ("json", "gz") are skipped.
	asnum := func(s string) int {
		sp := strings.FieldsFunc(s, func(r rune) bool { return r == '-' || r == '.' })
		rv := 0
		m := []int{0, 10000, 100, 100}
		for i, s := range sp {
			if i >= len(m) {
				// More fields than weights: ignore the extras
				// instead of risking an index-out-of-range panic
				// on an unexpected numeric suffix.
				break
			}
			if n, err := strconv.Atoi(s); err == nil {
				rv = (rv * m[i]) + n
			}
		}
		return rv
	}
	return asnum(f[i]) < asnum(f[j])
}
// main wires up the pipeline: eight fetch/decode workers read file
// names from a channel, matching records flow to a single CSV reporter,
// and everything is shut down in dependency order at the end.
func main() {
	workers := &sync.WaitGroup{}
	names := make(chan string)
	matches := make(chan record)

	reportDone := &sync.WaitGroup{}
	reportDone.Add(1)
	go reporter(matches, reportDone)

	cbfs, err := cbfsclient.New(os.Args[1])
	maybeFatal(err)

	for i := 0; i < 8; i++ {
		workers.Add(1)
		go processor(cbfs, names, matches, workers)
	}

	const base = "data/github"
	listing, err := cbfs.List(base)
	maybeFatal(err)

	fns := fnames{}
	for fn := range listing.Files {
		if strings.HasPrefix(fn, "2013-") {
			fns = append(fns, fn)
		}
	}
	sort.Sort(fns)

	// Feed the workers, then tear down in order: close the name channel
	// so workers drain and exit, wait for them (no more sends into
	// matches), close the record channel, and wait for the reporter to
	// flush its CSV.
	for _, fn := range fns {
		names <- base + "/" + fn
	}
	close(names)
	workers.Wait()
	log.Printf("Processed %v total events", humanize.Comma(totalRead))
	close(matches)
	reportDone.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment