Last active
November 7, 2018 21:27
-
-
Save rybit/0612f4ba6a5e30967899916fde972c1a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"compress/gzip" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"os/signal" | |
"path" | |
"syscall" | |
"time" | |
"gopkg.in/mgo.v2/bson" | |
"gopkg.in/mgo.v2" | |
) | |
// Database and collection that hold the records this tool scans.
const (
	dbName  = "bitballoon"
	colName = "transformations"
)

// Age cut-offs used to classify a record by its creation time.
var (
	// kindOld is the newer cut-off: records created before it (but not
	// before reallyOld) are counted as "kinda old".
	kindOld = time.Date(2018, time.June, 1, 1, 0, 0, 0, time.UTC)

	// reallyOld is the older cut-off: records created before it are counted
	// as "really old".
	reallyOld = time.Date(2018, time.January, 1, 1, 0, 0, 0, time.UTC)
)
func main() { | |
var limit, checkin, rollEvery, msecDelay int | |
var gzip, enableWrite, rm bool | |
var outdir string | |
// control flags | |
flag.IntVar(&limit, "limit", 0, "limit to the numbers of records") | |
flag.IntVar(&checkin, "checkin", 0, "limit to the numbers of records") | |
flag.IntVar(&msecDelay, "msec", 0, "millisecond delay between each read") | |
flag.BoolVar(&rm, "rm", false, "if we should remove the entries from the db") | |
// write flags | |
flag.BoolVar(&enableWrite, "write", false, "if we should write it out to disk") | |
flag.BoolVar(&gzip, "gzip", true, "if the output should be gzipped") | |
flag.StringVar(&outdir, "outdir", "", "the output directory for partial results") | |
flag.IntVar(&rollEvery, "entries", 10000, "the number of entries to put in a file") | |
flag.Parse() | |
if len(flag.Args()) != 1 { | |
log.Fatal("Must provide the mongo URL") | |
} | |
shutdown := make(chan os.Signal, 1) | |
signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM) | |
// connect to the DB | |
url := flag.Args()[0] | |
log.Printf("Connecting to %s\n", url) | |
sess, err := mgo.DialWithTimeout(url, time.Second*10) | |
fatalIf(err, "failed to connect to mongo") | |
trans := sess.DB(dbName).C(colName) | |
// total count | |
total, err := trans.Count() | |
fatalIf(err, "failed to count the number size of the collection") | |
log.Printf("total: %d", total) | |
// create an interface to write it out | |
var out *rollingWriter | |
if enableWrite { | |
out = &rollingWriter{ | |
gzip: gzip, | |
rollEvery: rollEvery, | |
outPath: outdir, | |
} | |
out.roll() | |
defer out.close() | |
} | |
// scan and write them out | |
start := time.Now() | |
var seen, noTimes, kindaOldCnt, reallyOldCnt, written, removed int | |
fmt.Printf("Starting to scan") | |
iter := trans.Find(nil).Iter() | |
defer iter.Close() | |
incoming := make(chan *transformation) | |
go func() { | |
delay := time.Millisecond * time.Duration(msecDelay) | |
item := new(transformation) | |
for iter.Next(item) { | |
incoming <- item | |
item = new(transformation) | |
time.Sleep(delay) | |
} | |
fatalIf(iter.Err(), "error while iterating over the collection") | |
shutdown <- syscall.SIGINT | |
}() | |
roundTime := time.Now() | |
status := func() { | |
now := time.Now() | |
log.Printf("now %s, since start %s, this round: %s", now, now.Sub(start), now.Sub(roundTime)) | |
log.Printf("seen: %d", seen) | |
log.Printf("kinda old: %d", kindaOldCnt) | |
log.Printf("really old: %d", reallyOldCnt) | |
log.Printf("no times: %d", noTimes) | |
log.Printf("written: %d", written) | |
log.Printf("removed: %d", removed) | |
roundTime = now | |
} | |
cont := true | |
for cont { | |
select { | |
case <-shutdown: | |
log.Printf("shutting down") | |
cont = false | |
case item := <-incoming: | |
seen++ | |
if checkin > 0 && seen%checkin == 0 { | |
status() | |
} | |
if limit > 0 && seen == limit { | |
cont = false | |
continue | |
} | |
var old bool | |
if item.CreatedAt.IsZero() { | |
noTimes++ | |
old = true | |
} else if item.CreatedAt.Before(reallyOld) { | |
reallyOldCnt++ | |
old = true | |
} else if item.CreatedAt.Before(kindOld) { | |
kindaOldCnt++ | |
old = true | |
} | |
if old { | |
if out != nil { | |
out.write(item) | |
written++ | |
} | |
if rm { | |
fatalIf(trans.RemoveId(item.ID), "failed to remove %s", item.ID) | |
removed++ | |
} | |
} | |
} | |
} | |
status() | |
} | |
// rollingWriter writes records as JSON lines into a sequence of numbered
// files, optionally gzip-compressed, starting a new file every rollEvery
// entries.
type rollingWriter struct {
	// Settings supplied by the caller.
	gzip      bool   // compress each file and add a ".gz" suffix to its name
	outPath   string // directory the numbered files are created in
	rollEvery int    // entries written to one file before the next is started

	// Progress within the sequence.
	curCount int // entries written to the current file
	curPage  int // number of the current file; roll increments it before naming

	// Handles for the file currently being written.
	curFile *os.File
	curGzip *gzip.Writer // only set by roll when gzip is enabled
	curEnc  *json.Encoder
}
func (rw *rollingWriter) write(item *transformation) { | |
rw.curEnc.Encode(item) | |
rw.curCount++ | |
if rw.curCount >= rw.rollEvery { | |
rw.roll() | |
} | |
} | |
func (rw *rollingWriter) roll() { | |
rw.close() | |
rw.curPage++ | |
rw.curCount = 0 | |
fname := fmt.Sprintf("%04d.jsonl", rw.curPage) | |
if rw.gzip { | |
fname += ".gz" | |
} | |
var err error | |
rw.curFile, err = os.Create(path.Join(rw.outPath, fname)) | |
fatalIf(err, "failed to make a new file") | |
var writer io.Writer = rw.curFile | |
if rw.gzip { | |
rw.curGzip = gzip.NewWriter(rw.curFile) | |
writer = rw.curGzip | |
} | |
rw.curEnc = json.NewEncoder(writer) | |
} | |
func (rw *rollingWriter) close() { | |
if rw.curGzip != nil { | |
fatalIf(rw.curGzip.Close(), "failed to close the gzip stream") | |
} | |
if rw.curFile != nil { | |
fatalIf(rw.curFile.Close(), "failed to close file") | |
} | |
} | |
/* | |
{ | |
"_id" : ObjectId("52bd906cf25a3f71eb000003"), | |
"from" : "1508a7fcfdd34cfdc5cc128861364246d0dd0305", | |
"metadata" : { | |
"js_bundles" : { | |
}, | |
"css_bundles" : { | |
"46e3381e9569c09e85a30b9e5a2af7525ee9fd78" : [ | |
"style.css" | |
] | |
}, | |
"cdn_files" : [ | |
"/images/img02.jpg", | |
"/images/img06.jpg", | |
"/images/img07.jpg", | |
"/images/img08.jpg" | |
], | |
"forms" : [ ] | |
}, | |
"to" : "849a1265a47cfc5cea759078d75537a932c35579", | |
"version" : 1 | |
} | |
*/ | |
type transformation struct { | |
ID bson.ObjectId `bson:"_id" json:"id"` | |
From string `bson:"from" json:"from"` | |
Metadata map[string]interface{} `bson:"metadata" json:"metadata"` | |
To string `bson:"to" json:"to"` | |
Version int `bson:"version" json:"version"` | |
CreatedAt time.Time `bson:"created_at" json:"created_at"` | |
} | |
// fatalIf does nothing when err is nil. Otherwise it formats msg with args,
// appends ":" and the error text, and hands the result to log.Fatal, which
// terminates the program.
func fatalIf(err error, msg string, args ...interface{}) {
	if err == nil {
		return
	}
	text := fmt.Sprintf(msg, args...)
	log.Fatal(text + ":" + err.Error())
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment