Skip to content

Instantly share code, notes, and snippets.

@akshaydeo
Created May 2, 2017 14:50
Show Gist options
  • Save akshaydeo/65dacbb8276e0645715469ada4a82078 to your computer and use it in GitHub Desktop.
Save akshaydeo/65dacbb8276e0645715469ada4a82078 to your computer and use it in GitHub Desktop.
CQLExtractor files
package cqextractor
import (
"fmt"
"time"
"github.com/media-net/cargo/cassandra"
"github.com/media-net/cargo/utils"
"github.com/gocql/gocql"
"github.com/media-net/cargo/logger"
"strings"
"sync"
)
// CQL statement templates for the per-extractor metadata table "cqmd_<key>".
// That table persists the latest gocql page state (under the constant
// state_id "1", clustered newest-first on time_added) so extraction can
// resume across sessions. Each format string takes the extractor Key as the
// %s table-name suffix.
const (
METADATA_TABLE_QUERY_FORMAT = "CREATE TABLE IF NOT EXISTS cqmd_%s (state_id text, page_state text, time_added timestamp, PRIMARY KEY(state_id, time_added)) WITH CLUSTERING ORDER BY (time_added DESC)"
GET_LATEST_TIME_ADDED = "SELECT max(time_added) FROM cqmd_%s" // timestamp of the most recent save
GET_LATEST_STATE_QUERY_FORMAT = "SELECT page_state FROM cqmd_%s WHERE time_added = ? ALLOW FILTERING" // page state saved at that timestamp
SAVE_LATEST_STATE_QUERY_FORMAT = "INSERT INTO cqmd_%s (state_id, page_state, time_added) VALUES (?,?,?)"
FLUSH_STATE_QUERY_FORMAT = "DELETE FROM cqmd_%s WHERE state_id = '1' and time_added < ?" // prune states older than the one just written
)
// CQLExtractorConfig carries everything needed to construct a CQLExtractor:
// cassandra connection details, the source table, paging/throttle tuning,
// and an optional date window for batch extraction.
type CQLExtractorConfig struct {
Id string // extractor instance id; combined with TableName to key the saved page state
Keyspace string // cassandra keyspace to connect to
Addresses []string // cassandra node addresses ("host:port")
TableName string // source table to extract rows from
RecordLimit int // page size requested per extraction cycle
ThrottleTimeInMs int // delay between extraction cycles, in milliseconds
/*
These dates are for batch extraction from cassandra,
not for the real time
TODO shall we write different extractor?
"2017-04-18 10:00:23"
*/
StartDate string
EndDate string
}
// CQLExtractor pages over a cassandra table, emitting every row as a JSON
// string on DataChan and failures on ErrorChan. It persists the gocql page
// state in a "cqmd_<Key>" metadata table between runs so extraction resumes
// where the previous session stopped.
type CQLExtractor struct {
config *CQLExtractorConfig
cassandra *cassandra.Cassandra
DataChan chan interface{} `json:"-"` // extracted rows, serialized to JSON strings
ErrorChan chan error `json:"-"` // extraction failures
Key string `json:"key"` // "<table>_<id>": suffix of the metadata table name
TableName string `json:"event_type"`
LastPageState string `json:"last_page_state"` // raw gocql page state of the last fetched page
RecordLimit int `json:"limit"`
ThrottleTimeInMs int `json:"sleep_time_ms"`
shouldStop bool // set by Stop(); read by the Extract goroutine under the embedded Mutex
sync.Mutex
/*
These dates are for batch extraction from cassandra,
not for the real time
StartDate is inclusive, EndDate is exclusive
*/
StartDate string `json:"start_date"`
EndDate string `json:"end_date"`
}
// NewCQLExtractor connects to cassandra, ensures the per-extractor metadata
// table exists, and restores the last saved page state so extraction resumes
// where a previous session left off. It returns a non-nil extractor only on
// full success.
func NewCQLExtractor(config CQLExtractorConfig) (*CQLExtractor, error) {
	ext := new(CQLExtractor)
	ext.config = &config
	// Unbuffered channels: extraction blocks until the consumer reads.
	ext.DataChan = make(chan interface{})
	ext.ErrorChan = make(chan error)
	// Key namespaces the metadata table per (table, extractor id) pair.
	ext.Key = fmt.Sprintf("%s_%s", config.TableName, config.Id)
	var err error
	ext.cassandra, err = cassandra.Connect(config.Keyspace, config.Addresses)
	if err != nil {
		return nil, err
	}
	ext.TableName = config.TableName
	if err = ext.cassandra.Exec(fmt.Sprintf(METADATA_TABLE_QUERY_FORMAT, ext.Key)); err != nil {
		return nil, err
	}
	ext.LastPageState, err = ext.getSavedPageState()
	if err != nil {
		return nil, err
	}
	ext.RecordLimit = config.RecordLimit
	ext.ThrottleTimeInMs = config.ThrottleTimeInMs
	ext.StartDate = config.StartDate
	ext.EndDate = config.EndDate
	// err is necessarily nil here; return an explicit nil for clarity.
	return ext, nil
}
// getSavedPageState fetches the most recently saved page state for this
// extractor from the metadata table. It returns "" with a nil error when no
// state has been saved yet (fresh metadata table), so the caller starts from
// the beginning of the source table.
func (c *CQLExtractor) getSavedPageState() (string, error) {
	latest := map[string]interface{}{}
	err := c.cassandra.GetSession().Query(fmt.Sprintf(GET_LATEST_TIME_ADDED, c.Key)).MapScan(latest)
	if err != nil {
		// "not found" means there is no saved state: start fresh, not an error.
		if strings.Contains(err.Error(), "not found") {
			return "", nil
		}
		logger.E("Error while getting saved page state", err)
		return "", err
	}
	// max(time_added) is returned under the aggregate column name. When the
	// table is empty the aggregate is null, so the assertion must be guarded —
	// an unchecked .(time.Time) would panic here.
	timeAdded, ok := latest["system.max(time_added)"].(time.Time)
	if !ok {
		return "", nil
	}
	// Scan into a fresh map so leftovers from the first query cannot leak in.
	row := map[string]interface{}{}
	err = c.cassandra.GetSession().Query(fmt.Sprintf(GET_LATEST_STATE_QUERY_FORMAT, c.Key), timeAdded).MapScan(row)
	if err != nil {
		if strings.Contains(err.Error(), "not found") {
			return "", nil
		}
		logger.E("Error while getting saved page state", err)
		return "", err
	}
	// Guard the assertion for the same reason as above.
	state, ok := row["page_state"].(string)
	if !ok {
		return "", nil
	}
	// Page state is stored base64-encoded; decode back to the raw bytes gocql expects.
	return string(utils.DecodeBase64(state)), nil
}
// savePageState persists the given (base64-encoded) page state into the
// metadata table, then best-effort deletes the older entries. A failure while
// flushing old states is logged but not returned, since the new state was
// already saved successfully.
func (c *CQLExtractor) savePageState(state string) error {
	now := time.Now()
	logger.D("Saving latest state", state)
	err := c.cassandra.Exec(fmt.Sprintf(SAVE_LATEST_STATE_QUERY_FORMAT, c.Key), "1", state, now)
	if err != nil {
		// Include the error value in the log, consistent with other call sites.
		logger.E("Error while saving latest state", err)
		return err
	}
	// Flush everything strictly older than the row just written (time_added < now,
	// so the fresh state survives).
	if err = c.cassandra.Exec(fmt.Sprintf(FLUSH_STATE_QUERY_FORMAT, c.Key), now); err != nil {
		logger.E("Error while flushing the older states", err)
	}
	return nil
}
// Extract starts a background goroutine that pulls one page of records from
// cassandra every ThrottleTimeInMs milliseconds and returns the channels it
// publishes on: rows (as JSON strings) on the data channel, failures on the
// error channel. Call Stop to make the goroutine exit after its current cycle.
func (c *CQLExtractor) Extract() (<-chan interface{}, <-chan error) {
	go func() {
		timer := time.NewTimer(time.Duration(c.ThrottleTimeInMs) * time.Millisecond)
		// Release the timer's resources when the goroutine exits.
		defer timer.Stop()
		for {
			// A plain receive replaces the original single-case select.
			<-timer.C
			c.extractData()
			timer.Reset(time.Duration(c.ThrottleTimeInMs) * time.Millisecond)
			// Check the stop flag once per cycle, under the embedded mutex.
			c.Lock()
			stop := c.shouldStop
			c.Unlock()
			if stop {
				return
			}
		}
	}()
	return c.DataChan, c.ErrorChan
}
// getQuery builds the CQL used to page over the source table. When both
// StartDate and EndDate are set (batch mode) rows are bounded to
// [StartDate, EndDate); otherwise the entire table is scanned.
// NOTE(review): the dates are interpolated directly into the statement —
// assumed to come from trusted config, not user input; confirm.
func (c *CQLExtractor) getQuery() string {
	if c.StartDate != "" && c.EndDate != "" {
		return fmt.Sprintf("SELECT * from %s WHERE time_added >= '%s' and time_added < '%s' ALLOW FILTERING",
			c.TableName, c.StartDate, c.EndDate)
	}
	return fmt.Sprintf("SELECT * from %s ALLOW FILTERING", c.TableName)
}
// extractData runs one extraction cycle: it fetches the next page of rows
// (resuming from LastPageState), pushes each row onto DataChan as a JSON
// string, and persists the new page state. Errors are reported on ErrorChan.
func (c *CQLExtractor) extractData() {
	iter := c.cassandra.GetSession().Query(c.getQuery()).PageState([]byte(c.LastPageState)).PageSize(c.RecordLimit).Iter()
	logger.D(iter.PageState())
	dataMap, err := iter.SliceMap()
	if err != nil {
		c.ErrorChan <- err
		return
	}
	if len(dataMap) < 1 {
		// Nothing new since the last cycle.
		return
	}
	if len(dataMap) < c.RecordLimit {
		// A short page: refire with a page size matching the actual row count
		// so the iterator yields a resumable page state.
		logger.W("Got less records than the page limit, refiring query for lower limit", len(dataMap))
		iter = c.cassandra.GetSession().Query(c.getQuery()).PageState([]byte(c.LastPageState)).PageSize(len(dataMap)).Iter()
		dataMap, err = iter.SliceMap()
		if err != nil {
			c.ErrorChan <- err
			return
		}
	}
	logger.D(iter.PageState())
	c.LastPageState = string(iter.PageState())
	// Range over the slice we actually have: the refired query may return a
	// different number of rows than the first one (the original indexed with
	// the stale first-query length, risking an out-of-range access).
	for i := range dataMap {
		msg, err := utils.ToJson(&dataMap[i])
		if err != nil {
			c.ErrorChan <- err
			// msg is not usable on error; the original dereferenced it anyway,
			// which would panic on a nil pointer.
			continue
		}
		c.DataChan <- *msg
	}
	// Persist the page state (base64-encoded) so a new session resumes here.
	if err := c.savePageState(utils.Base64([]byte(c.LastPageState))); err != nil {
		c.ErrorChan <- err
	}
	if err := iter.Close(); err != nil {
		c.ErrorChan <- err
	}
}
// Stop signals the Extract goroutine to exit after its current cycle.
// Safe to call from any goroutine; the flag is written under the mutex.
func (c *CQLExtractor) Stop() {
	c.Lock()
	c.shouldStop = true
	c.Unlock()
}
package extractor
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/media-net/cargo/etl/extractor/cqextractor"
"github.com/media-net/cargo/cassandra"
"github.com/media-net/cargo/logger"
"github.com/media-net/cargo/utils"
"math/rand"
"time"
"fmt"
)
// Test fixture parameters: the number of rows seeded into cassandra before
// each spec, and the name of the source table.
// NOTE(review): TABLE_NAME appears unused — the specs hard-code "test1"; confirm.
const (
ENTRY_COUNT = 100
TABLE_NAME = "test1"
)
// Ginkgo spec suite for CQLExtractor against a local cassandra at 127.0.0.1:9042.
// NOTE(review): FDescribe/FIt are *focused* specs — committed, they make ginkgo
// skip every other spec (and fail CI runs using the focused-spec check);
// presumably leftovers from debugging — confirm before merging.
var _ = FDescribe("Extractors", func() {
// e is the extractor under test; c is a raw cassandra handle for setup/teardown.
var e *CQLExtractor
var c *cassandra.Cassandra
BeforeEach(func() {
var err error
c, err = cassandra.Connect("test", []string{"127.0.0.1:9042"})
Expect(err).NotTo(HaveOccurred())
// dropping the table (errors deliberately ignored: tables may not exist yet)
c.Exec("DROP TABLE test1")
c.Exec("DROP TABLE cqmd_test1_test")
c.Exec("DROP TABLE cqmd_test_test")
// pushing records into cassandra
err = c.Exec("CREATE TABLE test1 (data int, time_added timestamp, PRIMARY KEY(data, time_added)) WITH CLUSTERING ORDER BY (time_added ASC)")
Expect(err).NotTo(HaveOccurred())
// pushing raw data: ENTRY_COUNT rows with data = 0..ENTRY_COUNT-1
for i := 0; i < ENTRY_COUNT; i++ {
err = c.Exec("INSERT INTO test1 (data, time_added) VALUES (?,?)", i, time.Now())
Expect(err).NotTo(HaveOccurred())
}
e, err = NewCQLExtractor(CQLExtractorConfig{
Id: "test",
TableName: "test1",
RecordLimit: 1,
ThrottleTimeInMs: 100,
Keyspace: "test",
Addresses: []string{"127.0.0.1:9042"},
})
Expect(err).NotTo(HaveOccurred())
})
Describe("CQLExtractor", func() {
It("should extract the records as per the defined config", func() {
// varmap marks each data value as seen so duplicates can be detected.
varmap := make([]int8, ENTRY_COUNT)
count := 0
tmp := struct {
Data int `json:"data"`
}{}
// NOTE(review): "err" here is the error *channel* returned by Extract,
// not an error value — a misleading name.
data, err := e.Extract()
for count < ENTRY_COUNT {
select {
case d := <-data:
err := utils.FromJson(d.(string), &tmp)
Expect(err).NotTo(HaveOccurred())
if varmap[tmp.Data] == 1 {
Fail("Got the same record twice")
}
varmap[tmp.Data] = 1
count += 1
// NOTE(review): "e" shadows the extractor variable inside this case.
case e := <-err:
Fail(e.Error())
}
}
})
It("should save the state of the last read messages and should be used across different sessions", func() {
var err error
varmap := make([]int8, ENTRY_COUNT)
count := 0
tmp := struct {
Data int `json:"data"`
}{}
data, errChan := e.Extract()
// Read a random prefix of the records with the first extractor session.
firstIterLength := rand.Intn(ENTRY_COUNT)
for count < firstIterLength {
select {
case d := <-data:
err := utils.FromJson(d.(string), &tmp)
Expect(err).NotTo(HaveOccurred())
if varmap[tmp.Data] == 1 {
Fail("Got the same record twice")
}
varmap[tmp.Data] = 1
logger.D(varmap[tmp.Data])
count += 1
case e := <-errChan:
Fail(e.Error())
}
}
// now renewing the session: a fresh extractor with the same Id/table must
// resume from the persisted page state, not re-deliver earlier records.
e, err = NewCQLExtractor(CQLExtractorConfig{
Id: "test",
TableName: "test1",
RecordLimit: 1,
ThrottleTimeInMs: 100,
Keyspace: "test",
Addresses: []string{"127.0.0.1:9042"},
})
Expect(err).NotTo(HaveOccurred())
data, errChan = e.Extract()
for count < ENTRY_COUNT {
select {
case d := <-data:
err := utils.FromJson(d.(string), &tmp)
Expect(err).NotTo(HaveOccurred())
// Duplicates here mean the resumed session re-read old rows.
if varmap[tmp.Data] == 1 {
Fail("Got the same record twice")
}
varmap[tmp.Data] = 1
count += 1
case e := <-errChan:
Fail(e.Error())
}
}
})
It("should not affect the records after restarting", func() {
varmap := make([]int8, ENTRY_COUNT)
count := 0
tmp := struct {
Data int `json:"data"`
}{}
data, errChan := e.Extract()
// Drain a random prefix, then keep reading from the SAME session and
// verify no record is ever delivered twice.
firstIterLength := rand.Intn(ENTRY_COUNT)
for count < firstIterLength {
select {
case d := <-data:
err := utils.FromJson(d.(string), &tmp)
Expect(err).NotTo(HaveOccurred())
if varmap[tmp.Data] == 1 {
Fail("Got the same record twice")
}
varmap[tmp.Data] = 1
logger.D(varmap[tmp.Data])
count += 1
case e := <-errChan:
Fail(e.Error())
}
}
for count < ENTRY_COUNT {
select {
case d := <-data:
err := utils.FromJson(d.(string), &tmp)
Expect(err).NotTo(HaveOccurred())
if varmap[tmp.Data] == 1 {
Fail("Got the same record twice")
}
varmap[tmp.Data] = 1
count += 1
case e := <-errChan:
Fail(e.Error())
}
}
})
Context("with uneven record sets fetching", func() {
BeforeEach(func() {
// Override the extractor with a larger page size (10) so the final
// page of 100 rows comes back short of the limit.
var err error
e, err = NewCQLExtractor(CQLExtractorConfig{
Id: "test",
TableName: "test1",
RecordLimit: 10,
ThrottleTimeInMs: 100,
Keyspace: "test",
Addresses: []string{"127.0.0.1:9042"},
})
Expect(err).NotTo(HaveOccurred())
})
// failing test case
// NOTE(review): FIt focuses this single spec — see the note on FDescribe above.
FIt("should save last page state properly", func() {
varmap := make([]int8, 2*ENTRY_COUNT)
count := 0
tmp := struct {
Data int `json:"data"`
}{}
data, errChan := e.Extract()
By("Fetching all the pushed records")
for count < ENTRY_COUNT {
select {
case d := <-data:
err := utils.FromJson(d.(string), &tmp)
Expect(err).NotTo(HaveOccurred())
fmt.Println("Got", tmp.Data)
if varmap[tmp.Data] == 1 {
Fail("Got the same record twice")
}
varmap[tmp.Data] = 1
count += 1
case e := <-errChan:
Fail(e.Error())
}
}
By("Comparing the fetched records to the count")
Expect(count).To(Equal(100))
By("Pushing some extra records inside cassandra")
for i := 0; i < ENTRY_COUNT; i++ {
err := c.Exec("INSERT INTO test1 (data, time_added) VALUES (?,?)", ENTRY_COUNT+i, time.Now())
Expect(err).NotTo(HaveOccurred())
}
By("Fetching them out")
// The extractor must pick up the new rows from the saved page state
// without re-delivering any of the first batch.
for count < 2*ENTRY_COUNT {
select {
case d := <-data:
err := utils.FromJson(d.(string), &tmp)
Expect(err).NotTo(HaveOccurred())
fmt.Println("Got", tmp.Data)
if varmap[tmp.Data] == 1 {
Fail("Got the same record twice")
}
varmap[tmp.Data] = 1
count += 1
case e := <-errChan:
Fail(e.Error())
}
}
Expect(count).To(Equal(200))
})
})
})
AfterEach(func() {
// removing all records from cassandra (errors ignored: best-effort cleanup)
c.Exec("DROP TABLE test1")
c.Exec("DROP TABLE cqmd_test1_test")
c.Exec("DROP TABLE cqmd_test_test")
c.Close()
})
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment