Skip to content

Instantly share code, notes, and snippets.

@ian-plosker
Created November 18, 2014 13:23
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ian-plosker/04705a1f8e13fe46b547 to your computer and use it in GitHub Desktop.
Save ian-plosker/04705a1f8e13fe46b547 to your computer and use it in GitHub Desktop.
orcimport
package main
import (
"bufio"
"encoding/json"
"flag"
"github.com/orchestrate-io/gorc"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
)
// Command-line flags and the state shared by every goroutine in the importer.
var (
	// apiKey is the Orchestrate API key passed to gorc.NewClient in main.
	apiKey = flag.String("key", "00000000-0000-0000-0000-000000000000", "the api key")
	// retries bounds the Put loop in handleRequests; the value is the total
	// number of attempts made for a document, not the number of extra tries.
	retries = flag.Int("retries", 3, "the number of attempts per document before giving up")
	// workerCount is how many handleRequests goroutines are started.
	workerCount = flag.Int("workers", 20, "the number of worker goroutines")
	// primaryKey names the document field used as the item key. The default
	// "_id" is special-cased: the key is read from the nested "$oid" field.
	primaryKey = flag.String("primaryKey", "_id", "the primary key")
	// host overrides the API host on the client created in main.
	host = flag.String("host", "api.orchestrate.io", "the Orchestrate API host to use")

	// wg counts files still being imported; main blocks on it before exiting.
	wg sync.WaitGroup
	// reqs is the work queue shared by all files and all workers.
	reqs = make(chan Request, 100)
	// c is the shared client, assigned once in main before workers start.
	c *gorc.Client
)
// Request is one unit of work for the worker pool: a single raw line read
// from an input file, plus where to report the outcome.
type Request struct {
	// collection is the destination collection name passed to the client's Put.
	collection string
	// value is the raw line as read from the file; workers decode it as JSON.
	value string
	// respChan receives exactly one Response for this request.
	respChan chan Response
}
// Response reports the outcome of processing a single Request.
type Response struct {
	// bytes is the length of the request's raw line; the response handler sums
	// these to track progress against the file size.
	bytes int
	// err is the decode error or the error from the last Put attempt, or nil
	// when the document was stored.
	err error
}
// main parses the flags, configures the shared client, starts the worker
// pool, and launches one importer goroutine per file named on the command
// line. It waits until every file has been accounted for on the WaitGroup and
// then closes the request queue so the workers can drain and stop.
func main() {
	flag.Parse()

	c = gorc.NewClient(*apiKey)
	c.APIHost = *host

	startRequestHandlerPool()

	paths := flag.Args()
	for i := range paths {
		// One WaitGroup count per file; it is released when that file's
		// import has finished.
		wg.Add(1)
		go importFile(paths[i])
	}

	wg.Wait()
	close(reqs)
}
// startRequestHandlerPool launches the number of worker goroutines given by
// the -workers flag. Every worker consumes from the shared reqs queue.
func startRequestHandlerPool() {
	started := 0
	for started < *workerCount {
		go handleRequests(reqs)
		started++
	}
}
func importFile(filename string) {
file, err := os.Open(filename)
if err != nil {
log.Printf("Error: %v\n", err)
return
}
defer file.Close()
collection := filepath.Base(filename)
stats, _ := file.Stat()
fileSize := stats.Size()
log.Printf("Importing %v to %v ...", filename, collection)
reader := bufio.NewReaderSize(file, 1024*1024)
var resps = make(chan Response, 100)
go handleResponses(filename, fileSize, resps)
for err == nil {
var line string
line, err = reader.ReadString('\n')
reqs <- Request{collection, line, resps}
}
if err != nil && err != io.EOF {
log.Panicf("Scanner error: %v\n", err)
}
}
// missingKeyError reports that a decoded document has no string value in the
// field selected as its primary key.
type missingKeyError struct {
	field string
}

// Error implements the error interface.
func (e missingKeyError) Error() string {
	return "document has no string value for primary key " + e.field
}

// documentKey returns the item key for doc. When field is "_id" the key is
// read from the nested "$oid" string (doc["_id"]["$oid"]); for any other field
// the value itself must be a string. A missing or differently-typed value
// yields a missingKeyError instead of a panic.
func documentKey(doc map[string]interface{}, field string) (string, error) {
	if field == "_id" {
		id, ok := doc["_id"].(map[string]interface{})
		if !ok {
			return "", missingKeyError{field}
		}
		oid, ok := id["$oid"].(string)
		if !ok {
			return "", missingKeyError{field}
		}
		return oid, nil
	}
	key, ok := doc[field].(string)
	if !ok {
		return "", missingKeyError{field}
	}
	return key, nil
}

// putDocument decodes req.value as a JSON object, derives its key from the
// -primaryKey flag, and stores it with the shared client, trying up to
// -retries times. It returns nil on success, otherwise the decode error, the
// key error, or the error from the last Put attempt.
func putDocument(req Request) error {
	var doc map[string]interface{}
	if err := json.NewDecoder(strings.NewReader(req.value)).Decode(&doc); err != nil {
		return err
	}

	key, err := documentKey(doc, *primaryKey)
	if err != nil {
		// A well-formed document is being dropped; say so rather than
		// taking the whole import down with a failed type assertion.
		log.Printf("Error %v: %v\n", req.collection, err)
		return err
	}

	// Always make at least one attempt: with a non-positive -retries the
	// loop would otherwise be skipped and the document reported as stored.
	attempts := *retries
	if attempts < 1 {
		attempts = 1
	}
	for try := 1; try <= attempts; try++ {
		if _, err = c.Put(req.collection, key, &doc); err != nil {
			log.Printf("Error %v/%v (attempt %v of %v): %v\n", req.collection, key, try, attempts, err)
			continue
		}
		if try > 1 {
			log.Printf("%v/%v successful after %v attempts", req.collection, key, try)
		}
		return nil
	}
	return err
}

// handleRequests is the worker loop. It consumes requests until reqs is
// closed and sends exactly one Response per request on that request's
// respChan, carrying the length of the raw line and the outcome.
func handleRequests(reqs chan Request) {
	for req := range reqs {
		req.respChan <- Response{len(req.value), putDocument(req)}
	}
}
// handleResponses consumes the responses for one file, tracks progress by
// summing the bytes each response reports, and finishes once that sum reaches
// fileSize. It logs progress every 1000 items and a final summary, then
// releases the file's WaitGroup count.
//
// The channel is deliberately not closed here: workers are the senders, and
// closing from the receiving side would make any response still in flight
// panic with a send on a closed channel.
func handleResponses(filename string, fileSize int64, resps chan Response) {
	// Runs after the summary log below, matching the order callers rely on.
	defer wg.Done()

	var importCount, errorCount int
	var bytesImported int64
	// "<" rather than "!=" so the loop also ends if the byte count overshoots.
	for bytesImported < fileSize {
		resp, ok := <-resps
		if !ok {
			break
		}
		if resp.bytes == 0 {
			// A zero-length response carries no document (for example an
			// empty read at end of file); it is neither an item nor an error.
			continue
		}

		importCount++
		bytesImported += int64(resp.bytes)

		// Count every failure, whatever its type, so the summary does not
		// report undecodable or unsent documents as clean imports.
		if resp.err != nil {
			errorCount++
			log.Printf("Error: %v", resp.err)
		}

		if importCount%1000 == 0 {
			percentComplete := float64(bytesImported) / float64(fileSize) * 100
			log.Printf("Progress imported %v items from %v (%.3f%% complete)", importCount, filename, percentComplete)
		}
	}
	log.Printf("Done importing %v items from %v (with %v errors)", importCount, filename, errorCount)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment