Skip to content

Instantly share code, notes, and snippets.

@tleyden
Created December 28, 2016 19:46
Show Gist options
  • Save tleyden/0919c01e154d039d77cffe8c2b957393 to your computer and use it in GitHub Desktop.
Save tleyden/0919c01e154d039d77cffe8c2b957393 to your computer and use it in GitHub Desktop.
// NewMossEventRecord creates a MossEventRecord backed by a moss collection.
//
// When persistToDisk is true, the collection is opened with
// moss.OpenStoreCollection using storageDir, and both the store and the
// collection are kept on the returned record. Otherwise an in-memory
// collection is created and started, and storageDir is not used.
//
// A non-nil error is returned if the collection cannot be opened, created,
// or (for the in-memory case) started; the returned record is nil then.
func NewMossEventRecord(persistToDisk bool, storageDir string) (*MossEventRecord, error) {
	mossEventRecord := &MossEventRecord{}

	if persistToDisk {
		// Open moss in persistent mode
		store, collection, err := moss.OpenStoreCollection(
			storageDir,
			moss.StoreOptions{},
			moss.StorePersistOptions{},
		)
		if err != nil {
			return nil, fmt.Errorf("Error setting up persistent event record: %v", err)
		}
		mossEventRecord.store = store
		mossEventRecord.collection = collection
		// Apparently .Start() shouldn't be called on persisted collections
		// https://github.com/couchbase/moss/issues/5
		return mossEventRecord, nil
	}

	// Open moss in-memory store only
	collection, err := moss.NewCollection(moss.CollectionOptions{})
	if err != nil {
		return nil, fmt.Errorf("Error setting up event record: %v", err)
	}
	// Call Start() or else it will panic when trying to close it
	// https://github.com/couchbase/moss/issues/4
	// The Start() error is surfaced rather than dropped so callers never
	// receive a record whose collection failed to start.
	if err := collection.Start(); err != nil {
		return nil, fmt.Errorf("Error starting event record collection: %v", err)
	}
	mossEventRecord.collection = collection
	return mossEventRecord, nil
}
// StoreSQSMessage serializes sqsMessage to JSON and writes it to the moss
// collection, keyed by the message's MessageId.
//
// It returns an error if sqsMessage or its MessageId is nil, if JSON
// marshalling fails, or if any step of building or executing the moss batch
// fails.
func (mer *MossEventRecord) StoreSQSMessage(sqsMessage *sqs.Message) error {
	// Guard the pointer itself as well as MessageId; dereferencing a nil
	// message would otherwise panic.
	if sqsMessage == nil || sqsMessage.MessageId == nil {
		return fmt.Errorf("Cannot store SQS message since MessageId is nil")
	}

	// serialize to JSON and store in Moss KV store
	sqsMessageBytes, err := json.Marshal(sqsMessage)
	if err != nil {
		return err
	}

	batch, err := mer.collection.NewBatch(0, 0)
	if err != nil {
		return err
	}
	defer batch.Close()

	// A failed Set must abort the write; otherwise an empty batch would be
	// executed and the call would report success without storing anything.
	if err := batch.Set([]byte(*sqsMessage.MessageId), sqsMessageBytes); err != nil {
		return err
	}

	return mer.collection.ExecuteBatch(
		batch,
		moss.WriteOptions{},
	)
}
// GetStoredSQSMessages returns every SQS message currently stored in the moss
// collection.
//
// It takes a snapshot of the collection, iterates over all key/value pairs,
// and JSON-decodes each value into an sqs.Message. On error, the messages
// decoded so far are returned together with the error. The returned slice is
// never nil.
func (mer *MossEventRecord) GetStoredSQSMessages() (sqsMessages []sqs.Message, err error) {
	result := []sqs.Message{}

	snapshot, err := mer.collection.Snapshot()
	if err != nil {
		return result, err
	}
	if snapshot == nil {
		return result, fmt.Errorf("Unable to take moss collection snapshot")
	}
	defer snapshot.Close()

	iter, err := snapshot.StartIterator(nil, nil, moss.IteratorOptions{})
	if err != nil {
		return result, err
	}
	if iter == nil {
		return result, fmt.Errorf("Unable to get moss collection iterator")
	}
	defer iter.Close()

	for {
		_, v, curErr := iter.Current()
		if curErr == moss.ErrIteratorDone {
			return result, nil
		}
		if curErr != nil {
			// Any other iterator failure is reported instead of being
			// treated as a valid (possibly empty) value.
			return result, curErr
		}

		sqsMessage := sqs.Message{}
		if unmarshalErr := json.Unmarshal(v, &sqsMessage); unmarshalErr != nil {
			return result, unmarshalErr
		}
		result = append(result, sqsMessage)

		// Advance; stop only when the iterator is exhausted or fails.
		// Returning on "not done" here would end the scan after the first
		// message.
		nextErr := iter.Next()
		if nextErr == moss.ErrIteratorDone {
			return result, nil
		}
		if nextErr != nil {
			return result, nextErr
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment