Skip to content

Instantly share code, notes, and snippets.

@jjmalina
Created November 11, 2013 17:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jjmalina/7416807 to your computer and use it in GitHub Desktop.
Save jjmalina/7416807 to your computer and use it in GitHub Desktop.
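// Command log-processing inserts JSON log lines read from stdin into a Riak
// bucket. With --deleteAll it instead deletes every key in the bucket.
//
// Usage:
//
//	cat logs | log-processing --host=127.0.0.1 --port=10017 mybucket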
package main
import (
"bufio"
"encoding/json"
"flag"
"github.com/mrb/riakpbc"
"io"
"log"
"os"
"strconv"
)
// LogData holds the fields of one JSON log line that putToRiak decodes and
// turns into secondary index entries.
type LogData struct {
	// Timestamp is written to the "timestamp_int" index.
	Timestamp float64
	// Code and Csid are written to the "code_bin" and "csid_bin" indexes.
	Code, Csid string
}
// putToRiak stores one JSON log line in bucket and returns the key taken from
// Riak's put response.
//
// data is decoded into a LogData only to extract the values for the
// timestamp_int, code_bin and csid_bin secondary indexes; the stored value is
// the original data string, unchanged. A JSON decoding error or a request
// error is returned as-is together with an empty key.
func putToRiak(riak *riakpbc.Client, bucket string, data string) (string, error) {
	var entry LogData
	if decodeErr := json.Unmarshal([]byte(data), &entry); decodeErr != nil {
		return "", decodeErr
	}

	// The timestamp is formatted with zero decimals for the index value.
	timestamp := strconv.FormatFloat(entry.Timestamp, 'f', 0, 64)

	content := &riakpbc.RpbContent{
		Value:       []byte(data),
		ContentType: []byte("application/json"),
		Indexes: []*riakpbc.RpbPair{
			{Key: []byte("timestamp_int"), Value: []byte(timestamp)},
			{Key: []byte("code_bin"), Value: []byte(entry.Code)},
			{Key: []byte("csid_bin"), Value: []byte(entry.Csid)},
		},
	}
	request := &riakpbc.RpbPutReq{
		Bucket:  []byte(bucket),
		Content: content,
	}

	reply, reqErr := riak.ReqResp(request, "RpbPutReq", false)
	if reqErr != nil {
		return "", reqErr
	}

	stored := reply.(*riakpbc.RpbPutResp)
	return string(stored.Key), nil
}
// deleteKey removes key from bucket. The key is returned in every case so the
// caller can report which key was processed; the error is whatever
// DeleteObject returned.
func deleteKey(riak *riakpbc.Client, bucket string, key string) (string, error) {
	_, delErr := riak.DeleteObject(bucket, key)
	if delErr != nil {
		return key, delErr
	}
	return key, nil
}
// RiakWorker bundles a Riak client with the bucket it operates on and the
// function it applies to every job it receives.
type RiakWorker struct {
// Id is the number given to NewRiakWorker; Run includes it in its exit log line.
Id int
// Client is the connection handed to workFunc for every job.
Client *riakpbc.Client
// bucket is passed to workFunc as its second argument.
bucket string
// Results carries one message per processed job; Run closes it once the jobs
// channel has been drained.
Results chan string
// workFunc performs the operation for one job. It receives the client, the
// bucket and the job's Data, and returns a key or an error.
workFunc func(*riakpbc.Client, string, string) (string, error)
}
// NewRiakWorker builds a worker with its own client connected to host:port.
//
// host and port are dereferenced immediately. The function panics with the
// error text if the client cannot dial. The returned worker has an unbuffered
// Results channel and applies workFunc to every job it is given.
func NewRiakWorker(id int, host *string, port *int, bucket string,
	workFunc func(*riakpbc.Client, string, string) (string, error)) *RiakWorker {
	address := *host + ":" + strconv.Itoa(*port)

	conn := riakpbc.NewClient([]string{address})
	if dialErr := conn.Dial(); dialErr != nil {
		panic(dialErr.Error())
	}

	worker := new(RiakWorker)
	worker.Id = id
	worker.Client = conn
	worker.bucket = bucket
	worker.Results = make(chan string)
	worker.workFunc = workFunc
	return worker
}
// Work is a single unit of input handed to a worker.
type Work struct {
// Data is the payload passed to the worker's work function: a raw line read
// from stdin, or a key name when the input is a bucket's key listing.
Data string
// LineNumber identifies the item in result messages: a 1-based line count for
// stdin input, or the 0-based index of the key in the listing.
LineNumber int
}
// Run processes jobs until the jobs channel is closed, then closes the
// worker's Results channel and logs that the worker is exiting.
func (w *RiakWorker) Run(jobs <-chan *Work) {
	for {
		job, more := <-jobs
		if !more {
			break
		}
		w.process(job)
	}

	// Closing Results lets the goroutine ranging over it finish.
	close(w.Results)
	log.Printf("Worker %d exiting.", w.Id)
}
// process applies the worker's workFunc to a single job and publishes exactly
// one message on Results.
//
// On success the message is "<key> from line <n>". On failure it is
// "error from line <n>: <cause>"; the cause is included so a failed line can
// be diagnosed from the output instead of being reported without any reason.
func (w *RiakWorker) process(job *Work) {
	line := strconv.Itoa(job.LineNumber)

	key, err := w.workFunc(w.Client, w.bucket, job.Data)
	if err != nil {
		w.Results <- "error from line " + line + ": " + err.Error()
		return
	}
	w.Results <- key + " from line " + line
}
// stdinToChannel reads standard input line by line and sends each line as a
// Work item on input, numbering lines from 1. Each Data value keeps its
// trailing newline when the line had one. The channel is closed when reading
// stops.
//
// Reading stops at end of input or at the first read error; a read error
// other than io.EOF is logged.
func stdinToChannel(input chan<- *Work) {
	// Closing tells the consumers ranging over the channel that input is done.
	defer close(input)

	reader := bufio.NewReader(os.Stdin)
	lineNumber := 0
	for {
		line, err := reader.ReadString('\n')

		// ReadString returns the data read before an error, so a final line
		// without a trailing newline arrives together with io.EOF and must
		// still be forwarded.
		if line != "" {
			lineNumber++
			input <- &Work{Data: line, LineNumber: lineNumber}
		}

		if err != nil {
			if err != io.EOF {
				log.Println(err)
			}
			// Stop on any error: retrying a failing reader would loop forever.
			return
		}
	}
}
// listKeysIntoChannel returns an input function that lists the keys of bucket
// through client and sends each key as a Work item, using the key's index in
// the listing as its LineNumber. If listing fails the error is logged and no
// items are sent. The channel is closed in both cases.
func listKeysIntoChannel(client *riakpbc.Client, bucket string) func(input chan<- *Work) {
	return func(input chan<- *Work) {
		defer close(input)

		keys, listErr := client.ListKeys(bucket)
		if listErr != nil {
			log.Println(listErr.Error())
			return
		}
		for idx, key := range keys {
			input <- &Work{Data: string(key), LineNumber: idx}
		}
	}
}
// listenOnResultsChannels logs every message received on worker.Results,
// prefixed with "DELETED " when delete is true and "ADDED " otherwise. Once
// Results is closed it sends the worker on done.
func listenOnResultsChannels(delete bool, worker *RiakWorker, done chan *RiakWorker) {
	// The prefix depends only on the mode, so it is chosen once.
	prefix := "ADDED "
	if delete {
		prefix = "DELETED "
	}

	for {
		result, open := <-worker.Results
		if !open {
			break
		}
		log.Println(prefix + result)
	}
	done <- worker
}
func main() {
var host = flag.String("host", "127.0.0.1", "Riak Host")
var port = flag.Int("port", 10017, "Riak Protobuf Port")
var deleteMode = flag.Bool("deleteAll", false, "Delete all keys in the bucket (useful for cleaning up)")
flag.Parse()
args := flag.Args()
if len(args) < 1 {
log.Print("Bucket missing")
os.Exit(1)
}
bucket := args[0]
var workFunc func(*riakpbc.Client, string, string) (string, error)
var inputFunc func(chan<- *Work)
if *deleteMode {
client := riakpbc.NewClient([]string{*host + ":" + strconv.Itoa(*port)})
if err := client.Dial(); err != nil {
panic(err.Error())
}
workFunc = deleteKey
inputFunc = listKeysIntoChannel(client, bucket)
} else {
workFunc = putToRiak
inputFunc = stdinToChannel
}
jobs := make(chan *Work)
numWorkers := 3
workers := make([]*RiakWorker, numWorkers)
// listen for results from workers
var done = make(chan *RiakWorker)
// spawn workers
for i := range workers {
workers[i] = NewRiakWorker(i, host, port, bucket, workFunc)
go workers[i].Run(jobs)
go listenOnResultsChannels(*deleteMode, workers[i], done)
}
// start putting work into workers
inputFunc(jobs)
for i:=0; i<numWorkers; i++ {
<-done
}
// done
log.Println("Exiting")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment