Skip to content

Instantly share code, notes, and snippets.

@lucasjellema
Last active December 31, 2021 11:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lucasjellema/5b2784185ce4d9bcac7cefd753fe4463 to your computer and use it in GitHub Desktop.
Save lucasjellema/5b2784185ce4d9bcac7cefd753fe4463 to your computer and use it in GitHub Desktop.
Dapr custom state store on OCI ObjectStorage
/*
OCI Object Storage state store.

Sample configuration in yaml:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: statestore
    spec:
      type: state.oci.objectstorage
      metadata:
      - name: tenancyOCID
        value: <tenancyOCID>
      - name: userOCID
        value: <userOCID>
      - name: fingerPrint
        value: <fingerPrint>
      - name: region
        value: <region>
      - name: bucketName
        value: <bucket Name>
      - name: compartmentOCID
        value: ocid1.compartment.oc1..aaaaaaaacsssekayq4d34nl5h3eqs5e6ak3j5s4jhlws6oxf7rr5pxmt3zrq
      - name: privateKey
        value: |
          -----BEGIN RSA PRIVATE KEY-----
          XAS
          -----END RSA PRIVATE KEY-----
*/
package objectstorage
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"strings"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
jsoniter "github.com/json-iterator/go"
"github.com/oracle/oci-go-sdk/v54/common"
"github.com/oracle/oci-go-sdk/v54/objectstorage"
)
// Delimiter used when splitting state keys, followed by the names of the
// metadata properties that getObjectStorageMetadata requires.
const (
// keyDelimiter is the separator getFileName splits an incoming key on.
keyDelimiter = "||"
// Metadata property names; each must be present and non-empty.
tenancyKey = "tenancyOCID"
compartmentKey = "compartmentOCID"
regionKey = "region"
fingerPrintKey = "fingerPrint"
privateKeyKey = "privateKey"
userKey = "userOCID"
bucketNameKey = "bucketName"
)
// StateStore is the state store whose Init, Features, Get, Set, Delete and
// Ping methods are backed by an OCI Object Storage bucket.
type StateStore struct {
state.DefaultBulkStore
// json is set to jsoniter.ConfigFastest by NewOCIObjectStorageStore.
json jsoniter.API
// features is the list returned by Features.
features []state.Feature
logger logger.Logger
// objectStorageMetadata is assigned by Init once the client and bucket are ready.
objectStorageMetadata *ObjectStorageMetadata
}
// ObjectStorageMetadata holds the configuration values parsed by
// getObjectStorageMetadata plus the runtime values filled in by
// initStorageClientAndBucket.
type ObjectStorageMetadata struct {
// Values copied from the component metadata properties.
userOCID string
bucketName string
region string
tenancyOCID string
fingerPrint string
privateKey string
compartmentOCID string
// namespace is the value returned by getNamespace during initialization.
namespace string
// objectStorageClient is the client created by initializeOCIObjectStorageClient.
objectStorageClient *objectstorage.ObjectStorageClient
}
/********* Interface Implementations Init, Features, Get, Set, Delete and the instantiation function NewOCIObjectStorageStore */
// Init parses the component metadata, prepares the OCI Object Storage client
// and bucket, and stores the resulting configuration on the state store.
// It returns the first error reported by either step.
func (r *StateStore) Init(metadata state.Metadata) error {
	r.logger.Debugf("Init OCI Object Storage State Store")

	storeMeta, parseErr := getObjectStorageMetadata(metadata.Properties)
	if parseErr != nil {
		return parseErr
	}
	if setupErr := initStorageClientAndBucket(storeMeta, r.logger); setupErr != nil {
		return setupErr
	}

	r.logger.Debugf("OCI Object Storage State Store initialized using bucket '%s'", storeMeta.bucketName)
	r.objectStorageMetadata = storeMeta
	return nil
}
// initStorageClientAndBucket creates the Object Storage client, looks up the
// namespace and makes sure the configured bucket exists. The client and the
// namespace are written back into meta.
func initStorageClientAndBucket(meta *ObjectStorageMetadata, logger logger.Logger) error {
	client, err := initializeOCIObjectStorageClient(meta)
	if err != nil {
		return err
	}
	meta.objectStorageClient = &client

	ctx := context.Background()
	ns, err := getNamespace(ctx, client)
	meta.namespace = ns
	if err != nil {
		return err
	}

	return ensureBucketExists(ctx, client, meta.namespace, meta.bucketName, meta.compartmentOCID, logger)
}
// initializeOCIObjectStorageClient builds an Object Storage client from the
// configuration provider derived from meta.
func initializeOCIObjectStorageClient(meta *ObjectStorageMetadata) (objectstorage.ObjectStorageClient, error) {
	return objectstorage.NewObjectStorageClientWithConfigurationProvider(initializeConfigurationProvider(meta))
}
// Features returns the feature list configured when the store was created.
func (r *StateStore) Features() []state.Feature {
	supported := r.features
	return supported
}
// Delete removes the entry identified by req.Key from the bucket.
// It returns the error reported by deleteDocument, if any.
func (r *StateStore) Delete(req *state.DeleteRequest) error {
	// The format verb is required; without it the key is rendered as "%!(EXTRA ...)".
	r.logger.Debugf("Delete entry from OCI Object Storage State Store with key %s", req.Key)
	return r.deleteDocument(req)
}
// Get reads the value stored under req.Key together with its ETag.
// A missing object is not treated as an error: when readDocument fails with
// the message "ObjectNotFound", an empty response and a nil error are returned.
// Any other error is returned alongside an empty response.
func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
	// The format verb is required; without it the key is rendered as "%!(EXTRA ...)".
	r.logger.Debugf("Get from OCI Object Storage State Store with key %s", req.Key)
	content, etag, err := r.readDocument(req)
	if err != nil {
		r.logger.Debugf("error %s", err)
		// getObject reports an HTTP 404 with exactly this message.
		if err.Error() == "ObjectNotFound" {
			return &state.GetResponse{}, nil
		}
		return &state.GetResponse{}, err
	}
	return &state.GetResponse{
		Data: content,
		ETag: etag,
	}, nil
}
// Set stores the value of req under req.Key by delegating to writeDocument.
func (r *StateStore) Set(req *state.SetRequest) error {
	r.logger.Debugf("saving %s to OCI Object Storage State Store", req.Key)
	writeErr := r.writeDocument(req)
	return writeErr
}
// Ping reports whether the configured bucket can be retrieved.
func (r *StateStore) Ping() error {
	pingErr := r.pingBucket()
	return pingErr
}
// NewOCIObjectStorageStore creates a StateStore that uses the given logger,
// advertises the ETag feature and uses the default bulk store implementation.
// Init must be called before the store is used.
func NewOCIObjectStorageStore(logger logger.Logger) *StateStore {
	store := new(StateStore)
	store.json = jsoniter.ConfigFastest
	store.features = []state.Feature{state.FeatureETag}
	store.logger = logger
	store.DefaultBulkStore = state.NewDefaultBulkStore(store)
	return store
}
/************** private helper functions */
// getObjectStorageMetadata copies the required properties from metadata into a
// new ObjectStorageMetadata. Every property must be present and non-empty;
// otherwise an error naming the first offending property is returned.
func getObjectStorageMetadata(metadata map[string]string) (*ObjectStorageMetadata, error) {
	parsed := &ObjectStorageMetadata{}

	// Checked in this order so the first missing property reported stays stable.
	required := []struct {
		key    string
		target *string
	}{
		{bucketNameKey, &parsed.bucketName},
		{userKey, &parsed.userOCID},
		{regionKey, &parsed.region},
		{compartmentKey, &parsed.compartmentOCID},
		{fingerPrintKey, &parsed.fingerPrint},
		{privateKeyKey, &parsed.privateKey},
		{tenancyKey, &parsed.tenancyOCID},
	}

	for _, field := range required {
		value := metadata[field.key]
		if value == "" {
			return nil, fmt.Errorf("missing or empty %s field from metadata", field.key)
		}
		*field.target = value
	}
	return parsed, nil
}
// writeDocument uploads the marshalled value of req as an object named after
// the key. It rejects requests without a key. The request's ETag is only passed
// on as a precondition when first-write concurrency is requested.
func (r *StateStore) writeDocument(req *state.SetRequest) error {
	if req.Key == "" {
		return fmt.Errorf("Key for value to set was missing from request")
	}
	r.logger.Debugf("Save state in OCI Object Storage Bucket under key %s ", req.Key)

	store := r.objectStorageMetadata
	payload := r.marshal(req)

	// Leave the precondition unset unless the caller asked for first-write-wins.
	var ifMatch *string
	if req.Options.Concurrency == state.FirstWrite {
		ifMatch = req.ETag
	}

	return putObject(
		context.Background(),
		*store.objectStorageClient,
		store.namespace,
		store.bucketName,
		getFileName(req.Key),
		int64(len(payload)),
		ioutil.NopCloser(bytes.NewReader(payload)),
		nil,
		ifMatch,
		r.logger,
	)
}
// readDocument fetches the object that corresponds to req.Key and returns its
// content and ETag. Errors from getObject are logged and returned unchanged.
func (r *StateStore) readDocument(req *state.GetRequest) ([]byte, *string, error) {
	store := r.objectStorageMetadata
	data, tag, readErr := getObject(
		context.Background(),
		*store.objectStorageClient,
		store.namespace,
		store.bucketName,
		getFileName(req.Key),
		r.logger,
	)
	if readErr != nil {
		r.logger.Debugf("download file %s, err %s", req.Key, readErr)
		return nil, nil, readErr
	}
	return data, tag, nil
}
// pingBucket issues a GetBucket request for the configured bucket and returns
// the error of that call, if any.
func (r *StateStore) pingBucket() error {
	store := r.objectStorageMetadata
	_, err := store.objectStorageClient.GetBucket(context.Background(), objectstorage.GetBucketRequest{
		NamespaceName: &store.namespace,
		BucketName:    &store.bucketName,
	})
	return err
}
// deleteDocument removes the object that corresponds to req.Key. It rejects
// requests without a key. The request's ETag is only passed on as a
// precondition when first-write concurrency is requested.
func (r *StateStore) deleteDocument(req *state.DeleteRequest) error {
	if req.Key == "" {
		return fmt.Errorf("Key for value to delete was missing from request")
	}

	// Leave the precondition unset unless the caller asked for first-write-wins.
	var ifMatch *string
	if req.Options.Concurrency == state.FirstWrite {
		ifMatch = req.ETag
	}

	store := r.objectStorageMetadata
	delErr := deleteObject(
		context.Background(),
		*store.objectStorageClient,
		store.namespace,
		store.bucketName,
		getFileName(req.Key),
		ifMatch,
	)
	if delErr != nil {
		r.logger.Debugf("error in deleting object from OCI object storage %s, err %s", req.Key, delErr)
	}
	return delErr
}
// marshal converts the value of a set request into the bytes to be stored.
// A []byte value is used as-is; any other value is JSON-encoded with jsoniter.
// The signature cannot report failures, so an encoding error is logged and an
// empty payload is returned.
func (r *StateStore) marshal(req *state.SetRequest) []byte {
	if raw, ok := req.Value.([]byte); ok {
		// Already bytes: no need to round-trip through a string.
		return raw
	}
	encoded, err := jsoniter.Marshal(req.Value)
	if err != nil {
		r.logger.Errorf("could not marshal value for key %s: %s", req.Key, err)
		return []byte{}
	}
	return encoded
}
// getFileName derives the object name from a state key. When the key splits
// into exactly two parts on keyDelimiter, the second part is returned; in every
// other case the first part is returned.
func getFileName(key string) string {
	parts := strings.Split(key, keyDelimiter)
	if len(parts) == 2 {
		return parts[1]
	}
	return parts[0]
}
/**************** functions with OCI ObjectStorage Service interaction */
// getNamespace asks the Object Storage service for the namespace of the
// current tenancy. It returns an empty string and the error when the call
// fails or when the response carries no value.
func getNamespace(ctx context.Context, client objectstorage.ObjectStorageClient) (string, error) {
	response, err := client.GetNamespace(ctx, objectstorage.GetNamespaceRequest{})
	if err != nil {
		// Value must not be dereferenced on failure: it may be nil.
		return "", err
	}
	if response.Value == nil {
		return "", errors.New("no namespace returned by OCI Object Storage")
	}
	return *response.Value, nil
}
// initializeConfigurationProvider builds a raw configuration provider from the
// tenancy, user, region, fingerprint and private key held in meta. No private
// key passphrase is supplied.
func initializeConfigurationProvider(meta *ObjectStorageMetadata) common.ConfigurationProvider {
	return common.NewRawConfigurationProvider(
		meta.tenancyOCID,
		meta.userOCID,
		meta.region,
		meta.fingerPrint,
		meta.privateKey,
		nil,
	)
}
// deleteObject sends a DeleteObject request for objectname in the given bucket
// and namespace. A non-nil etag is passed as the IfMatch precondition.
func deleteObject(ctx context.Context, client objectstorage.ObjectStorageClient, namespace, bucketname, objectname string, etag *string) (err error) {
	_, err = client.DeleteObject(ctx, objectstorage.DeleteObjectRequest{
		NamespaceName: &namespace,
		BucketName:    &bucketname,
		ObjectName:    &objectname,
		IfMatch:       etag,
	})
	return err
}
// getObject downloads objectname from the given bucket and namespace and
// returns its content and ETag.
//
// When the service answers with HTTP 404, an error with the exact message
// "ObjectNotFound" is returned. Any other request error is returned unchanged,
// and a failure while reading the body is returned wrapped.
func getObject(ctx context.Context, c objectstorage.ObjectStorageClient, namespace string, bucketname string, objectname string, logger logger.Logger) ([]byte, *string, error) {
	logger.Debugf("read file %s from OCI ObjectStorage StateStore %s ", objectname, bucketname)
	request := objectstorage.GetObjectRequest{
		NamespaceName: &namespace,
		BucketName:    &bucketname,
		ObjectName:    &objectname,
	}
	response, err := c.GetObject(ctx, request)
	if err != nil {
		logger.Debugf("Issue in OCI ObjectStorage with retrieving object %s, error: %s", objectname, err)
		// RawResponse can be nil when no HTTP response was received at all
		// (for example a connection failure), so guard before reading the status.
		if response.RawResponse != nil && response.RawResponse.StatusCode == 404 {
			return nil, nil, errors.New("ObjectNotFound")
		}
		return nil, nil, err
	}
	if response.ETag != nil {
		logger.Debugf("OCI ObjectStorage StateStore metadata: ETag %s", *response.ETag)
	}

	buf := new(bytes.Buffer)
	if response.Content != nil {
		// The body is a stream owned by the caller; close it to release the connection.
		defer response.Content.Close()
		if _, err := buf.ReadFrom(response.Content); err != nil {
			return nil, nil, fmt.Errorf("reading content of object %s: %w", objectname, err)
		}
	}
	return buf.Bytes(), response.ETag, nil
}
// putObject uploads content as objectname into the given bucket and namespace.
// contentLen is sent as the content length, metadata as the object's metadata,
// and a non-nil etag as the IfMatch precondition. The error of the PutObject
// call is returned unchanged.
func putObject(ctx context.Context, c objectstorage.ObjectStorageClient, namespace, bucketname, objectname string, contentLen int64, content io.ReadCloser, metadata map[string]string, etag *string, logger logger.Logger) error {
	request := objectstorage.PutObjectRequest{
		NamespaceName: &namespace,
		BucketName:    &bucketname,
		ObjectName:    &objectname,
		ContentLength: &contentLen,
		PutObjectBody: content,
		OpcMeta:       metadata,
		IfMatch:       etag,
	}
	if _, err := c.PutObject(ctx, request); err != nil {
		return err
	}
	// Only report success once the service has accepted the object.
	logger.Debugf("Put object %s in bucket %s", objectname, bucketname)
	return nil
}
// ensureBucketExists makes sure the bucket called name exists in namespace.
// The bucket name needs to be unique within the compartment; there is no
// concept of "child" buckets.
//
// The bucket is looked up first. If the lookup fails with HTTP 404, the bucket
// is created in compartmentOCID. Any other lookup error, or a creation error,
// is returned. nil is returned when the bucket already exists or was created.
func ensureBucketExists(ctx context.Context, client objectstorage.ObjectStorageClient, namespace string, name string, compartmentOCID string, logger logger.Logger) error {
	req := objectstorage.GetBucketRequest{
		NamespaceName: &namespace,
		BucketName:    &name,
	}
	// verify if bucket exists
	response, err := client.GetBucket(ctx, req)
	if err == nil {
		return nil
	}
	// RawResponse can be nil when no HTTP response was received at all, so
	// guard before reading the status code.
	if response.RawResponse == nil || response.RawResponse.StatusCode != 404 {
		return err
	}
	if err = createBucket(ctx, client, namespace, name, compartmentOCID); err != nil {
		return err
	}
	logger.Debugf("Created OCI Object Storage Bucket %s as State Store", name)
	return nil
}
// createBucket creates a bucket called name in the given namespace and
// compartment, with empty metadata and no public access. The bucket name needs
// to be unique within the compartment; there is no concept of "child" buckets.
func createBucket(ctx context.Context, client objectstorage.ObjectStorageClient, namespace string, name string, compartmentOCID string) error {
	request := objectstorage.CreateBucketRequest{
		NamespaceName: &namespace,
		CreateBucketDetails: objectstorage.CreateBucketDetails{
			CompartmentId:    &compartmentOCID,
			Name:             &name,
			Metadata:         make(map[string]string),
			PublicAccessType: objectstorage.CreateBucketDetailsPublicAccessTypeNopublicaccess,
		},
	}
	_, err := client.CreateBucket(ctx, request)
	return err
}
package objectstorage
/*
for example in ~/dapr-dev/components-contrib
go test -v github.com/dapr/components-contrib/state/oci/objectstorage
*/
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
)
const (
// namespace is the value TestInit expects Init to store as the namespace.
// NOTE(review): presumably specific to the author's tenancy - adjust before running.
namespace = "idtwlqf2hanz"
)
// ociObjectStorageConfiguration holds the metadata properties passed to Init
// by the tests in this file.
// NOTE(review): the values look like truncated placeholders; presumably real
// credentials are needed for the tests that call Init - confirm.
var ociObjectStorageConfiguration = map[string]string{
"bucketName": "myBuck",
"tenancyOCID": "ocid1.tenancy.oc1..aaaaaaaa",
"userOCID": "ocid1.user.oc1..aaaaaaaaby4oyyt2l4dmxliv5q",
"compartmentOCID": "ocid1.compartment.oc1..aaaaarr5pxmt3zrq",
"fingerPrint": "02:91:6c:49:d8:04:4f",
"privateKey": "-----BEGIN RSA PRIVATE KEY-----\nXDSDuqjo=\n-----END RSA PRIVATE KEY-----",
"region": "us-ashburn-1",
}
// TestInit checks that Init accepts the complete test configuration and that
// it rejects metadata lacking the required properties.
func TestInit(t *testing.T) {
	m := state.Metadata{}
	s := NewOCIObjectStorageStore(logger.NewLogger("logger"))
	t.Run("Init with valid metadata", func(t *testing.T) {
		m.Properties = ociObjectStorageConfiguration
		err := s.Init(m)
		if !assert.NoError(t, err) {
			// objectStorageMetadata is only assigned by a successful Init;
			// stop here instead of dereferencing nil below.
			return
		}
		assert.Equal(t, "myBuck", s.objectStorageMetadata.bucketName)
		assert.Equal(t, namespace, s.objectStorageMetadata.namespace)
	})
	t.Run("Init with missing metadata", func(t *testing.T) {
		m.Properties = map[string]string{
			"invalidValue": "a",
		}
		err := s.Init(m)
		// assert.Equal takes the expected value first, then the actual one.
		assert.Equal(t, fmt.Errorf("missing or empty bucketName field from metadata"), err, "Lacking configuration property should be spotted")
	})
}
// TestGet checks that reading an unknown key yields an empty response without
// an error, and that a stored value can be read back together with an ETag.
func TestGet(t *testing.T) {
	m := state.Metadata{}
	s := NewOCIObjectStorageStore(logger.NewLogger("logger"))
	t.Run("Get an non-existing key", func(t *testing.T) {
		m.Properties = ociObjectStorageConfiguration
		if !assert.NoError(t, s.Init(m)) {
			return
		}
		getResponse, err := s.Get(&state.GetRequest{Key: "xyzq"})
		assert.Equal(t, &state.GetResponse{}, getResponse, "Response must be empty")
		assert.NoError(t, err, "Non-existing key must not be treated as error")
	})
	t.Run("Get an existing key", func(t *testing.T) {
		m.Properties = ociObjectStorageConfiguration
		if !assert.NoError(t, s.Init(m)) {
			return
		}
		err := s.Set(&state.SetRequest{Key: "test-key", Value: []byte("test-value")})
		if !assert.NoError(t, err, "Setting a value with a proper key should be errorfree") {
			return
		}
		getResponse, err := s.Get(&state.GetRequest{Key: "test-key"})
		if !assert.NoError(t, err, "Getting an existing key should be errorfree") {
			return
		}
		assert.Equal(t, "test-value", string(getResponse.Data), "Value retrieved should be equal to value set")
		// Check the pointer itself: dereferencing would panic on nil and a
		// string value is never nil.
		assert.NotNil(t, getResponse.ETag, "ETag should be set")
	})
}
// TestSet checks that Set rejects a request without a key, stores a value
// that can be read back, and honours ETags under first-write concurrency.
func TestSet(t *testing.T) {
	m := state.Metadata{}
	s := NewOCIObjectStorageStore(logger.NewLogger("logger"))
	t.Run("Set without a key", func(t *testing.T) {
		m.Properties = ociObjectStorageConfiguration
		if !assert.NoError(t, s.Init(m)) {
			return
		}
		err := s.Set(&state.SetRequest{Value: []byte("test-value")})
		// assert.Equal takes the expected value first, then the actual one.
		assert.Equal(t, fmt.Errorf("Key for value to set was missing from request"), err, "Lacking Key results in error")
	})
	t.Run("Regular Set Operation", func(t *testing.T) {
		testKey := "test-key"
		m.Properties = ociObjectStorageConfiguration
		if !assert.NoError(t, s.Init(m)) {
			return
		}
		err := s.Set(&state.SetRequest{Key: testKey, Value: []byte("test-value")})
		assert.Nil(t, err, "Setting a value with a proper key should be errorfree")
		getResponse, err := s.Get(&state.GetRequest{Key: testKey})
		if !assert.NoError(t, err, "Getting the value just set should be errorfree") {
			return
		}
		assert.Equal(t, "test-value", string(getResponse.Data), "Value retrieved should be equal to value set")
		assert.NotNil(t, getResponse.ETag, "ETag should be set")
	})
	t.Run("Testing Set & Concurrency (ETags)", func(t *testing.T) {
		testKey := "etag-test-key"
		m.Properties = ociObjectStorageConfiguration
		if !assert.NoError(t, s.Init(m)) {
			return
		}
		err := s.Set(&state.SetRequest{Key: testKey, Value: []byte("test-value")})
		assert.Nil(t, err, "Setting a value with a proper key should be errorfree")
		getResponse, err := s.Get(&state.GetRequest{Key: testKey})
		if !assert.NoError(t, err, "Getting the value just set should be errorfree") {
			return
		}
		etag := getResponse.ETag
		err = s.Set(&state.SetRequest{Key: testKey, Value: []byte("overwritten-value"), ETag: etag, Options: state.SetStateOption{
			Concurrency: state.FirstWrite,
		}})
		assert.Nil(t, err, "Updating value with proper etag should go fine")
		err = s.Set(&state.SetRequest{Key: testKey, Value: []byte("more-overwritten-value"), ETag: etag, Options: state.SetStateOption{
			Concurrency: state.FirstWrite,
		}})
		assert.NotNil(t, err, "Updating value with the old etag should be refused")
		// retrieve the latest etag - assigned by the previous set operation
		getResponse, err = s.Get(&state.GetRequest{Key: testKey})
		if !assert.NoError(t, err, "Getting the updated value should be errorfree") {
			return
		}
		assert.NotNil(t, getResponse.ETag, "ETag should be set")
		etag = getResponse.ETag
		err = s.Set(&state.SetRequest{Key: testKey, Value: []byte("more-overwritten-value"), ETag: etag, Options: state.SetStateOption{
			Concurrency: state.FirstWrite,
		}})
		assert.Nil(t, err, "Updating value with the latest etag should be accepted")
	})
}
// TestDelete checks that Delete rejects a request without a key, removes an
// existing value, and honours ETags under first-write concurrency.
func TestDelete(t *testing.T) {
	m := state.Metadata{}
	s := NewOCIObjectStorageStore(logger.NewLogger("logger"))
	t.Run("Delete without a key", func(t *testing.T) {
		m.Properties = ociObjectStorageConfiguration
		if !assert.NoError(t, s.Init(m)) {
			return
		}
		err := s.Delete(&state.DeleteRequest{})
		// assert.Equal takes the expected value first, then the actual one.
		assert.Equal(t, fmt.Errorf("Key for value to delete was missing from request"), err, "Lacking Key results in error")
	})
	t.Run("Regular Delete Operation", func(t *testing.T) {
		testKey := "test-key"
		m.Properties = ociObjectStorageConfiguration
		if !assert.NoError(t, s.Init(m)) {
			return
		}
		err := s.Set(&state.SetRequest{Key: testKey, Value: []byte("test-value")})
		assert.Nil(t, err, "Setting a value with a proper key should be errorfree")
		err = s.Delete(&state.DeleteRequest{Key: testKey})
		assert.Nil(t, err, "Deleting an existing value with a proper key should be errorfree")
	})
	t.Run("Testing Delete & Concurrency (ETags)", func(t *testing.T) {
		testKey := "etag-test-delete-key"
		m.Properties = ociObjectStorageConfiguration
		if !assert.NoError(t, s.Init(m)) {
			return
		}
		// create document
		err := s.Set(&state.SetRequest{Key: testKey, Value: []byte("test-value")})
		assert.Nil(t, err, "Setting a value with a proper key should be errorfree")
		getResponse, err := s.Get(&state.GetRequest{Key: testKey})
		if !assert.NoError(t, err, "Getting the value just set should be errorfree") {
			return
		}
		etag := getResponse.ETag
		incorrectETag := "notTheCorrectETag"
		err = s.Delete(&state.DeleteRequest{Key: testKey, ETag: &incorrectETag, Options: state.DeleteStateOption{
			Concurrency: state.FirstWrite,
		}})
		assert.NotNil(t, err, "Deleting value with an incorrect etag should be prevented")
		err = s.Delete(&state.DeleteRequest{Key: testKey, ETag: etag, Options: state.DeleteStateOption{
			Concurrency: state.FirstWrite,
		}})
		assert.Nil(t, err, "Deleting value with proper etag should go fine")
	})
}
func TestFeatures(t *testing.T) {
m := state.Metadata{}
s := NewOCIObjectStorageStore(logger.NewLogger("logger"))
t.Run("Test contents of Features", func(t *testing.T) {
m.Properties = ociObjectStorageConfiguration
features := s.Features()
assert.Contains(t, features, state.FeatureETag)
})
}
func TestPing(t *testing.T) {
m := state.Metadata{}
s := NewOCIObjectStorageStore(logger.NewLogger("logger"))
t.Run("Ping", func(t *testing.T) {
m.Properties = ociObjectStorageConfiguration
err := s.Init(m)
assert.Nil(t, err)
err = s.Ping()
assert.Nil(t, err, "Ping should be successful")
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment