Skip to content

Instantly share code, notes, and snippets.

@rybit
Last active November 7, 2018 21:27
Show Gist options
  • Save rybit/0612f4ba6a5e30967899916fde972c1a to your computer and use it in GitHub Desktop.
Save rybit/0612f4ba6a5e30967899916fde972c1a to your computer and use it in GitHub Desktop.
package main
import (
"compress/gzip"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"path"
"syscall"
"time"
"gopkg.in/mgo.v2/bson"
"gopkg.in/mgo.v2"
)
// Location of the Mongo collection that is scanned.
const (
	dbName  = "bitballoon"
	colName = "transformations"
)

// Age cut-offs used to classify documents by their created_at timestamp.
// A document created before reallyOld is "really old"; one created on or after
// reallyOld but before kindOld is "kinda old". Both instants are in UTC.
var (
	reallyOld = time.Date(2018, time.January, 1, 1, 0, 0, 0, time.UTC)
	kindOld   = time.Date(2018, time.June, 1, 1, 0, 0, 0, time.UTC)
)
// main scans the transformations collection of the bitballoon database at the
// Mongo URL given as the single positional argument and buckets every
// document by its created_at timestamp:
//
//   - zero created_at            -> "no times"
//   - created before reallyOld   -> "really old"
//   - created before kindOld     -> "kinda old"
//
// A document in any of those buckets is "old". Old documents are optionally
// archived to disk as JSON lines (-write, -gzip, -outdir, -entries) and
// optionally removed from the collection (-rm).
//
// The scan stops when the collection is exhausted, when -limit documents have
// been processed (0 means no limit), or on SIGINT/SIGTERM. A status report is
// logged every -checkin documents (0 disables it) and once at the end.
func main() {
	var limit, checkin, rollEvery, msecDelay int
	// useGzip (not gzip) so the local does not shadow the compress/gzip package.
	var useGzip, enableWrite, rm bool
	var outdir string

	// control flags
	flag.IntVar(&limit, "limit", 0, "limit to the numbers of records")
	flag.IntVar(&checkin, "checkin", 0, "log a status report every N records")
	flag.IntVar(&msecDelay, "msec", 0, "millisecond delay between each read")
	flag.BoolVar(&rm, "rm", false, "if we should remove the entries from the db")

	// write flags
	flag.BoolVar(&enableWrite, "write", false, "if we should write it out to disk")
	flag.BoolVar(&useGzip, "gzip", true, "if the output should be gzipped")
	flag.StringVar(&outdir, "outdir", "", "the output directory for partial results")
	flag.IntVar(&rollEvery, "entries", 10000, "the number of entries to put in a file")
	flag.Parse()

	if flag.NArg() != 1 {
		log.Fatal("Must provide the mongo URL")
	}

	shutdown := make(chan os.Signal, 1)
	signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)

	// connect to the DB
	url := flag.Arg(0)
	log.Printf("Connecting to %s\n", url)
	sess, err := mgo.DialWithTimeout(url, time.Second*10)
	fatalIf(err, "failed to connect to mongo")
	defer sess.Close()
	trans := sess.DB(dbName).C(colName)

	// total count
	total, err := trans.Count()
	fatalIf(err, "failed to count the documents in the collection")
	log.Printf("total: %d", total)

	// optional archive of the old documents
	var out *rollingWriter
	if enableWrite {
		out = &rollingWriter{
			gzip:      useGzip,
			rollEvery: rollEvery,
			outPath:   outdir,
		}
		out.roll()
		defer out.close()
	}

	// scan and write them out
	start := time.Now()
	var seen, noTimes, kindaOldCnt, reallyOldCnt, written, removed int
	log.Printf("Starting to scan")
	iter := trans.Find(nil).Iter()
	defer iter.Close()

	// The reader goroutine feeds documents one at a time and closes incoming
	// when the cursor is exhausted. scanErr is written before that close, so
	// it may be read only after observing the closed channel.
	incoming := make(chan *transformation)
	var scanErr error
	go func() {
		defer close(incoming)
		delay := time.Millisecond * time.Duration(msecDelay)
		for {
			item := new(transformation)
			if !iter.Next(item) {
				break
			}
			incoming <- item
			time.Sleep(delay)
		}
		scanErr = iter.Err()
	}()

	roundTime := time.Now()
	status := func() {
		now := time.Now()
		log.Printf("now %s, since start %s, this round: %s", now, now.Sub(start), now.Sub(roundTime))
		log.Printf("seen: %d", seen)
		log.Printf("kinda old: %d", kindaOldCnt)
		log.Printf("really old: %d", reallyOldCnt)
		log.Printf("no times: %d", noTimes)
		log.Printf("written: %d", written)
		log.Printf("removed: %d", removed)
		roundTime = now
	}

	exhausted := false
loop:
	for {
		select {
		case <-shutdown:
			log.Printf("shutting down")
			break loop
		case item, ok := <-incoming:
			if !ok {
				exhausted = true
				break loop
			}
			seen++

			old := true
			switch {
			case item.CreatedAt.IsZero():
				noTimes++
			case item.CreatedAt.Before(reallyOld):
				reallyOldCnt++
			case item.CreatedAt.Before(kindOld):
				kindaOldCnt++
			default:
				old = false
			}

			if old {
				if out != nil {
					out.write(item)
					written++
				}
				if rm {
					fatalIf(trans.RemoveId(item.ID), "failed to remove %s", item.ID)
					removed++
				}
			}

			// Report after processing so the counters include this record.
			if checkin > 0 && seen%checkin == 0 {
				status()
			}
			// Stop only after the limit-th record has been processed.
			if limit > 0 && seen >= limit {
				log.Printf("reached the limit of %d records", limit)
				break loop
			}
		}
	}
	status()

	if exhausted && scanErr != nil {
		// fatalIf exits without running deferred calls, so flush the archive
		// first to keep the records written so far readable.
		if out != nil {
			out.close()
		}
		fatalIf(scanErr, "error while iterating over the collection")
	}
}
// rollingWriter writes transformations as newline-delimited JSON into a
// sequence of numbered files inside outPath: 0001.jsonl, 0002.jsonl, ... (with
// a ".gz" suffix and gzip compression when gzip is set). Each file holds at
// most rollEvery records; a rollEvery of zero or less disables rolling, so
// everything goes into a single file.
//
// Any I/O or encoding failure terminates the process via fatalIf.
type rollingWriter struct {
	gzip      bool   // gzip-compress each file and add a ".gz" suffix
	rollEvery int    // records per file; <= 0 means never roll
	outPath   string // directory the files are created in

	curPage  int           // number of the current (or last) file, 1-based
	curCount int           // records written to the current file
	curFile  *os.File      // open output file, nil when closed
	curGzip  *gzip.Writer  // gzip stream over curFile, nil when not compressing
	curEnc   *json.Encoder // encoder writing to curGzip or curFile, nil when closed
}

// write appends item as one JSON line. When no file is open, or the current
// file already holds rollEvery records, it first rolls over to a new file.
// Files are therefore only created when there is a record to put in them, so
// no empty trailing file is left behind.
func (rw *rollingWriter) write(item *transformation) {
	if rw.curEnc == nil || (rw.rollEvery > 0 && rw.curCount >= rw.rollEvery) {
		rw.roll()
	}
	fatalIf(rw.curEnc.Encode(item), "failed to write record %s", item.ID)
	rw.curCount++
}

// roll closes the current file, if any, and opens the next numbered one.
func (rw *rollingWriter) roll() {
	rw.close()
	rw.curPage++
	rw.curCount = 0

	name := fmt.Sprintf("%04d.jsonl", rw.curPage)
	if rw.gzip {
		name += ".gz"
	}
	fullPath := path.Join(rw.outPath, name)
	f, err := os.Create(fullPath)
	fatalIf(err, "failed to create %s", fullPath)
	rw.curFile = f

	var w io.Writer = f
	if rw.gzip {
		rw.curGzip = gzip.NewWriter(f)
		w = rw.curGzip
	}
	rw.curEnc = json.NewEncoder(w)
}

// close flushes and closes the current file, if one is open. The gzip stream
// is closed first so its buffered data and footer reach the file (gzip's
// Close does not close the underlying file). All handles are reset, so close
// is safe to call more than once.
func (rw *rollingWriter) close() {
	if rw.curGzip != nil {
		fatalIf(rw.curGzip.Close(), "failed to close the gzip stream")
		rw.curGzip = nil
	}
	if rw.curFile != nil {
		fatalIf(rw.curFile.Close(), "failed to close file")
		rw.curFile = nil
	}
	rw.curEnc = nil
}
// transformation mirrors one document of the transformations collection.
// The bson tags bind the fields to the stored document; the json tags name the
// fields in the JSON lines that rollingWriter archives (fields are emitted in
// declaration order). Example stored document, in mongo shell notation. It has
// no created_at field, so CreatedAt would be left at its zero value, which
// main counts under "no times":
/*
{
"_id" : ObjectId("52bd906cf25a3f71eb000003"),
"from" : "1508a7fcfdd34cfdc5cc128861364246d0dd0305",
"metadata" : {
"js_bundles" : {
},
"css_bundles" : {
"46e3381e9569c09e85a30b9e5a2af7525ee9fd78" : [
"style.css"
]
},
"cdn_files" : [
"/images/img02.jpg",
"/images/img06.jpg",
"/images/img07.jpg",
"/images/img08.jpg"
],
"forms" : [ ]
},
"to" : "849a1265a47cfc5cea759078d75537a932c35579",
"version" : 1
}
*/
type transformation struct {
ID bson.ObjectId `bson:"_id" json:"id"` // Mongo _id; also the key used by RemoveId in main
From string `bson:"from" json:"from"`
Metadata map[string]interface{} `bson:"metadata" json:"metadata"` // free-form, decoded generically
To string `bson:"to" json:"to"`
Version int `bson:"version" json:"version"`
CreatedAt time.Time `bson:"created_at" json:"created_at"` // zero when the document has no created_at
}
// fatalIf terminates the process via log.Fatal when err is non-nil; it is a
// no-op when err is nil. msg and args are formatted with fmt.Sprintf and
// followed by ": " and the error text. Note that log.Fatal exits immediately,
// so deferred calls in the caller do not run.
func fatalIf(err error, msg string, args ...interface{}) {
	if err != nil {
		log.Fatal(fatalMessage(err, msg, args...))
	}
}

// fatalMessage builds the text fatalIf logs: the formatted message, ": ",
// then the error text.
func fatalMessage(err error, msg string, args ...interface{}) string {
	return fmt.Sprintf(msg, args...) + ": " + err.Error()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment