Skip to content

Instantly share code, notes, and snippets.

@daaku

daaku/main.go Secret

Created March 22, 2021 06:16
Show Gist options
  • Save daaku/1b2c4cfddf6b9ba35586aaaec2d3200f to your computer and use it in GitHub Desktop.
package main
import (
"bytes"
"encoding/xml"
"fmt"
"io"
"log"
"os"
"os/exec"
"runtime"
"sync"
"github.com/pkg/errors"
)
// dumpFile is the locally downloaded, bzip2-compressed Wikipedia dump.
const dumpFile = "/home/naitik/Downloads/enwiki-20210101-pages-articles-multistream.xml.bz2"

const (
	chunkSize     = 10 * 1024 * 1024 // 10MB per work unit handed to a worker
	initialWindow = 1024 * 1024      // 1MB probe to find the first <page>
	maxChunks     = 1000             // stop after this many chunks
)

// bufPool recycles chunk buffers between the reader loop and the workers.
var bufPool = &sync.Pool{
	New: func() interface{} { return &bytes.Buffer{} },
}

var (
	numWorkers   = runtime.NumCPU()
	startPageTag = []byte("<page>")
	endPageTag   = []byte("</page>")
)

// Page is the subset of a MediaWiki <page> element that gets decoded.
type Page struct {
	ID    uint64 `xml:"id"`
	Title string `xml:"title"`
}
// consume streams XML tokens from r and prints every decoded <page>
// element to stdout. It returns nil once the stream is exhausted, or a
// stack-annotated error on any tokenization/decoding failure.
func consume(r io.Reader) error {
	dec := xml.NewDecoder(r)
	var page Page
	for {
		tok, err := dec.Token()
		if err == io.EOF {
			// End of this chunk: all pages handled.
			return nil
		}
		if err != nil {
			return errors.WithStack(err)
		}
		start, ok := tok.(xml.StartElement)
		if !ok || start.Name.Local != "page" {
			continue
		}
		if err := dec.DecodeElement(&page, &start); err != nil {
			return errors.WithStack(err)
		}
		fmt.Println(page)
	}
}
// worker drains the work channel, feeding each chunk buffer through
// consume and then recycling it into bufPool. Decode errors are logged
// rather than aborting the run. wg.Done is signalled once the channel
// is closed and drained.
func worker(wg *sync.WaitGroup, work chan *bytes.Buffer) {
	defer wg.Done()
	for {
		buf, ok := <-work
		if !ok {
			return
		}
		if err := consume(buf); err != nil {
			log.Printf("%+v\n", err)
		}
		buf.Reset()
		bufPool.Put(buf)
	}
}
// run launches pbzcat to decompress the dump, carves the resulting XML
// stream into ~chunkSize pieces aligned on </page> boundaries, and hands
// each piece to a pool of worker goroutines for decoding. It stops after
// maxChunks chunks and reports totals via the deferred log statements.
//
// NOTE(review): if the stream ends before maxChunks chunks are read,
// io.CopyN returns io.EOF and run reports it as an error without
// flushing the final partial chunk or closing the work channel —
// presumably acceptable for this throwaway benchmark; verify before
// reusing this loop elsewhere.
func run() error {
	// Shell out to pbzcat for parallel bzip2 decompression; its stdout
	// is our raw XML stream.
	cmd := exec.Command("pbzcat", dumpFile)
	cmd.Stderr = os.Stderr
	bzr, err := cmd.StdoutPipe()
	if err != nil {
		return errors.WithStack(err)
	}
	defer bzr.Close()
	if err := cmd.Start(); err != nil {
		return err
	}
	// Ensure the child process does not outlive us on an early return.
	defer cmd.Process.Kill()
	work := make(chan *bytes.Buffer)
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go worker(&wg, work)
	}
	// Probe an initial window and drop everything before the first
	// <page>, so the very first chunk starts on a page boundary.
	leftover := make([]byte, initialWindow)
	n, err := io.ReadAtLeast(bzr, leftover, initialWindow)
	if err != nil {
		return errors.WithStack(err)
	}
	start := bytes.Index(leftover, startPageTag)
	if start == -1 {
		return errors.New("too small initial size?")
	}
	leftover = leftover[start:n]
	total := int64(n)
	chunks := 0
	defer func() {
		log.Println("bytes read", total)
		log.Println("chunks processed", chunks)
	}()
	for {
		// Start the chunk with the tail carried over from the previous
		// iteration, then top it up to chunkSize from the stream.
		buf := bufPool.Get().(*bytes.Buffer)
		buf.Grow(chunkSize)
		buf.Write(leftover)
		pending := chunkSize - int64(len(leftover))
		// Reset length but keep capacity for the append below.
		leftover = leftover[0:0]
		n, err := io.CopyN(buf, bzr, pending)
		if err != nil {
			log.Println("on error copied", n)
			log.Println("on error pending", pending)
			return errors.WithStack(err)
		}
		total += n
		// Cut the chunk at the last complete </page>; the bytes after it
		// are an incomplete page that becomes the next iteration's prefix.
		contents := buf.Bytes()
		index := bytes.LastIndex(contents, endPageTag)
		if index == -1 {
			return errors.New("chunk size too small?")
		}
		index += len(endPageTag)
		// Copy the tail out of buf before the buffer is handed off (and
		// later reset) by a worker.
		leftover = append(leftover, contents[index:]...)
		// NOTE(review): this reslice looks redundant — the append above
		// already leaves len(leftover) == buf.Len()-index.
		leftover = leftover[:buf.Len()-index]
		buf.Truncate(index)
		work <- buf
		chunks += 1
		if chunks == maxChunks {
			break
		}
	}
	// Closing the channel lets the workers drain and exit; wait for them
	// so the deferred totals are accurate.
	close(work)
	wg.Wait()
	return nil
}
// main configures the standard logger to write bare ">> "-prefixed
// lines to stderr, then executes run, exiting with status 1 on failure.
func main() {
	log.SetFlags(0)
	log.SetPrefix(">> ")
	log.SetOutput(os.Stderr)
	if err := run(); err != nil {
		fmt.Fprintf(os.Stderr, "%+v\n", err)
		os.Exit(1)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment