@entombedvirus
Last active May 18, 2021 23:48
go.mod

module mixpanel-importer

go 1.15

require (
	github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
main.go

package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math"
	"net/http"
	"os"
	"runtime"
	"strconv"
	"strings"
	"time"

	"golang.org/x/sync/errgroup"
)
const mpUrl = "https://api.mixpanel.com/v2/import/events"
var (
	user          = flag.String("user", "", "service account username")
	secret        = flag.String("secret", "", "service account secret")
	projectID     = flag.Int("project-id", 0, "project id to import into")
	concurrency   = flag.Int("concurrency", runtime.GOMAXPROCS(0)*10, "number of worker goroutines")
	verbose       = flag.Bool("verbose", false, "turn on for increased logging")
	noCompression = flag.Bool("no-compression", false, "will send uncompressed payload")

	stats = newStats(os.Stdout)
)
func main() {
	flag.Parse()
	ctx := context.Background()
	if err := run(ctx, os.Stdin); err != nil {
		log.Fatalln(err)
	}
}
func run(ctx context.Context, in io.Reader) error {
	ctx, cancel := context.WithCancel(ctx)
	eg, egCtx := errgroup.WithContext(ctx)
	defer eg.Wait()
	defer cancel()
	// stats reporter
	go func() {
		tick := time.NewTicker(1 * time.Second)
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				// return, or this goroutine busy-loops once the context is cancelled
				return
			case now := <-tick.C:
				stats.Print(now)
			}
		}
	}()
	q := make(chan string)
	for i := 0; i < *concurrency; i++ {
		eg.Go(func() error { return worker(egCtx, q) })
	}
	const batchLimit = 2 << 20 // 2 MiB
	wr := newWriter(*noCompression)
	pending := false // tracks whether wr holds an unflushed partial batch
	sc := bufio.NewScanner(in)
	sc.Split(bufio.ScanLines)
	for sc.Scan() && egCtx.Err() == nil {
		if err := wr.Writeln(sc.Bytes()); err != nil {
			return fmt.Errorf("writeln: %w", err)
		}
		pending = true
		if !wr.ShouldFlush(batchLimit) {
			continue
		}
		body, err := wr.Flush()
		if err != nil {
			return fmt.Errorf("flush: %w", err)
		}
		pending = false
		select {
		case <-egCtx.Done():
			// noop
		case q <- body:
		}
	}
	if err := sc.Err(); err != nil {
		return fmt.Errorf("scanner: %w", err)
	}
	// flush and send the final partial batch; without this, any trailing
	// events below batchLimit would be silently dropped at EOF
	if pending {
		body, err := wr.Flush()
		if err != nil {
			return fmt.Errorf("flush: %w", err)
		}
		select {
		case <-egCtx.Done():
		case q <- body:
		}
	}
	close(q)
	if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		return fmt.Errorf("worker: %w", err)
	}
	return nil
}
func worker(ctx context.Context, q <-chan string) error {
	intSleep := func(d time.Duration) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(d):
			return nil
		}
	}
	withRetry := func(body string) error {
		// retries indefinitely on retryable errors; the backoff is capped at 30s
		for tries := 0; ; tries++ {
			var errBadReq errBadRequest
			err := mixpanelPost(ctx, body)
			switch {
			case err == nil:
				return nil
			case errors.As(err, &errBadReq):
				infof("skipping request due to bad request: %v", errBadReq.body)
				if err := saveBadRequest(ctx, errBadReq.body); err != nil {
					panic(err)
				}
				return nil
			case errors.Is(err, errRetry):
				d := backoff(3*time.Second, 30*time.Second, tries, 1.3)
				infof("server: retrying attempt #%d. Sleeping for %v", tries+1, d)
				if err := intSleep(d); err != nil {
					return fmt.Errorf("sleep: %w", err)
				}
				continue
			default:
				return err
			}
		}
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case body, ok := <-q:
			if !ok {
				return nil
			}
			if err := withRetry(body); err != nil {
				return err
			}
		}
	}
}
var errRetry = errors.New("server wants us to retry")

type errBadRequest struct {
	error
	body string
}
func mixpanelPost(ctx context.Context, body string) error {
	req, err := http.NewRequestWithContext(
		ctx,
		http.MethodPost,
		mpUrl,
		strings.NewReader(body),
	)
	if err != nil {
		return fmt.Errorf("http.NewRequest: %w", err)
	}
	req.SetBasicAuth(*user, *secret)
	q := req.URL.Query()
	q.Set("project_id", strconv.Itoa(*projectID))
	q.Set("lowsla", "1")
	req.URL.RawQuery = q.Encode()
	req.Header.Set("Content-Type", "application/x-ndjson")
	if !*noCompression {
		req.Header.Set("Content-Encoding", "gzip")
	}
	start := time.Now()
	resp, err := http.DefaultClient.Do(req)
	lat := time.Since(start)
	if err != nil {
		return fmt.Errorf("http.Do: %w", err)
	}
	defer resp.Body.Close()
	stats.IncrResponseCode(resp.StatusCode)
	var buf bytes.Buffer
	if _, err := io.Copy(&buf, resp.Body); err != nil {
		return fmt.Errorf("io.Copy: %w", err)
	}
	switch resp.StatusCode {
	case http.StatusOK:
		dec := json.NewDecoder(&buf)
		var serverResponse struct {
			EventsImported int `json:"events_imported"`
		}
		if err := dec.Decode(&serverResponse); err != nil {
			return fmt.Errorf("json.Decode: server response: %w", err)
		}
		infof("server: imported %d events in %v", serverResponse.EventsImported, lat)
		stats.IncrSuccess(serverResponse.EventsImported, len(body), lat)
		return nil
	case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable:
		return errRetry
	case http.StatusBadRequest:
		// give the embedded error a value so Error() can't panic on a nil interface
		return errBadRequest{errors.New("bad request"), buf.String()}
	default:
		return fmt.Errorf("server: %s: body: %s", resp.Status, buf.String())
	}
}
func backoff(baseDelay, maxDelay time.Duration, retries int, factor float64) time.Duration {
	backoff, max := float64(baseDelay), float64(maxDelay)
	backoff *= math.Pow(factor, float64(retries))
	if backoff > max {
		backoff = max
	}
	// jitter, currently disabled:
	// backoff = backoff/2 + rand.Float64()*backoff/2
	return time.Duration(backoff)
}
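// Worked example of the schedule used in worker (baseDelay=3s, maxDelay=30s,
// factor=1.3): delays grow as 3s * 1.3^n, i.e. roughly 3s, 3.9s, 5.1s, 6.6s,
// 8.6s, 11.1s, 14.5s, 18.8s, 24.5s, and stay capped at 30s from then on.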
func infof(format string, args ...interface{}) {
	if !*verbose {
		return
	}
	log.Printf(format, args...)
}
func saveBadRequest(ctx context.Context, body string) error {
	dir := "failed-requests"
	if err := os.MkdirAll(dir, 0755); err != nil {
		return fmt.Errorf("os.MkdirAll: %w", err)
	}
	f, err := ioutil.TempFile(dir, "request.*.json")
	if err != nil {
		return fmt.Errorf("TempFile: %w", err)
	}
	defer f.Close()
	if _, err := io.WriteString(f, body); err != nil {
		return fmt.Errorf("WriteString: %w", err)
	}
	if err := f.Close(); err != nil {
		return fmt.Errorf("file.Close: %w", err)
	}
	return nil
}
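The importer reads newline-delimited JSON events on stdin, one per line. After the jq pipeline in the driver script below, each line ends up shaped roughly like this (field names come from the jq filter; the values here are illustrative):

{"event":"page_view","time":1621382400.123,"insert_id":"5d958f87-542d-4c10-9422-0ed75893dc81","distinct_id":"user-42","properties":{"country":"US"}}

A direct invocation would then look something like (flag values illustrative):

cat events.ndjson | go run . --project-id 12345 --user my-sa-user --secret my-sa-secret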
import.sh

#!/bin/bash
set -euo pipefail

GCS_BUCKET_PREFIX=${1?"GCS_BUCKET_PREFIX is required"}
function process() {
  local obj=$1
  if [[ $obj != *.gz ]]; then
    return
  fi
  echo "importing ${obj}"
  # declare and assign separately so set -e can catch a mktemp failure
  local tmp_file
  tmp_file=$(mktemp tmp.mp_import.XXX.gz)
  echo "downloading ${obj} to ${tmp_file}"
  gsutil cp "${obj}" "${tmp_file}"
  local filters=()
  # parse embedded JSON in the "payload" property if present. it
  # sometimes has a "time" property that shouldn't be used.
  filters+=('if (.payload != null) then (. | del(.payload)) + (.payload | fromjson | del(.time)) else . end')
  # trim all strings to 200 characters
  filters+=('.. |= (if type == "string" then .[0:200] else . end)')
  # eventify the row
  filters+=('{"event":.event_type, "time":(.time/1000), "insert_id": .insert_id[0:36], "distinct_id":.user_id_encid, "properties": (. | del(.event_type, .time, .insert_id))}')
  # filter out rows that won't pass validation
  filters+=('select(.event != null and .event != "" and .distinct_id != null and .distinct_id != "" and .insert_id != null and .insert_id != "")')
  # join the filters with "|" so jq runs them as a single pipeline
  IFS=$'|' eval 'joined_filters="${filters[*]}"'
  gunzip --stdout "${tmp_file}" | \
    jq -cr "${joined_filters}" | \
    go run . --project-id 793719 --user v2-api.86d66e.mp-service-account --secret 'VFg2RW7oC1bDlVzYNNPRpoWMvvcouHpN' --concurrency 8
  # cat
  rm -f "${tmp_file}"
}
export -f process
# gsutil times out if we read from its stdout slowly, so buffer the whole
# object list in memory first
obj_list=$(gsutil ls "${GCS_BUCKET_PREFIX}")
# pass each object name as a positional argument instead of splicing it into
# the command string, so names with special characters can't break the shell
xargs -I{} -P4 -n1 bash -c 'process "$1"' _ {} <<< "$obj_list"
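The script takes a single GCS prefix as its argument, e.g. (assuming it is saved as import.sh; the bucket path is illustrative):

bash import.sh gs://my-bucket/exports/2021-05/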
stats.go

package main

import (
	"fmt"
	"io"
	"sync"
	"text/tabwriter"
	"time"

	"github.com/rcrowley/go-metrics"
)
// statsPrinter aggregates per-interval counters and prints one table row per
// tick; the counters are reset after every row.
type statsPrinter struct {
	out     *tabwriter.Writer
	latency metrics.Histogram

	sync.Mutex
	numEvents      int
	numRequests    int
	numBytes       int
	numStatusCodes map[int]int
	lastTick       time.Time
}

func newStats(out io.Writer) *statsPrinter {
	stats := &statsPrinter{
		out:            tabwriter.NewWriter(out, 0, 4, 5, '\t', tabwriter.Debug),
		numStatusCodes: make(map[int]int),
		latency:        metrics.NewHistogram(metrics.NewUniformSample(10)),
		lastTick:       time.Now(), // give the first row a sane rate window
	}
	fmt.Fprintf(
		stats.out,
		"%-10s\t%-10s\t%-10s\t%-10s\t%-20s\t%-44s\t%-40s\n",
		"Events",
		"Requests",
		"events/sec",
		"KiB/sec",
		"Response Codes",
		"Latency Seconds (200 OKs)",
		"Time",
	)
	return stats
}
func (stats *statsPrinter) Print(now time.Time) {
	stats.Lock()
	defer stats.Unlock()
	if stats.numEvents == 0 {
		return
	}
	hist := stats.latency.Snapshot()
	fmt.Fprintf(
		stats.out,
		"%-10d\t%-10d\t%-8.02f\t%-8.02f\t%-20s\t%-44s\t%-40s\n",
		stats.numEvents,
		stats.numRequests,
		float64(stats.numEvents)/now.Sub(stats.lastTick).Seconds(),
		float64(stats.numBytes)/1024/now.Sub(stats.lastTick).Seconds(),
		fmt.Sprintf("%v", stats.numStatusCodes),
		fmt.Sprintf(
			"[%.02f min, %.02f median, %.02f p99, %.02f max]",
			time.Duration(hist.Min()).Seconds(),
			time.Duration(hist.Percentile(0.5)).Seconds(),
			time.Duration(hist.Percentile(0.99)).Seconds(),
			time.Duration(hist.Max()).Seconds(),
		),
		now.Format(time.RFC3339),
	)
	_ = stats.out.Flush()
	// reset the window counters so each row reports per-interval rates
	stats.numEvents = 0
	stats.numBytes = 0
	stats.numRequests = 0
	stats.numStatusCodes = make(map[int]int)
	stats.latency.Clear()
	stats.lastTick = now
}
func (stats *statsPrinter) IncrResponseCode(code int) {
	stats.Lock()
	stats.numRequests++
	stats.numStatusCodes[code]++
	stats.Unlock()
}

func (stats *statsPrinter) IncrSuccess(events, bytes int, latency time.Duration) {
	stats.Lock()
	stats.numBytes += bytes
	stats.numEvents += events
	stats.latency.Update(int64(latency))
	stats.Unlock()
}
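For orientation, the reporter emits one row per second; an illustrative row (all numbers invented, tabwriter's column padding approximated):

Events    |Requests  |events/sec  |KiB/sec  |Response Codes  |Latency Seconds (200 OKs)                    |Time
52000     |26        |52000.00    |1843.20  |map[200:26]     |[0.21 min, 0.34 median, 0.88 p99, 0.91 max]  |2021-05-18T16:00:01Z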
writer.go

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)
// writer accumulates newline-delimited payloads in an in-memory buffer,
// optionally gzip-compressing them on the way in.
type writer struct {
	io.Writer
	buf *bytes.Buffer
}

func newWriter(skipCompression bool) *writer {
	wr := &writer{
		buf: &bytes.Buffer{},
	}
	wr.Writer = wr.buf
	if !skipCompression {
		wr.Writer = gzip.NewWriter(wr.buf)
	}
	return wr
}
func (wr *writer) Writeln(bs []byte) error {
	if _, err := wr.Writer.Write(bs); err != nil {
		return err
	}
	if _, err := wr.Writer.Write([]byte("\n")); err != nil {
		return err
	}
	return nil
}

func (wr *writer) ShouldFlush(lim int) bool {
	// leave some room for unflushed bytes in gzip writer
	const padding = 128 << 10 // 128 KiB
	return wr.buf.Len()+padding >= lim
}
func (wr *writer) Flush() (string, error) {
	gz, ok := wr.Writer.(*gzip.Writer)
	if ok {
		if err := gz.Close(); err != nil {
			return "", fmt.Errorf("gz.Close: %w", err)
		}
	}
	// clones the data in buf
	ret := wr.buf.String()
	wr.buf.Reset()
	if ok {
		gz.Reset(wr.buf)
	}
	return ret, nil
}
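A quick way to sanity-check the batching writer is a round-trip: write a couple of lines, flush, gunzip the result, and compare. A minimal sketch, in a hypothetical writer_test.go alongside the code above:

package main

import (
	"compress/gzip"
	"io/ioutil"
	"strings"
	"testing"
)

func TestWriterRoundTrip(t *testing.T) {
	wr := newWriter(false) // compression on, as in production
	lines := []string{`{"event":"a"}`, `{"event":"b"}`}
	for _, line := range lines {
		if err := wr.Writeln([]byte(line)); err != nil {
			t.Fatal(err)
		}
	}
	body, err := wr.Flush()
	if err != nil {
		t.Fatal(err)
	}
	// body is a complete gzip stream; decompress and compare
	gz, err := gzip.NewReader(strings.NewReader(body))
	if err != nil {
		t.Fatal(err)
	}
	plain, err := ioutil.ReadAll(gz)
	if err != nil {
		t.Fatal(err)
	}
	if got, want := string(plain), strings.Join(lines, "\n")+"\n"; got != want {
		t.Fatalf("round-trip mismatch: got %q, want %q", got, want)
	}
}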