Skip to content

Instantly share code, notes, and snippets.

@nojima
Last active August 29, 2015 13:56
Show Gist options
  • Save nojima/8817880 to your computer and use it in GitHub Desktop.
Save nojima/8817880 to your computer and use it in GitHub Desktop.
package main
import "bufio"
import "flag"
import "fmt"
import "log"
import "os"
import "strings"
import "sync"
import "github.com/tpjg/goriakpbc"
// Options holds the settings that main fills in from the command-line flags
// and hands to bulkInsert.
type Options struct {
// Addr is the server address passed to riak.ConnectClientPool.
Addr string
// PoolSize is the connection pool size passed to riak.ConnectClientPool.
PoolSize int
// ContentType is assigned to the ContentType field of every stored object.
ContentType string
}
// insertItem stores one key/value pair in bucket and calls waitGroup.Done
// when it returns, whether or not the store succeeded.
//
// The new object gets contentType as its content type, value as its data, and
// an Indexes entry "key_bin" set to key. A failed store is logged and
// otherwise ignored, so one bad row does not abort the remaining inserts.
func insertItem(bucket *riak.Bucket, key string, value string, contentType string,
	waitGroup *sync.WaitGroup) {
	defer waitGroup.Done()

	obj := bucket.NewObject(key)
	obj.ContentType = contentType
	obj.Indexes["key_bin"] = key
	obj.Data = []byte(value)

	// The store result used to be dropped on the floor; surface failures so a
	// partially loaded bucket does not go unnoticed.
	if err := obj.Store(); err != nil {
		log.Printf("failed to store key %q: %v", key, err)
	}
}
// bulkInsert connects to the server described by options, then reads file
// line by line and inserts every row into the bucket named bucketName.
//
// Each line must have the form "key<TAB>value"; only the first tab splits the
// line, so the value may itself contain tabs. Inserts run concurrently, with
// at most options.PoolSize (minimum 1) in flight at a time, and the function
// returns only after all of them have finished.
//
// The process is terminated through the log package when the connection pool
// cannot be set up, the bucket cannot be obtained, a line has no tab, or the
// scanner reports a read error.
func bulkInsert(bucketName string, file *os.File, options *Options) {
	if err := riak.ConnectClientPool(options.Addr, options.PoolSize); err != nil {
		log.Fatal(err)
	}
	bucket, err := riak.NewBucket(bucketName)
	if err != nil {
		log.Fatal(err)
	}

	// Cap the number of live goroutines so a very large input file does not
	// create one goroutine (and one buffered row) per line all at once.
	limit := options.PoolSize
	if limit < 1 {
		limit = 1
	}
	slots := make(chan struct{}, limit)

	var waitGroup sync.WaitGroup
	scanner := bufio.NewScanner(file)
	for i := 1; scanner.Scan(); i++ {
		row := strings.SplitN(scanner.Text(), "\t", 2)
		if len(row) != 2 {
			// Fatalf, not Fatal: the message contains a formatting verb.
			log.Fatalf("invalid format at line %v.", i)
		}

		slots <- struct{}{} // blocks while limit inserts are already running
		waitGroup.Add(1)
		go func(key, value string) {
			defer func() { <-slots }()
			insertItem(bucket, key, value, options.ContentType, &waitGroup)
		}(row[0], row[1])
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	waitGroup.Wait()
}
// printUsage writes the command synopsis, the description of the positional
// arguments, and one line per registered flag (name, usage text and default
// value) to standard error.
func printUsage() {
	const synopsis = "Usage: riak-bulk-insert [options] BUCKET FILE\n\n"
	const arguments = "Arguments:\n" +
		" BUCKET the name of the bucket where objects inserted.\n" +
		" FILE the name of the file containing a tab-separated\n" +
		" (key, value) pair for each line.\n\n"

	out := os.Stderr
	fmt.Fprint(out, synopsis)
	fmt.Fprint(out, arguments)
	fmt.Fprintln(out, "Options:")

	// describe prints a single flag; VisitAll also covers flags left unset.
	describe := func(f *flag.Flag) {
		fmt.Fprintf(out, " -%-11v %v default: %v\n", f.Name, f.Usage, f.DefValue)
	}
	flag.VisitAll(describe)
}
// main parses the command-line flags, opens the file named by the second
// positional argument, and bulk-inserts its rows into the bucket named by the
// first positional argument.
//
// When the number of positional arguments is not exactly two, the usage text
// is printed and the process exits with status 1. A file that cannot be
// opened terminates the process through log.Fatal.
func main() {
	options := new(Options)
	flag.StringVar(&options.Addr, "addr", "localhost:8087",
		"the hostname and port number of the riak server.")
	flag.IntVar(&options.PoolSize, "pool", 10,
		"the size of the connection pool")
	flag.StringVar(&options.ContentType, "content-type", "text/plain",
		"the content type of the inserted objects")

	// Use the same usage text for -h/-help and flag parse errors; the flag
	// package's default output omits the BUCKET and FILE arguments.
	flag.Usage = printUsage
	flag.Parse()

	if flag.NArg() != 2 {
		printUsage()
		os.Exit(1)
	}
	bucket := flag.Arg(0)
	filename := flag.Arg(1)

	file, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}
	// Release the input file once the bulk insert has returned.
	defer file.Close()

	bulkInsert(bucket, file, options)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment