Golang XML worker queue example

Parallel XML processing

When working with large XML files, only a streaming approach is feasible. Streaming also makes it easy to work on the data in parallel, since only one record needs to be considered at a time.

The first version of wikikit parsed the XML elements in the main routine and then fanned the parsed structs out to the workers. Thanks to pprof, it became clear why this did not yield major speed improvements: most of the time was spent converting XML tokens into a struct.
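
For reference, here is a minimal sketch of that first approach (not the actual wikikit code, and with a trimmed-down Page struct): DecodeElement still runs in the main goroutine, and the workers only receive already-parsed Page structs over a channel. The process function is a hypothetical stand-in for the per-record work. Since the expensive decoding step stays in a single goroutine, adding workers barely helps.

// Sketch of the fan-out-parsed-structs approach: decode in the main
// goroutine, hand finished Page structs to workers over a channel.
package main

import (
	"encoding/xml"
	"fmt"
	"os"
	"runtime"
	"sync"
)

type Page struct {
	Title string `xml:"title"`
	Text  string `xml:"revision>text"`
}

// process is a placeholder for whatever is done with a parsed page.
func process(p Page) { _ = p }

func main() {
	if len(os.Args) < 2 {
		fmt.Fprintln(os.Stderr, "usage: fanout WIKIDATA-XML-FILE")
		os.Exit(1)
	}
	handle, err := os.Open(os.Args[1])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer handle.Close()

	queue := make(chan Page)
	var wg sync.WaitGroup
	for i := 0; i < runtime.NumCPU(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range queue {
				process(p) // cheap compared to DecodeElement
			}
		}()
	}

	decoder := xml.NewDecoder(handle)
	for {
		t, _ := decoder.Token()
		if t == nil {
			break
		}
		if se, ok := t.(xml.StartElement); ok && se.Name.Local == "page" {
			var p Page
			// the expensive step happens here, in the single main goroutine
			decoder.DecodeElement(&p, &se)
			queue <- p
		}
	}
	close(queue)
	wg.Wait()
}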

Single routine, tokenization only

Decoding and parsing 10000 XML elements takes about 6.268s.

Skipping struct parsing:

decoder := xml.NewDecoder(handle)
var inElement string

for {
	t, _ := decoder.Token()
	if t == nil {
		break
	}
	switch se := t.(type) {
	case xml.StartElement:
		inElement = se.Name.Local
		if inElement == "page" {
			// ...
		}
	default:
	}
}

brings the time down to 0.433s. Just tokenizing through a 34G XML file with a single routine takes 39m21.517s, saturating a single core.

The time distribution looks like this: https://cdn.mediacru.sh/DYyiKLBA7U2i.svg

Single routine, struct loading

decoder := xml.NewDecoder(handle)
var inElement string
var p Page

for {
	// Read tokens from the XML document in a stream.
	t, _ := decoder.Token()
	if t == nil {
		break
	}
	// Inspect the type of the token just read.
	switch se := t.(type) {
	case xml.StartElement:
		// If we just read a StartElement token
		inElement = se.Name.Local
		// ...and its name is "page"
		if inElement == "page" {
			// decode a whole chunk of following XML into the
			// variable p, which is a Page (see above)
			decoder.DecodeElement(&p, &se)
		}
	default:
	}
}

Observation

Most of the time is spent actually getting the tokens out; DecodeElement is not insignificant either. Grepping for </page> in the 34G file finds 15671705 occurrences and takes about 16m35.992s.
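
As a rough Go counterpart to that grep baseline, the following sketch streams the file in fixed-size blocks and counts occurrences of </page>; the 4 MB block size and the carry buffer for matches straddling block boundaries are assumptions for illustration.

// Count "</page>" occurrences by streaming the file in fixed-size blocks.
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

func main() {
	if len(os.Args) < 2 {
		fmt.Fprintln(os.Stderr, "usage: pagecount WIKIDATA-XML-FILE")
		os.Exit(1)
	}
	handle, err := os.Open(os.Args[1])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer handle.Close()

	needle := []byte("</page>")
	buf := make([]byte, 4*1024*1024)
	var carry []byte // tail of the previous block, always shorter than the needle
	var count int64

	for {
		n, err := handle.Read(buf)
		if n > 0 {
			chunk := append(carry, buf[:n]...)
			count += int64(bytes.Count(chunk, needle))
			// keep the last len(needle)-1 bytes so a match split across two
			// blocks is still found; a full match never fits into the carry
			// alone, so nothing is counted twice
			keep := len(needle) - 1
			if keep > len(chunk) {
				keep = len(chunk)
			}
			carry = append([]byte(nil), chunk[len(chunk)-keep:]...)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
	}
	fmt.Println(count)
}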

Distributing decoder.Token() among workers does not seem that easy. A workaround could be to (a related in-memory variant is sketched after this list):

  • split the file into N pieces, while preserving the XML structure,
  • write the N pieces to N files,
  • convert each file in a separate routine,
  • concatenate the results into a single file.
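
Here is a minimal sketch of that related worker-queue variant (not code from this gist): the main goroutine only cuts the stream into raw <page>...</page> chunks, which is cheap, and hands the byte slices to workers that run the expensive xml.Unmarshal in parallel. The splitPages split function, the buffer sizes, the trimmed-down Page struct, and the assumption that pages are never nested and never contain a literal </page> are all simplifications.

// Worker-queue sketch: chunk the stream in the main goroutine,
// unmarshal the chunks in parallel workers.
package main

import (
	"bufio"
	"bytes"
	"encoding/xml"
	"fmt"
	"os"
	"runtime"
	"sync"
	"sync/atomic"
)

type Page struct {
	Title string `xml:"title"`
	Text  string `xml:"revision>text"`
}

// splitPages is a bufio.SplitFunc that emits one raw <page>...</page> element per token.
func splitPages(data []byte, atEOF bool) (advance int, token []byte, err error) {
	start := bytes.Index(data, []byte("<page>"))
	if start == -1 {
		if atEOF {
			return len(data), nil, nil // trailing data without pages
		}
		return 0, nil, nil // request more data
	}
	end := bytes.Index(data[start:], []byte("</page>"))
	if end == -1 {
		if atEOF {
			return 0, nil, fmt.Errorf("unterminated <page> element")
		}
		return start, nil, nil // drop the prefix, request more data
	}
	end += start + len("</page>")
	return end, data[start:end], nil
}

func main() {
	if len(os.Args) < 2 {
		fmt.Fprintln(os.Stderr, "usage: xmlq WIKIDATA-XML-FILE")
		os.Exit(1)
	}
	handle, err := os.Open(os.Args[1])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer handle.Close()

	chunks := make(chan []byte)
	var total int64
	var wg sync.WaitGroup

	for i := 0; i < runtime.NumCPU(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for chunk := range chunks {
				var p Page
				if err := xml.Unmarshal(chunk, &p); err != nil {
					continue // skip malformed chunks in this sketch
				}
				atomic.AddInt64(&total, 1)
			}
		}()
	}

	scanner := bufio.NewScanner(handle)
	scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024) // single pages can be large
	scanner.Split(splitPages)
	for scanner.Scan() {
		// copy, because the scanner reuses its internal buffer
		chunk := append([]byte(nil), scanner.Bytes()...)
		chunks <- chunk
	}
	close(chunks)
	wg.Wait()

	if err := scanner.Err(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
	fmt.Println("pages decoded:", atomic.LoadInt64(&total))
}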

A small pprof helper script and the Makefile:

#!/usr/bin/env bash
go tool pprof --web xmlp $1

all:
	go fmt xmlp.go
	go build xmlp.go
clean:
	rm -f xmlp

The full xmlp.go:

// example of parallel XML stream processing, using wikidata XML dump
package main

import (
	"encoding/xml"
	"flag"
	"fmt"
	"os"
	"runtime/pprof"
)

type Redirect struct {
	Title string `xml:"title,attr" json:"title"`
}

// A page as it occurs on Wikipedia
type Page struct {
	Title          string   `xml:"title" json:"title"`
	CanonicalTitle string   `xml:"ctitle" json:"ctitle"`
	Redir          Redirect `xml:"redirect" json:"redirect"`
	Text           string   `xml:"revision>text" json:"text"`
}

func main() {
	cpuprofile := flag.String("cpuprofile", "", "write cpu profile to file")
	flag.Parse()

	if *cpuprofile != "" {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		pprof.StartCPUProfile(f)
		defer pprof.StopCPUProfile()
	}

	flag.Usage = func() {
		fmt.Fprintf(os.Stderr, "Usage: %s WIKIDATA-XML-FILE\n", os.Args[0])
		fmt.Fprintf(os.Stderr, "Find dumps under http://dumps.wikimedia.org/wikidatawiki/\n")
		flag.PrintDefaults()
	}

	if flag.NArg() < 1 {
		flag.Usage()
		os.Exit(1)
	}

	filename := flag.Args()[0]
	handle, err := os.Open(filename)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer handle.Close()

	decoder := xml.NewDecoder(handle)
	var inElement string
	var p Page

	for {
		// Read tokens from the XML document in a stream.
		t, _ := decoder.Token()
		if t == nil {
			break
		}
		// Inspect the type of the token just read.
		switch se := t.(type) {
		case xml.StartElement:
			// If we just read a StartElement token
			inElement = se.Name.Local
			// ...and its name is "page"
			if inElement == "page" {
				// decode a whole chunk of following XML into the
				// variable p, which is a Page (see above)
				decoder.DecodeElement(&p, &se)
			}
		default:
		}
	}
}