Scripts to ingest Wikipedia edit-history dumps into CSV and load the result into BigQuery
package app

import (
	"context"
	"net/http"
	"time"

	compute "google.golang.org/api/compute/v1"
	"google.golang.org/appengine"
	"google.golang.org/appengine/urlfetch"
)

const (
	projectID   = "bqwiki-123"
	zone        = "us-east1-d"
	machineType = "n1-highmem-2"
)

func init() {
	http.HandleFunc("/run", run)
}
// run starts a Compute Engine VM that does the actual ingestion work.
func run(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(appengine.NewContext(r), time.Hour)
	defer cancel()
	svc, err := compute.New(&http.Client{
		Transport: tokTransport{ctx},
	})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if _, err := svc.Instances.Insert(appengine.AppID(ctx), zone, &compute.Instance{
		Name: "vm",
		// The API expects a machine type URL, not just the short name.
		MachineType: "zones/" + zone + "/machineTypes/" + machineType,
		Metadata: &compute.Metadata{
			Items: []*compute.MetadataItems{{
				// TODO: specify startup script
			}},
		},
		ServiceAccounts: []*compute.ServiceAccount{{
			Email:  "180707267020-compute@developer.gserviceaccount.com",
			Scopes: []string{compute.CloudPlatformScope},
		}},
	}).Do(); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
}
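
// A minimal sketch of how the startup-script metadata item above (left as a
// TODO) could be filled in. The "startup-script" key is what Compute Engine
// runs at boot; the value shown here (the contents of startup.sh below) is an
// assumption about what this handler would pass, not part of the original:
//
//	script := startupScriptContents // e.g. startup.sh, embedded or read from disk
//	item := &compute.MetadataItems{Key: "startup-script", Value: &script}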
// tokTransport is an http.RoundTripper that authenticates each request with
// the app's service account access token, using urlfetch as the underlying
// transport.
type tokTransport struct {
	ctx context.Context
}

func (t tokTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	tok, _, err := appengine.AccessToken(t.ctx, compute.CloudPlatformScope)
	if err != nil {
		return nil, err
	}
	r.Header.Set("Authorization", "Bearer "+tok)
	client := urlfetch.Client(t.ctx)
	return client.Do(r)
}
runtime: go
api_version: go1
handlers:
- url: /.*
  script: _go_app
  login: admin
package main

import (
	"compress/gzip"
	"encoding/csv"
	"encoding/xml"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
	"time"
)

type revision struct {
	RevisionID          string `xml:"id"`
	ParentID            string `xml:"parentid"`
	Comment             string `xml:"comment"`
	ContributorIP       string `xml:"contributor>ip"`
	ContributorID       string `xml:"contributor>id"`
	ContributorUsername string `xml:"contributor>username"`
	TimestampString     string `xml:"timestamp"`
	Text                struct {
		Bytes int `xml:"bytes,attr"`
	} `xml:"text"`
	Minor *struct{} `xml:"minor"`
}
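
// For reference, an abridged <revision> element from the dump looks roughly
// like this (values are illustrative); the struct tags above map each field:
//
//	<revision>
//	  <id>233192</id>
//	  <parentid>233189</parentid>
//	  <timestamp>2001-01-21T02:12:21Z</timestamp>
//	  <contributor><username>SomeUser</username><id>99</id></contributor>
//	  <minor/>
//	  <comment>fix typo</comment>
//	  <text bytes="124" />
//	</revision>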
var (
	logEvery = flag.Int("log_every", 1000000, "Log every N CSV rows")
	v        = flag.Bool("v", false, "verbose logging")
)
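
// bqwiki reads a gzipped stub-meta-history XML dump on stdin and writes one
// CSV row per revision to stdout. A typical invocation (matching the startup
// script below) is:
//
//	curl https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-stub-meta-history.xml.gz | go run bqwiki.go > out.csv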
func main() {
	start := time.Now()
	flag.Parse()

	// Write the CSV header row.
	csvw := csv.NewWriter(os.Stdout)
	csvw.Write([]string{
		"namespace",
		"title",
		"id",
		"revision_id",
		"parent_id",
		"comment",
		"contributor_ip",
		"contributor_id",
		"contributor_username",
		"timestamp",
		"num_characters",
		"is_minor",
	})

	// Stdin carries the gzipped XML dump.
	var r io.Reader
	var err error
	r, err = gzip.NewReader(os.Stdin)
	if err != nil {
		log.Fatalf("gzip.NewReader: %v", err)
	}
	if *v {
		// In verbose mode, also copy the decompressed XML to stderr.
		r = io.TeeReader(r, os.Stderr)
	}
	d := xml.NewDecoder(r)
	count := 0
	var ns, id, title string
	for {
		t, err := d.Token()
		if err == io.EOF {
			break
		} else if err != nil {
			log.Fatal(err)
		}
		if s, ok := t.(xml.StartElement); ok {
			switch s.Name.Local {
			case "page":
				if *v {
					log.Println("=== entering new page ===")
				}
			case "ns":
				ns = ReadString(d)
				// Remap the negative pseudo-namespaces (Media=-2, Special=-1)
				// to positive IDs.
				switch ns {
				case "-2":
					ns = "202"
				case "-1":
					ns = "201"
				}
			case "id":
				id = ReadString(d)
			case "title":
				title = ReadString(d)
			case "revision":
				var rev revision
				if err := d.DecodeElement(&rev, &s); err != nil {
					log.Fatal(err)
				}
				tss := ""
				if rev.TimestampString != "" {
					ts, err := time.Parse(time.RFC3339, rev.TimestampString)
					if err != nil {
						log.Println(rev.TimestampString)
						log.Fatal(err)
					}
					tss = fmt.Sprintf("%d", ts.Unix())
				}
				csvw.Write([]string{
					ns,
					title,
					id,
					rev.RevisionID,
					rev.ParentID,
					strings.Replace(rev.Comment, "\n", "\\n", -1),
					rev.ContributorIP,
					rev.ContributorID,
					rev.ContributorUsername,
					tss,
					fmt.Sprintf("%d", rev.Text.Bytes),
					fmt.Sprintf("%t", rev.Minor != nil),
				})
				count++
				if count%*logEvery == 0 {
					log.Printf("Wrote %d rows", count)
					csvw.Flush()
					if err := csvw.Error(); err != nil {
						log.Fatal(err)
					}
				}
			}
		}
	}
	csvw.Flush()
	if err := csvw.Error(); err != nil {
		log.Fatal(err)
	}
	log.Printf(" %d rows\n", count)
	log.Printf(" took %s\n", time.Since(start))
}
// ReadString returns the character-data content of the element the decoder is
// currently positioned in.
func ReadString(d *xml.Decoder) string {
	t, err := d.Token()
	if err != nil {
		log.Fatal(err)
	}
	if cd, ok := t.(xml.CharData); ok {
		return string(cd)
	}
	log.Println("ERROR: unexpected token, not CharData")
	return "<IMPORT ERROR>"
}
cron:
- description: "update table"
  url: /run
  schedule: 2,21 of month 00:00
#!/bin/bash
set -e
set -x
project=bqwiki-123
vmname=vm
zone=us-east1-d
vmtype=n1-highmem-2
gcloud --project=$project compute instances create $vmname \
  --zone=$zone \
  --machine-type=$vmtype \
  --boot-disk-size=10 \
  --boot-disk-type=pd-standard \
  --boot-disk-device-name=$vmname \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --metadata-from-file=startup-script=startup.sh
[
{"name":"namespace", "type":"integer", "mode":"required"},
{"name":"title", "type":"string", "mode":"required"},
{"name":"id", "type":"integer", "mode":"required"},
{"name":"revision_id", "type":"integer", "mode":"required"},
{"name":"parent_id", "type":"integer", "mode":"nullable"},
{"name":"comment", "type":"string", "mode":"nullable"},
{"name":"contributor_ip", "type":"string", "mode":"nullable"},
{"name":"contributor_id", "type":"integer", "mode":"nullable"},
{"name":"contributor_username", "type":"string", "mode":"nullable"},
{"name":"timestamp", "type":"timestamp", "mode":"required"},
{"name":"num_characters", "type":"integer", "mode":"required"},
{"name":"is_minor", "type":"boolean", "mode":"required"}
]
#!/bin/bash
set -e
set -x
ZONE=$(curl -H Metadata-Flavor:Google http://metadata.google.internal/computeMetadata/v1/instance/zone | cut -d/ -f4)
NAME=$(curl -H Metadata-Flavor:Google http://metadata.google.internal/computeMetadata/v1/instance/name)
# Install Stackdriver Logging agent
curl -sSO https://dl.google.com/cloudagents/install-logging-agent.sh
sudo bash install-logging-agent.sh
rm install-logging-agent.sh
# Install dependencies.
sudo apt-get update || exit
sudo apt-get install -y golang-go git-all || exit
# Install script.
git clone https://gist.github.com/5bb8ed64029325d826d3.git bqwiki
cd bqwiki
# Fetch XML, convert to CSV, write to GCS.
time curl https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-stub-meta-history.xml.gz | go run bqwiki.go | gsutil cp - gs://bqwiki/table.csv
# Replace table with new table.
echo y | bq rm wikipedia.latest || true
time bq load --skip_leading_rows=1 wikipedia.latest gs://bqwiki/table.csv schema.json
# Kill myself.
gcloud --quiet compute instances delete $NAME --zone=$ZONE
A comment from @imjasonh (author) with timings from a run:
$ time go run bqwiki.go

real 218m8.797s
user 206m14.760s
sys 3m25.090s

$ wc -l out.csv
33363242 out.csv

$ ls -lh out.csv | cut -d' ' -f 5
3.1G
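
Once the load finishes, the table can be queried directly. Below is a minimal sketch (not part of this gist) using the cloud.google.com/go/bigquery client; it assumes the data landed in wikipedia.latest under project bqwiki-123 and pulls the ten most prolific contributors by revision count.

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, "bqwiki-123")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Top ten contributors by number of revisions.
	q := client.Query(`
		SELECT contributor_username, COUNT(*) AS edits
		FROM wikipedia.latest
		WHERE contributor_username != ''
		GROUP BY contributor_username
		ORDER BY edits DESC
		LIMIT 10`)
	it, err := q.Read(ctx)
	if err != nil {
		log.Fatal(err)
	}
	for {
		var row []bigquery.Value
		err := it.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(row)
	}
}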
