Skip to content

Instantly share code, notes, and snippets.

@jjmalina
Last active December 30, 2015 06:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jjmalina/7789435 to your computer and use it in GitHub Desktop.
Save jjmalina/7789435 to your computer and use it in GitHub Desktop.
//
// insertion or deletion of arbitrary json logs to riak
// usage:
// cat logs | write-or-delete-logs --hosts=127.0.0.1,127.0.0.2 --port=8087 mybucket 2>> write_log &
// OR you can delete EVERYTHING
// write-or-delete-logs --delete --hosts=127.0.0.1,127.0.0.2 --port=8087 mybucket
package main
import (
"bufio"
"flag"
"github.com/mrb/riakpbc"
"io"
"log"
"os"
"strconv"
"strings"
)
type Log struct {
Id string
Payload string
}
func putToRiak(riak *riakpbc.Client, bucket string, data string) (string, error) {
var log Log
if jsonerr := json.Unmarshal([]byte(data), &log); jsonerr != nil {
return "", jsonerr
}
response, riakErr := riak.ReqResp(&riakpbc.RpbPutReq{
Bucket: []byte(bucket),
Content: &riakpbc.RpbContent{
Value: []byte(data),
ContentType: []byte("application/json"),
Indexes: []*riakpbc.RpbPair{
&riakpbc.RpbPair{
Key: []byte("payload_bin"),
Value: []byte(log.Payload),
},
},
},
}, "RpbPutReq", false)
if riakErr != nil {
return "", riakErr
} else {
putresp := response.(*riakpbc.RpbPutResp)
return string(putresp.Key), nil
}
}
func deleteKey(riak *riakpbc.Client, bucket string, key string) (string, error) {
if _, err := riak.DeleteObject(bucket, key); err != nil {
return key, err
}
return key, nil
}
type RiakWorker struct {
Id int
Client *riakpbc.Client
bucket string
Results chan *Work
workFunc func(*riakpbc.Client, string, string) (string, error)
}
func NewRiakWorker(id int, nodes []string, bucket string,
workFunc func(*riakpbc.Client, string, string) (string, error)) *RiakWorker {
client := riakpbc.NewClient(nodes)
if err := client.Dial(); err != nil {
panic(err.Error())
}
return &RiakWorker{
Id: id,
Client: client,
bucket: bucket,
Results: make(chan *Work),
workFunc: workFunc,
}
}
type Work struct {
Data string
Key string
Success bool
LineNumber int
}
func (w *RiakWorker) Run(jobs <-chan *Work) {
for work := range jobs {
w.process(work)
}
close(w.Results)
log.Println("Worker " + strconv.Itoa(w.Id) + " exiting.")
}
func (w *RiakWorker) process(job *Work) {
key, error := w.workFunc(w.Client, w.bucket, job.Data)
if error != nil {
job.Success = false
} else {
job.Success = true
job.Key = key
}
w.Results <- job
}
func stdinToChannel(input chan<- *Work) {
bio := bufio.NewReader(os.Stdin)
i := 0
for {
line, err := bio.ReadString('\n')
if err != nil {
if err == io.EOF {
break
}
log.Println(err)
}
i = i + 1
input <- &Work{Data: line, LineNumber: i}
}
close(input)
}
func listKeysIntoChannel(client *riakpbc.Client, bucket string) func(input chan<- *Work) {
return func(input chan<- *Work) {
keys, riakErr := client.ListKeys(bucket)
if riakErr != nil {
log.Println(riakErr.Error())
} else {
for i := range keys {
input <- &Work{Data: string(keys[i]), LineNumber: i}
}
}
close(input)
}
}
func listenOnResultsChannels(delete bool, worker *RiakWorker, done chan *RiakWorker) {
for job := range worker.Results {
var msg string
if job.Success {
if delete {
msg = "DELETED " + job.Key
} else {
msg = "ADDED " + job.Key
}
msg = msg + " from line " + strconv.Itoa(job.LineNumber)
} else {
msg = "ERROR from line " + strconv.Itoa(job.LineNumber)
}
log.Println(msg)
}
done <- worker
}
func main() {
var hostlist = flag.String("hosts", "127.0.0.1", "Comma delimited list of hosts")
var port = flag.Int("port", 8087, "Riak Protobuf Port")
var nworkers = flag.Int("workers", 3, "Number of workers")
var deleteMode = flag.Bool("delete", false, "Delete all keys in the bucket (useful for cleaning up)")
flag.Parse()
args := flag.Args()
if len(args) < 1 {
log.Print("Bucket missing")
os.Exit(1)
}
bucket := args[0]
hosts := strings.Split(*hostlist, ",")
var workFunc func(*riakpbc.Client, string, string) (string, error)
var inputFunc func(chan<- *Work)
var nodes = make([]string, len(*hostlist))
for i, host := range hosts {
nodes[i] = host + ":" + strconv.Itoa(*port)
}
if *deleteMode {
client := riakpbc.NewClient(nodes)
if err := client.Dial(); err != nil {
panic(err.Error())
}
workFunc = deleteKey
inputFunc = listKeysIntoChannel(client, bucket)
} else {
workFunc = putToRiak
inputFunc = stdinToChannel
}
jobs := make(chan *Work)
workers := make([]*RiakWorker, *nworkers)
var done = make(chan *RiakWorker)
// spawn workers
for i := range workers {
workers[i] = NewRiakWorker(i, nodes, bucket, workFunc)
go workers[i].Run(jobs)
go listenOnResultsChannels(*deleteMode, workers[i], done)
}
// start putting work into workers
inputFunc(jobs)
for i := 0; i < *nworkers; i++ {
<-done
}
log.Println("Exiting")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment