Skip to content

Instantly share code, notes, and snippets.

@nemosupremo
Created August 27, 2014 21:17
Show Gist options
  • Save nemosupremo/defa2448b1001367e565 to your computer and use it in GitHub Desktop.
Save nemosupremo/defa2448b1001367e565 to your computer and use it in GitHub Desktop.
package anna
import (
"fmt"
"github.com/channelmeter/gocql"
"github.com/channelmeter/heka/message"
. "github.com/channelmeter/heka/pipeline"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
)
// PartitionStrategy enumerates partitioning modes.
// NOTE(review): neither the type nor its values are referenced elsewhere in
// this file, so the meaning of each mode cannot be confirmed from here.
type PartitionStrategy int

// Known PartitionStrategy values, numbered consecutively from zero.
const (
	PartitionStrategyShuffle PartitionStrategy = 0
	PartitionStrategyGroupBy PartitionStrategy = 1
)

// PackIdLength is an untyped integer constant equal to 16.
// NOTE(review): unused in this file; presumably a pack ID byte length.
const PackIdLength = 16
// CassandraOutput is a Heka output plugin that sends messages to Cassandra.
// The session, batch type and configuration are set up by Init; the runner
// is stored by Run; reportLock is held by ReportMsg.
type CassandraOutput struct {
	// processMessageCount is updated with sync/atomic and must stay the first
	// field: on 386, ARM and 32-bit MIPS only the first word of an allocated
	// struct is guaranteed to be 64-bit aligned, and a misaligned 64-bit
	// atomic operation panics on those platforms.
	processMessageCount int64

	cass       *gocql.Session
	name       string
	conf       *CassandraOutputConfig
	batchType  gocql.BatchType
	or         OutputRunner
	reportLock sync.Mutex
}
// CassandraOutputConfig holds the TOML configuration for the CassandraOutput
// plugin. Default values are supplied by (*CassandraOutput).ConfigStruct.
type CassandraOutputConfig struct {
	// Hosts lists the cluster contact points handed to gocql.NewCluster.
	Hosts []string `toml:"hosts"`

	// Keyspace is assigned to the cluster's Keyspace before the session opens.
	Keyspace string `toml:"keyspace"`

	// Consistency names the consistency level; see ParseConsistencyString.
	Consistency string `toml:"consistency"`

	// ConnTimeout is the gocql cluster timeout, in milliseconds.
	ConnTimeout int `toml:"connection_timeout_ms"`

	// BatchSize is the statement count at which a batch is flushed. Batching
	// is used only when both BatchSize and TickerInterval are positive.
	BatchSize int `toml:"max_batch_size"`

	// TickerInterval drives periodic batch flushes. NOTE(review): its unit is
	// defined by Heka's ticker handling, not shown here (presumably seconds).
	TickerInterval uint `toml:"ticker_interval"`

	// BatchType is "unlogged", "logged" or "counter" (case-insensitive).
	BatchType string `toml:"batch_type"`

	// Debug, when batching, logs statements instead of executing them.
	Debug bool `toml:"debug"`
}
// ParseConsistencyString converts a consistency level name into its gocql
// value. Matching ignores case, surrounding whitespace, and '_' / '-'
// separators, so the compact names ("localquorum") and CQL's own spellings
// ("LOCAL_QUORUM", "local-quorum") are all accepted.
//
// For an unrecognized name it returns gocql.One together with a non-nil error.
func ParseConsistencyString(consistency string) (gocql.Consistency, error) {
	// Normalize to the compact lowercase keys used by the switch below.
	key := strings.ToLower(strings.TrimSpace(consistency))
	key = strings.NewReplacer("_", "", "-", "").Replace(key)

	switch key {
	case "any":
		return gocql.Any, nil
	case "one":
		return gocql.One, nil
	case "two":
		return gocql.Two, nil
	case "three":
		return gocql.Three, nil
	case "quorum":
		return gocql.Quorum, nil
	case "all":
		return gocql.All, nil
	case "localquorum":
		return gocql.LocalQuorum, nil
	case "eachquorum":
		return gocql.EachQuorum, nil
	case "serial":
		return gocql.Serial, nil
	case "localserial":
		return gocql.LocalSerial, nil
	default:
		return gocql.One, fmt.Errorf("unrecognized consistency %q", consistency)
	}
}
// ConfigStruct returns a freshly allocated *CassandraOutputConfig filled in
// with the plugin's default settings. NOTE(review): Heka presumably decodes
// the user's TOML on top of this value; that step is not shown here.
func (co *CassandraOutput) ConfigStruct() interface{} {
	defaults := new(CassandraOutputConfig)
	defaults.Hosts = []string{"localhost:9042"}
	defaults.Keyspace = "heka"
	defaults.Consistency = "one"
	defaults.ConnTimeout = 600 // milliseconds (see Init)
	defaults.BatchSize = 4096
	defaults.BatchType = "unlogged"
	defaults.TickerInterval = 600
	defaults.Debug = false
	return defaults
}
// nonWordChars matches any character outside [0-9A-Za-z_]. It is compiled
// once at package initialization instead of on every SetName call.
var nonWordChars = regexp.MustCompile(`\W`)

// SetName stores the plugin name with every non-word character replaced by an
// underscore; for example "my-output.1" becomes "my_output_1".
func (co *CassandraOutput) SetName(name string) {
	co.name = nonWordChars.ReplaceAllString(name, "_")
}
// Init configures the plugin from config, which must be the
// *CassandraOutputConfig produced by ConfigStruct. It resolves the
// consistency level and batch type, then opens the gocql session used by Run.
//
// It returns an error for an unrecognized consistency or batch type, or when
// the session cannot be created.
func (co *CassandraOutput) Init(config interface{}) error {
	co.conf = config.(*CassandraOutputConfig)

	cluster := gocql.NewCluster(co.conf.Hosts...)
	// Protocol and CQL versions are pinned rather than negotiated.
	cluster.ProtoVersion = 2
	cluster.CQLVersion = "3.0.0"

	cons, err := ParseConsistencyString(co.conf.Consistency)
	if err != nil {
		return err
	}
	cluster.Consistency = cons

	switch strings.ToLower(co.conf.BatchType) {
	case "unlogged":
		co.batchType = gocql.UnloggedBatch
	case "logged":
		co.batchType = gocql.LoggedBatch
	case "counter":
		co.batchType = gocql.CounterBatch
	default:
		return fmt.Errorf("unrecognized batch type %q", co.conf.BatchType)
	}

	cluster.Timeout = time.Duration(co.conf.ConnTimeout) * time.Millisecond
	cluster.Keyspace = co.conf.Keyspace

	session, err := cluster.CreateSession()
	if err != nil {
		return err
	}
	co.cass = session
	return nil
}
// makeStringArray renders data as a bracketed, comma-separated list of
// double-quoted values for log messages; for example makeStringArray(1, "a")
// returns `["1","a"]`. Each value is formatted with %v, and embedded quotes
// are not escaped. With no arguments it returns "[]".
func makeStringArray(data ...interface{}) string {
	parts := make([]string, len(data))
	for i, d := range data {
		parts[i] = fmt.Sprintf("\"%v\"", d)
	}
	// Join avoids the quadratic cost of repeated += and handles the empty
	// case naturally (no trailing comma to trim).
	return "[" + strings.Join(parts, ",") + "]"
}
// Run is the plugin's main loop. For every pack received from the runner's
// input channel, the message payload is used as a CQL statement and the
// message's "cassandra.values" fields as its bind values (see cassandraValues).
//
// When both max_batch_size and ticker_interval are positive, statements are
// queued into a batch of the configured type. That batch is flushed when it
// reaches max_batch_size statements, on every ticker tick, and once more when
// the input channel closes, so queued statements are not lost on shutdown.
// Otherwise each statement is executed immediately. In debug mode, batched
// statements are logged (when queued and when flushed) instead of executed.
//
// Execution failures are logged through the runner and do not stop the loop.
// Run returns nil once the input channel is closed.
func (co *CassandraOutput) Run(or OutputRunner, h PluginHelper) (err error) {
	co.or = or
	inChan := or.InChan()
	ticker := or.Ticker()
	batching := co.conf.BatchSize > 0 && co.conf.TickerInterval > 0

	currentBatch := co.cass.NewBatch(co.batchType)
	flushBatch := func() {
		if co.conf.Debug {
			or.LogMessage("Flushing Batch:\n" + describeBatch(currentBatch))
		} else if execErr := co.cass.ExecuteBatch(currentBatch); execErr != nil {
			// The description is an argument, never the format string:
			// statements and values may legitimately contain '%'.
			or.LogError(fmt.Errorf("Failed to execute batch Error: %s.\n%s",
				execErr.Error(), describeBatch(currentBatch)))
		}
		currentBatch = co.cass.NewBatch(co.batchType)
	}

	for {
		select {
		case pack, ok := <-inChan:
			if !ok {
				// Input closed: flush whatever is still queued before exiting.
				if currentBatch.Size() > 0 {
					flushBatch()
				}
				return nil
			}

			query := pack.Message.GetPayload()
			values := cassandraValues(pack.Message)

			if batching {
				if co.conf.Debug {
					or.LogMessage(fmt.Sprintf("Batching \"%s\" with values: %v", query, makeStringArray(values...)))
				}
				// Queue in debug mode too, so the debug flush has entries to report.
				currentBatch.Query(query, values...)
			} else if execErr := co.cass.Query(query, values...).Exec(); execErr != nil {
				or.LogError(fmt.Errorf("Failed to execute query \"%s\" with values: %v. Error: %s", query, makeStringArray(values...), execErr.Error()))
			}

			atomic.AddInt64(&co.processMessageCount, 1)
			pack.Recycle()

			if currentBatch.Size() > 0 && currentBatch.Size() >= co.conf.BatchSize {
				flushBatch()
			}
		case <-ticker:
			if currentBatch.Size() > 0 {
				flushBatch()
			}
		}
	}
}

// cassandraValues returns the bind values carried by msg's "cassandra.values"
// fields, in field order. A field whose representation is "time.Time" is
// decoded with time.Time.UnmarshalBinary; if it has no bytes or cannot be
// decoded, the int value 0 is used in its place.
func cassandraValues(msg *message.Message) []interface{} {
	fields := msg.FindAllFields("cassandra.values")
	values := make([]interface{}, len(fields))
	for i, field := range fields {
		switch field.GetRepresentation() {
		case "time.Time":
			// NOTE(review): 0 is the fallback for a missing or invalid
			// timestamp; confirm the target column accepts it.
			values[i] = 0
			if bbs := field.GetValueBytes(); len(bbs) > 0 {
				var t time.Time
				if err := t.UnmarshalBinary(bbs[0]); err == nil {
					values[i] = t
				}
			}
		default:
			values[i] = field.GetValue()
		}
	}
	return values
}

// describeBatch renders each statement in b on its own line as
// `\t"<stmt>" - ["arg1","arg2"]` for log output.
func describeBatch(b *gocql.Batch) string {
	lines := make([]string, len(b.Entries))
	for i, entry := range b.Entries {
		lines[i] = fmt.Sprintf("\t\"%s\" - %v\n", entry.Stmt, makeStringArray(entry.Args...))
	}
	return strings.Join(lines, "")
}
func init() {
RegisterPlugin("CassandraOutput", func() interface{} {
return new(CassandraOutput)
})
}
// ReportMsg satisfies the `pipeline.ReportingPlugin` interface. It adds an
// int64 field "ProcessMessageCount" (representation "count") to msg, holding
// the number of packs Run has processed so far, for Heka's report and dashboard.
func (co *CassandraOutput) ReportMsg(msg *message.Message) error {
	co.reportLock.Lock()
	defer co.reportLock.Unlock()

	processed := atomic.LoadInt64(&co.processMessageCount)
	message.NewInt64Field(msg, "ProcessMessageCount", processed, "count")
	return nil
}
@nemosupremo
Copy link
Author

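Example: building and injecting a message for CassandraOutput. The payload is the CQL statement. Each "cassandra.values" field supplies one bind value, in order. A time.Time value must be sent as its MarshalBinary bytes, with the field representation set to "time.Time".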

pack := af.h.PipelinePack(af.msgLoop)
pack.Message.SetUuid(uuid.NewRandom())
pack.Message.SetTimestamp(time.Now().UnixNano())
pack.Message.SetType("cassandra.output")
pack.Message.SetLogger(af.fr.Name())
pack.Message.SetHostname(af.hostname)
pack.Message.SetPid(int32(os.Getpid()))
pack.Message.SetSeverity(int32(6))
pack.Message.SetPayload("INSERT INTO mykeyspace.mytable (myfield, yourfield, timefield) VALUES (?,?,?)")
f, _ := message.NewField("cassandra.values", myFieldVar, "")
pack.Message.AddField(f)
f, _ = message.NewField("cassandra.values", yourFieldVar, "")
pack.Message.AddField(f)
if b, err := timeFieldVar.MarshalBinary(); err == nil {
    f, _ = message.NewField("cassandra.values", b, "time.Time")
    pack.Message.AddField(f)
}
af.fr.Inject(pack)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment