-
-
Save daaku/1b2c4cfddf6b9ba35586aaaec2d3200f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"encoding/xml" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"os/exec" | |
"runtime" | |
"sync" | |
"github.com/pkg/errors" | |
) | |
// Tunables for the streaming dump reader.
const (
	// dumpFile is the bzip2-compressed MediaWiki XML dump to process.
	dumpFile = "/home/naitik/Downloads/enwiki-20210101-pages-articles-multistream.xml.bz2"
	// chunkSize is the target size of each decompressed chunk handed to a worker.
	chunkSize = 10 * 1024 * 1024 // 10MB
	// initialWindow is how much is read up front to locate the first <page> tag.
	initialWindow = 1024 * 1024 // 1MB
	// maxChunks caps how many chunks are processed before stopping.
	maxChunks = 1000
)
var bufPool = &sync.Pool{ | |
New: func() interface{} { | |
return &bytes.Buffer{} | |
}, | |
} | |
var numWorkers = runtime.NumCPU() | |
var startPageTag = []byte("<page>") | |
var endPageTag = []byte("</page>") | |
// Page is the subset of a MediaWiki <page> element this program extracts.
type Page struct {
	ID    uint64 `xml:"id"`    // numeric page ID (first <id> inside <page>)
	Title string `xml:"title"` // page title
}
func consume(r io.Reader) error { | |
xmlDecoder := xml.NewDecoder(r) | |
var p Page | |
for { | |
t, err := xmlDecoder.Token() | |
if err != nil { | |
if err == io.EOF { | |
return nil | |
} | |
return errors.WithStack(err) | |
} | |
if se, ok := t.(xml.StartElement); ok && se.Name.Local == "page" { | |
if err := xmlDecoder.DecodeElement(&p, &se); err != nil { | |
return errors.WithStack(err) | |
} | |
fmt.Println(p) | |
} | |
} | |
} | |
func worker(wg *sync.WaitGroup, work chan *bytes.Buffer) { | |
defer wg.Done() | |
for buf := range work { | |
if err := consume(buf); err != nil { | |
log.Printf("%+v\n", err) | |
} | |
buf.Reset() | |
bufPool.Put(buf) | |
} | |
} | |
func run() error { | |
cmd := exec.Command("pbzcat", dumpFile) | |
cmd.Stderr = os.Stderr | |
bzr, err := cmd.StdoutPipe() | |
if err != nil { | |
return errors.WithStack(err) | |
} | |
defer bzr.Close() | |
if err := cmd.Start(); err != nil { | |
return err | |
} | |
defer cmd.Process.Kill() | |
work := make(chan *bytes.Buffer) | |
var wg sync.WaitGroup | |
wg.Add(numWorkers) | |
for i := 0; i < numWorkers; i++ { | |
go worker(&wg, work) | |
} | |
leftover := make([]byte, initialWindow) | |
n, err := io.ReadAtLeast(bzr, leftover, initialWindow) | |
if err != nil { | |
return errors.WithStack(err) | |
} | |
start := bytes.Index(leftover, startPageTag) | |
if start == -1 { | |
return errors.New("too small initial size?") | |
} | |
leftover = leftover[start:n] | |
total := int64(n) | |
chunks := 0 | |
defer func() { | |
log.Println("bytes read", total) | |
log.Println("chunks processed", chunks) | |
}() | |
for { | |
buf := bufPool.Get().(*bytes.Buffer) | |
buf.Grow(chunkSize) | |
buf.Write(leftover) | |
pending := chunkSize - int64(len(leftover)) | |
leftover = leftover[0:0] | |
n, err := io.CopyN(buf, bzr, pending) | |
if err != nil { | |
log.Println("on error copied", n) | |
log.Println("on error pending", pending) | |
return errors.WithStack(err) | |
} | |
total += n | |
contents := buf.Bytes() | |
index := bytes.LastIndex(contents, endPageTag) | |
if index == -1 { | |
return errors.New("chunk size too small?") | |
} | |
index += len(endPageTag) | |
leftover = append(leftover, contents[index:]...) | |
leftover = leftover[:buf.Len()-index] | |
buf.Truncate(index) | |
work <- buf | |
chunks += 1 | |
if chunks == maxChunks { | |
break | |
} | |
} | |
close(work) | |
wg.Wait() | |
return nil | |
} | |
func main() { | |
log.SetOutput(os.Stderr) | |
log.SetPrefix(">> ") | |
log.SetFlags(0) | |
if err := run(); err != nil { | |
fmt.Fprintf(os.Stderr, "%+v\n", err) | |
os.Exit(1) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment