Last active
May 18, 2021 23:48
-
-
Save entombedvirus/d943ddb9fdba2841aa84fc8bc9d4a4ae to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module mixpanel-importer | |
go 1.15 | |
require ( | |
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 | |
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"context" | |
"encoding/json" | |
"errors" | |
"flag" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"log" | |
"math" | |
"net/http" | |
"os" | |
"runtime" | |
"strconv" | |
"strings" | |
"time" | |
"golang.org/x/sync/errgroup" | |
) | |
const mpUrl = "https://api.mixpanel.com/v2/import/events" | |
var ( | |
user = flag.String("user", "", "service account username") | |
secret = flag.String("secret", "", "service account secret") | |
projectID = flag.Int("project-id", 0, "project id to import into") | |
concurrency = flag.Int("concurrency", runtime.GOMAXPROCS(0)*10, "number of worker goroutines") | |
verbose = flag.Bool("verbose", false, "turn on for increased logging") | |
noCompression = flag.Bool("no-compression", false, "will send uncompressed payload") | |
stats = newStats(os.Stdout) | |
) | |
func main() { | |
flag.Parse() | |
ctx := context.Background() | |
if err := run(ctx, os.Stdin); err != nil { | |
log.Fatalln(err) | |
} | |
} | |
func run(ctx context.Context, in io.Reader) error { | |
ctx, cancel := context.WithCancel(ctx) | |
eg, egCtx := errgroup.WithContext(ctx) | |
defer eg.Wait() | |
defer cancel() | |
// stats reporter | |
go func() { | |
tick := time.NewTicker(1 * time.Second) | |
defer tick.Stop() | |
for { | |
select { | |
case <-ctx.Done(): | |
case now := <-tick.C: | |
stats.Print(now) | |
} | |
} | |
}() | |
q := make(chan string) | |
for i := 0; i < *concurrency; i++ { | |
eg.Go(func() error { return worker(egCtx, q) }) | |
} | |
const batchLimit = 2 << 20 // 2 MiB | |
wr := newWriter(*noCompression) | |
sc := bufio.NewScanner(in) | |
sc.Split(bufio.ScanLines) | |
for sc.Scan() && egCtx.Err() == nil { | |
if err := wr.Writeln(sc.Bytes()); err != nil { | |
return fmt.Errorf("writeln: %w", err) | |
} | |
if !wr.ShouldFlush(batchLimit) { | |
continue | |
} | |
body, err := wr.Flush() | |
if err != nil { | |
return fmt.Errorf("flush: %w", err) | |
} | |
select { | |
case <-egCtx.Done(): | |
// noop | |
case q <- body: | |
} | |
} | |
if err := sc.Err(); err != nil { | |
return fmt.Errorf("scanner: %w", err) | |
} | |
close(q) | |
if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) { | |
return fmt.Errorf("worker: %w", err) | |
} | |
return nil | |
} | |
func worker(ctx context.Context, q <-chan string) error { | |
intSleep := func(d time.Duration) error { | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
case <-time.After(d): | |
return nil | |
} | |
} | |
withRetry := func(body string) error { | |
for tries := 0; true; tries++ { | |
var errBadReq errBadRequest | |
err := mixpanelPost(ctx, body) | |
switch { | |
case err == nil: | |
return nil | |
case errors.As(err, &errBadReq): | |
infof("skipping request due to bad request: %v", errBadReq.body) | |
if err := saveBadRequest(ctx, errBadReq.body); err != nil { | |
panic(err) | |
} | |
return nil | |
case errors.Is(err, errRetry): | |
d := backoff(3*time.Second, 30*time.Second, tries, 1.3) | |
infof("server: retrying attempt #%d. Sleeping for %v", tries+1, d) | |
if err := intSleep(d); err != nil { | |
return fmt.Errorf("sleep: %w", err) | |
} | |
continue | |
default: | |
return err | |
} | |
} | |
return errors.New("exhausted retries") | |
} | |
for { | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
case body, ok := <-q: | |
if !ok { | |
return nil | |
} | |
if err := withRetry(body); err != nil { | |
return err | |
} | |
} | |
} | |
} | |
// errRetry is the sentinel returned when the server's response indicates the
// same request should be sent again later.
var errRetry = errors.New("server wants us to retry")
// errBadRequest reports that the server rejected a request as malformed.
// body carries the text the server sent back. The embedded error is an
// optional underlying cause and may be nil.
type errBadRequest struct {
	error
	body string
}

// Error implements the error interface. It is defined explicitly because the
// promoted method of a nil embedded error would dereference nil and panic.
func (e errBadRequest) Error() string {
	if e.error != nil {
		return e.error.Error()
	}
	return "bad request: " + e.body
}

// Unwrap exposes the underlying cause, if any, to errors.Is and errors.As.
func (e errBadRequest) Unwrap() error {
	return e.error
}
func mixpanelPost(ctx context.Context, body string) error { | |
req, err := http.NewRequestWithContext( | |
ctx, | |
http.MethodPost, | |
mpUrl, | |
strings.NewReader(body), | |
) | |
if err != nil { | |
return fmt.Errorf("http.NewRequest: %w", err) | |
} | |
req.SetBasicAuth(*user, *secret) | |
q := req.URL.Query() | |
q.Set("project_id", strconv.Itoa(*projectID)) | |
q.Set("lowsla", "1") | |
req.URL.RawQuery = q.Encode() | |
req.Header.Set("Content-Type", "application/x-ndjson") | |
if !*noCompression { | |
req.Header.Set("Content-Encoding", "gzip") | |
} | |
start := time.Now() | |
resp, err := http.DefaultClient.Do(req) | |
lat := time.Since(start) | |
if err != nil { | |
return fmt.Errorf("http.Do: %w", err) | |
} | |
defer resp.Body.Close() | |
stats.IncrResponseCode(resp.StatusCode) | |
var buf bytes.Buffer | |
if _, err := io.Copy(&buf, resp.Body); err != nil { | |
return fmt.Errorf("io.Copy: %w", err) | |
} | |
switch resp.StatusCode { | |
case http.StatusOK: | |
dec := json.NewDecoder(&buf) | |
var serverResponse struct { | |
EventsImported int `json:"events_imported"` | |
} | |
if err := dec.Decode(&serverResponse); err != nil { | |
return fmt.Errorf("json.Decode: server response: %w", err) | |
} | |
infof("server: imported %d events in %v", serverResponse.EventsImported, lat) | |
stats.IncrSuccess(serverResponse.EventsImported, len(body), lat) | |
return nil | |
case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable: | |
return errRetry | |
case http.StatusBadRequest: | |
return errBadRequest{nil, buf.String()} | |
default: | |
return fmt.Errorf("server: %s: body: %s", resp.Status, buf.String()) | |
} | |
} | |
// backoff returns the delay to wait before retry number retries (zero-based):
// baseDelay multiplied by factor^retries, capped at maxDelay. No jitter is
// applied.
func backoff(baseDelay, maxDelay time.Duration, retries int, factor float64) time.Duration {
	scaled := float64(baseDelay) * math.Pow(factor, float64(retries))
	if ceiling := float64(maxDelay); scaled > ceiling {
		return time.Duration(ceiling)
	}
	return time.Duration(scaled)
}
func infof(fmt string, args ...interface{}) { | |
if !*verbose { | |
return | |
} | |
log.Printf(fmt, args...) | |
} | |
// saveBadRequest stores body in a newly created file named request.*.json
// inside the failed-requests directory (relative to the working directory),
// creating the directory if needed. ctx is currently unused.
func saveBadRequest(ctx context.Context, body string) error {
	const dir = "failed-requests"
	if err := os.MkdirAll(dir, 0755); err != nil {
		return fmt.Errorf("os.MkdirAll: %w", err)
	}

	f, err := ioutil.TempFile(dir, "request.*.json")
	if err != nil {
		return fmt.Errorf("TempFile: %w", err)
	}

	// Always close the file; a write failure takes precedence over a close
	// failure when reporting.
	_, writeErr := io.WriteString(f, body)
	closeErr := f.Close()
	switch {
	case writeErr != nil:
		return fmt.Errorf("WriteString: %w", writeErr)
	case closeErr != nil:
		return fmt.Errorf("file.Close: %w", closeErr)
	}
	return nil
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
# Abort on the first failing command, on any use of an unset variable, and
# when any stage of a pipeline fails.
set -o errexit -o nounset -o pipefail

# First positional argument: the location handed to `gsutil ls` to enumerate
# the objects to import. The script stops with this message when it is missing.
GCS_BUCKET_PREFIX=${1?"GCS_BUCKET_PREFIX is required"}
# process OBJECT
#
# Downloads one gzip-compressed object with gsutil, reshapes every JSON row
# into an event with jq and pipes the result into the Go importer in the
# current directory. Objects whose name does not end in .gz are skipped.
#
# Returns non-zero when the download or any stage of the import pipeline
# fails; the temporary download is removed in every case.
#
# The importer arguments can be overridden through the environment variables
# MP_PROJECT_ID, MP_USER and MP_SECRET; the built-in values are used otherwise.
function process() {
    local obj=$1
    if [[ $obj != *.gz ]]; then
        return
    fi

    # This function is executed by xargs in a fresh `bash -c`, which does not
    # inherit the caller's shell options. Re-enable pipefail so that a gunzip
    # or jq failure is not hidden by a successful last pipeline stage.
    set -o pipefail

    echo "importing ${obj}"

    # Declare and assign separately: `local x=$(cmd)` would report the exit
    # status of `local`, never that of mktemp.
    local tmp_file
    if ! tmp_file=$(mktemp tmp.mp_import.XXX.gz); then
        echo "could not create a temporary file for ${obj}" >&2
        return 1
    fi

    echo "downloading ${obj} to ${tmp_file}"
    if ! gsutil cp "${obj}" "${tmp_file}"; then
        echo "download of ${obj} failed" >&2
        rm -f "${tmp_file}"
        return 1
    fi

    local filters=()

    # parse embedded JSON in the "payload" property if present. it
    # sometimes has a "time" property that shouldn't be used.
    filters+=('if (.payload != null) then (. | del(.payload)) + (.payload | fromjson | del(.time)) else . end')

    # trim all strings
    filters+=('.. |= (if type == "string" then .[0:200] else . end)')

    # eventify the row
    filters+=('{"event":.event_type, "time":(.time/1000), "insert_id": .insert_id[0:36], "distinct_id":.user_id_encid, "properties": (. | del(.event_type, .time, .insert_id))}')

    # filter out rows that won't pass validation
    filters+=('select(.event != null and .event != "" and .distinct_id != null and .distinct_id != "" and .insert_id != null and .insert_id != "")')

    # Join the filters into a single jq program with "|". IFS is changed only
    # inside the command substitution's subshell.
    local joined_filters
    joined_filters=$(IFS='|'; printf '%s' "${filters[*]}")

    # NOTE(review): the default credentials below are committed in plain text.
    # They should be rotated and supplied only through the environment.
    local status=0
    gunzip --stdout "${tmp_file}" |
        jq -cr "${joined_filters}" |
        go run . \
            --project-id "${MP_PROJECT_ID:-793719}" \
            --user "${MP_USER:-v2-api.86d66e.mp-service-account}" \
            --secret "${MP_SECRET:-VFg2RW7oC1bDlVzYNNPRpoWMvvcouHpN}" \
            --concurrency 8 || status=$?

    rm -f "${tmp_file}"
    return "${status}"
}
# Make the function visible to the bash processes started by xargs.
export -f process

# gsutil times out if we read from its stdout slowly. so buffer the whole list
# in mem
obj_list=$(gsutil ls "${GCS_BUCKET_PREFIX}")

# Import up to four objects in parallel. The object name is handed to the child
# shell as a positional parameter instead of being pasted into the command
# string, so quotes or shell metacharacters in a name cannot alter the command.
xargs -I{} -P4 -n1 bash -c 'process "$1"' _ {} <<< "$obj_list"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"io" | |
"sync" | |
"text/tabwriter" | |
"time" | |
"github.com/rcrowley/go-metrics" | |
) | |
type statsPrinter struct { | |
out *tabwriter.Writer | |
latency metrics.Histogram | |
sync.Mutex | |
numEvents int | |
numRequests int | |
numBytes int | |
numStatusCodes map[int]int | |
lastTick time.Time | |
} | |
func newStats(out io.Writer) *statsPrinter { | |
stats := &statsPrinter{ | |
out: tabwriter.NewWriter(out, 0, 4, 5, '\t', tabwriter.Debug), | |
numStatusCodes: make(map[int]int), | |
latency: metrics.NewHistogram(metrics.NewUniformSample(10)), | |
} | |
fmt.Fprintf( | |
stats.out, | |
"%-10s\t%-10s\t%-10s\t%-10s\t%-20s\t%-44s\t%-40s\n", | |
"Events", | |
"Requests", | |
"events/sec", | |
"KiB/sec", | |
"Response Codes", | |
"Latency Seconds (200 OKs)", | |
"Time", | |
) | |
return stats | |
} | |
func (stats *statsPrinter) Print(now time.Time) { | |
stats.Lock() | |
defer stats.Unlock() | |
if stats.numEvents == 0 { | |
return | |
} | |
hist := stats.latency.Snapshot() | |
fmt.Fprintf( | |
stats.out, | |
"%-10d\t%-10d\t%-8.02f\t%-8.02f\t%-20s\t%-44s\t%-40s\n", | |
stats.numEvents, | |
stats.numRequests, | |
float64(stats.numEvents)/now.Sub(stats.lastTick).Seconds(), | |
float64(stats.numBytes)/1024/now.Sub(stats.lastTick).Seconds(), | |
fmt.Sprintf("%v", stats.numStatusCodes), | |
fmt.Sprintf( | |
"[%.02f min, %.02f median, %.02f p99, %.02f max]", | |
time.Duration(hist.Min()).Seconds(), | |
time.Duration(hist.Percentile(0.5)).Seconds(), | |
time.Duration(hist.Percentile(0.99)).Seconds(), | |
time.Duration(hist.Max()).Seconds(), | |
), | |
now.Format(time.RFC3339), | |
) | |
_ = stats.out.Flush() | |
stats.numEvents = 0 | |
stats.numBytes = 0 | |
stats.numRequests = 0 | |
stats.numStatusCodes = make(map[int]int) | |
stats.latency.Clear() | |
stats.lastTick = now | |
} | |
func (stats *statsPrinter) IncrResponseCode(code int) { | |
stats.Lock() | |
stats.numRequests++ | |
stats.numStatusCodes[code]++ | |
stats.Unlock() | |
} | |
func (stats *statsPrinter) IncrSuccess(events, bytes int, latency time.Duration) { | |
stats.Lock() | |
stats.numBytes += bytes | |
stats.numEvents += events | |
stats.latency.Update(int64(latency)) | |
stats.Unlock() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"compress/gzip" | |
"fmt" | |
"io" | |
) | |
// writer accumulates newline-terminated records into an in-memory batch,
// optionally gzip-compressing them on the way in.
type writer struct {
	// Writer is where records are written: buf itself, or a gzip.Writer that
	// compresses into buf.
	io.Writer
	// buf holds the bytes of the batch produced so far.
	buf *bytes.Buffer
}

// newWriter returns an empty writer. Records are gzip-compressed unless
// skipCompression is true.
func newWriter(skipCompression bool) *writer {
	sink := new(bytes.Buffer)
	if skipCompression {
		return &writer{Writer: sink, buf: sink}
	}
	return &writer{Writer: gzip.NewWriter(sink), buf: sink}
}

// Writeln appends bs followed by a newline to the current batch.
func (wr *writer) Writeln(bs []byte) error {
	_, err := wr.Writer.Write(bs)
	if err == nil {
		_, err = wr.Writer.Write([]byte("\n"))
	}
	return err
}

// ShouldFlush reports whether the batch is within 128 KiB of lim bytes.
func (wr *writer) ShouldFlush(lim int) bool {
	// leave some room for unflushed bytes in gzip writer
	const padding = 128 << 10 // 128 KiB
	buffered := wr.buf.Len() + padding
	return buffered >= lim
}

// Flush finalises the current batch, returns its bytes as a string and leaves
// the writer empty and ready for the next batch.
func (wr *writer) Flush() (string, error) {
	// drain copies the batch out of buf and empties buf.
	drain := func() string {
		batch := wr.buf.String()
		wr.buf.Reset()
		return batch
	}

	switch w := wr.Writer.(type) {
	case *gzip.Writer:
		// Closing writes out the remaining compressed data and the gzip
		// footer; Reset re-arms the same compressor for the next batch.
		if err := w.Close(); err != nil {
			return "", fmt.Errorf("gz.Close: %w", err)
		}
		batch := drain()
		w.Reset(wr.buf)
		return batch, nil
	default:
		return drain(), nil
	}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment