Last active
December 31, 2021 11:39
-
-
Save lucasjellema/5b2784185ce4d9bcac7cefd753fe4463 to your computer and use it in GitHub Desktop.
Dapr custom state store on OCI ObjectStorage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
OCI Object Storage state store.

Sample configuration in yaml:

	apiVersion: dapr.io/v1alpha1
	kind: Component
	metadata:
	  name: statestore
	spec:
	  type: state.oci.objectstorage
	  metadata:
	  - name: tenancyOCID
	    value: <tenancyOCID>
	  - name: userOCID
	    value: <userOCID>
	  - name: fingerPrint
	    value: <fingerPrint>
	  - name: region
	    value: <region>
	  - name: bucketName
	    value: <bucket Name>
	  - name: compartmentOCID
	    value: ocid1.compartment.oc1..aaaaaaaacsssekayq4d34nl5h3eqs5e6ak3j5s4jhlws6oxf7rr5pxmt3zrq
	  - name: privateKey
	    value: |
	      -----BEGIN RSA PRIVATE KEY-----
	      XAS
	      -----END RSA PRIVATE KEY-----
*/
package objectstorage | |
import ( | |
"bytes" | |
"context" | |
"errors" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"strings" | |
"github.com/dapr/components-contrib/state" | |
"github.com/dapr/kit/logger" | |
jsoniter "github.com/json-iterator/go" | |
"github.com/oracle/oci-go-sdk/v54/common" | |
"github.com/oracle/oci-go-sdk/v54/objectstorage" | |
) | |
// Names of the component metadata properties that getObjectStorageMetadata
// requires, plus the delimiter used to split state keys.
const ( | |
// keyDelimiter is the separator getFileName splits a state key on.
keyDelimiter = "||" | |
// tenancyKey is the metadata property holding the tenancy OCID.
tenancyKey = "tenancyOCID" | |
// compartmentKey is the metadata property holding the compartment OCID.
compartmentKey = "compartmentOCID" | |
// regionKey is the metadata property holding the OCI region.
regionKey = "region" | |
// fingerPrintKey is the metadata property holding the key fingerprint.
fingerPrintKey = "fingerPrint" | |
// privateKeyKey is the metadata property holding the private key text.
privateKeyKey = "privateKey" | |
// userKey is the metadata property holding the user OCID.
userKey = "userOCID" | |
// bucketNameKey is the metadata property holding the bucket name.
bucketNameKey = "bucketName" | |
) | |
// StateStore is the state store backed by an OCI Object Storage bucket.
// Instances are created by NewOCIObjectStorageStore and become usable after
// Init has populated objectStorageMetadata.
type StateStore struct { | |
// Embedded bulk store; the constructor sets it via state.NewDefaultBulkStore.
state.DefaultBulkStore | |
// json is set to jsoniter.ConfigFastest by the constructor.
// NOTE(review): no method visible in this file reads it - confirm it is needed.
json jsoniter.API | |
// features is the list returned by Features.
features []state.Feature | |
// logger receives the debug output of all store operations.
logger logger.Logger | |
// objectStorageMetadata holds the parsed configuration and the OCI client; nil until Init succeeds.
objectStorageMetadata *ObjectStorageMetadata | |
} | |
// ObjectStorageMetadata carries the configuration read from the component
// metadata together with the values resolved while initializing the store.
type ObjectStorageMetadata struct { | |
// The following fields are copied from the metadata properties by getObjectStorageMetadata.
userOCID string | |
bucketName string | |
region string | |
tenancyOCID string | |
fingerPrint string | |
privateKey string | |
compartmentOCID string | |
// namespace is filled in by initStorageClientAndBucket from the result of getNamespace.
namespace string | |
// objectStorageClient is the OCI client created by initStorageClientAndBucket.
objectStorageClient *objectstorage.ObjectStorageClient | |
} | |
/********* Interface Implementations Init, Features, Get, Set, Delete and the instantiation function NewOCIObjectStorageStore */ | |
func (r *StateStore) Init(metadata state.Metadata) error { | |
r.logger.Debugf("Init OCI Object Storage State Store") | |
meta, err := getObjectStorageMetadata(metadata.Properties) | |
if err != nil { | |
return err | |
} | |
err = initStorageClientAndBucket(meta, r.logger) | |
if err != nil { | |
return err | |
} | |
r.logger.Debugf("OCI Object Storage State Store initialized using bucket '%s'", meta.bucketName) | |
r.objectStorageMetadata = meta | |
return nil | |
} | |
func initStorageClientAndBucket(meta *ObjectStorageMetadata, logger logger.Logger) error { | |
objectStorageClient, cerr := initializeOCIObjectStorageClient(meta) | |
if cerr != nil { | |
return cerr | |
} | |
meta.objectStorageClient = &objectStorageClient | |
ctx := context.Background() | |
meta.namespace, cerr = getNamespace(ctx, objectStorageClient) | |
if cerr != nil { | |
return cerr | |
} | |
cerr = ensureBucketExists(ctx, objectStorageClient, meta.namespace, meta.bucketName, meta.compartmentOCID, logger) | |
if cerr != nil { | |
return cerr | |
} | |
return nil | |
} | |
func initializeOCIObjectStorageClient(meta *ObjectStorageMetadata) (objectstorage.ObjectStorageClient, error) { | |
configurationProvider := initializeConfigurationProvider(meta) | |
objectStorageClient, cerr := objectstorage.NewObjectStorageClientWithConfigurationProvider(configurationProvider) | |
return objectStorageClient, cerr | |
} | |
func (r *StateStore) Features() []state.Feature { | |
return r.features | |
} | |
func (r *StateStore) Delete(req *state.DeleteRequest) error { | |
r.logger.Debugf("Delete entry from OCI Object Storage State Store with key ", req.Key) | |
err := r.deleteDocument(req) | |
return err | |
} | |
func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) { | |
r.logger.Debugf("Get from OCI Object Storage State Store with key ", req.Key) | |
content, etag, err := r.readDocument((req)) | |
if err != nil { | |
r.logger.Debugf("error %s", err) | |
if err.Error() == "ObjectNotFound" { | |
return &state.GetResponse{}, nil | |
} | |
return &state.GetResponse{}, err | |
} | |
return &state.GetResponse{ | |
Data: content, | |
ETag: etag, | |
}, err | |
} | |
func (r *StateStore) Set(req *state.SetRequest) error { | |
r.logger.Debugf("saving %s to OCI Object Storage State Store", req.Key) | |
return r.writeDocument(req) | |
} | |
func (r *StateStore) Ping() error { | |
return r.pingBucket() | |
} | |
func NewOCIObjectStorageStore(logger logger.Logger) *StateStore { | |
s := &StateStore{ | |
json: jsoniter.ConfigFastest, | |
features: []state.Feature{state.FeatureETag}, | |
logger: logger, | |
} | |
s.DefaultBulkStore = state.NewDefaultBulkStore(s) | |
return s | |
} | |
/************** private helper functions */ | |
func getObjectStorageMetadata(metadata map[string]string) (*ObjectStorageMetadata, error) { | |
meta := ObjectStorageMetadata{} | |
if val, ok := metadata[bucketNameKey]; ok && val != "" { | |
meta.bucketName = val | |
} else { | |
return nil, fmt.Errorf("missing or empty %s field from metadata", bucketNameKey) | |
} | |
if val, ok := metadata[userKey]; ok && val != "" { | |
meta.userOCID = val | |
} else { | |
return nil, fmt.Errorf("missing or empty %s field from metadata", userKey) | |
} | |
if val, ok := metadata[regionKey]; ok && val != "" { | |
meta.region = val | |
} else { | |
return nil, fmt.Errorf("missing or empty %s field from metadata", regionKey) | |
} | |
if val, ok := metadata[compartmentKey]; ok && val != "" { | |
meta.compartmentOCID = val | |
} else { | |
return nil, fmt.Errorf("missing or empty %s field from metadata", compartmentKey) | |
} | |
if val, ok := metadata[fingerPrintKey]; ok && val != "" { | |
meta.fingerPrint = val | |
} else { | |
return nil, fmt.Errorf("missing or empty %s field from metadata", fingerPrintKey) | |
} | |
if val, ok := metadata[privateKeyKey]; ok && val != "" { | |
meta.privateKey = string(val) | |
} else { | |
return nil, fmt.Errorf("missing or empty %s field from metadata", privateKeyKey) | |
} | |
if val, ok := metadata[tenancyKey]; ok && val != "" { | |
meta.tenancyOCID = val | |
} else { | |
return nil, fmt.Errorf("missing or empty %s field from metadata", tenancyKey) | |
} | |
return &meta, nil | |
} | |
func (r *StateStore) writeDocument(req *state.SetRequest) error { | |
if len(req.Key) == 0 || req.Key == "" { | |
return fmt.Errorf("Key for value to set was missing from request") | |
} | |
r.logger.Debugf("Save state in OCI Object Storage Bucket under key %s ", req.Key) | |
objectName := getFileName(req.Key) | |
content := r.marshal(req) | |
objectLength := int64(len(content)) | |
ctx := context.Background() | |
etag := req.ETag | |
if req.Options.Concurrency != state.FirstWrite { | |
etag = nil | |
} | |
err := putObject(ctx, *r.objectStorageMetadata.objectStorageClient, r.objectStorageMetadata.namespace, r.objectStorageMetadata.bucketName, objectName, objectLength, ioutil.NopCloser(bytes.NewReader(content)), nil, etag, r.logger) | |
return err | |
} | |
func (r *StateStore) readDocument(req *state.GetRequest) ([]byte, *string, error) { | |
objectName := getFileName(req.Key) | |
ctx := context.Background() | |
content, etag, err := getObject(ctx, *r.objectStorageMetadata.objectStorageClient, r.objectStorageMetadata.namespace, r.objectStorageMetadata.bucketName, objectName, r.logger) | |
if err != nil { | |
r.logger.Debugf("download file %s, err %s", req.Key, err) | |
return nil, nil, err | |
} | |
return content, etag, err | |
} | |
func (r *StateStore) pingBucket() error { | |
req := objectstorage.GetBucketRequest{ | |
NamespaceName: &r.objectStorageMetadata.namespace, | |
BucketName: &r.objectStorageMetadata.bucketName, | |
} | |
// Send the request using the service client | |
_, err := r.objectStorageMetadata.objectStorageClient.GetBucket(context.Background(), req) | |
return err | |
} | |
func (r *StateStore) deleteDocument(req *state.DeleteRequest) error { | |
if len(req.Key) == 0 || req.Key == "" { | |
return fmt.Errorf("Key for value to delete was missing from request") | |
} | |
objectName := getFileName(req.Key) | |
ctx := context.Background() | |
etag := req.ETag | |
if req.Options.Concurrency != state.FirstWrite { | |
etag = nil | |
} | |
err := deleteObject(ctx, *r.objectStorageMetadata.objectStorageClient, r.objectStorageMetadata.namespace, r.objectStorageMetadata.bucketName, objectName, etag) | |
if err != nil { | |
r.logger.Debugf("error in deleting object from OCI object storage %s, err %s", req.Key, err) | |
return err | |
} | |
return err | |
} | |
func (r *StateStore) marshal(req *state.SetRequest) []byte { | |
var v string | |
b, ok := req.Value.([]byte) | |
if ok { | |
v = string(b) | |
} else { | |
v, _ = jsoniter.MarshalToString(req.Value) | |
} | |
return []byte(v) | |
} | |
func getFileName(key string) string { | |
pr := strings.Split(key, keyDelimiter) | |
if len(pr) != 2 { | |
return pr[0] | |
} | |
return pr[1] | |
} | |
/**************** functions with OCI ObjectStorage Service interaction */ | |
func getNamespace(ctx context.Context, client objectstorage.ObjectStorageClient) (string, error) { | |
request := objectstorage.GetNamespaceRequest{} | |
response, err := client.GetNamespace(ctx, request) | |
return *response.Value, err | |
} | |
func initializeConfigurationProvider(meta *ObjectStorageMetadata) common.ConfigurationProvider { | |
configurationProvider := common.NewRawConfigurationProvider(meta.tenancyOCID, meta.userOCID, meta.region, meta.fingerPrint, meta.privateKey, nil) | |
return configurationProvider | |
} | |
func deleteObject(ctx context.Context, client objectstorage.ObjectStorageClient, namespace, bucketname, objectname string, etag *string) (err error) { | |
request := objectstorage.DeleteObjectRequest{ | |
NamespaceName: &namespace, | |
BucketName: &bucketname, | |
ObjectName: &objectname, | |
IfMatch: etag, | |
} | |
_, err = client.DeleteObject(ctx, request) | |
return err | |
} | |
func getObject(ctx context.Context, c objectstorage.ObjectStorageClient, namespace string, bucketname string, objectname string, logger logger.Logger) ([]byte, *string, error) { | |
logger.Debugf("read file %s from OCI ObjectStorage StateStore %s ", objectname, bucketname) | |
request := objectstorage.GetObjectRequest{ | |
NamespaceName: &namespace, | |
BucketName: &bucketname, | |
ObjectName: &objectname, | |
} | |
response, err := c.GetObject(ctx, request) | |
if err != nil { | |
logger.Debugf("Issue in OCI ObjectStorage with retrieving object %s, error: %s", objectname, err) | |
if 404 == response.RawResponse.StatusCode { | |
return nil, nil, errors.New("ObjectNotFound") | |
} | |
return nil, nil, err | |
} | |
if response.ETag != nil { | |
logger.Debugf("OCI ObjectStorage StateStore metadata: ETag %s", *response.ETag) | |
} | |
buf := new(bytes.Buffer) | |
buf.ReadFrom(response.Content) | |
return buf.Bytes(), response.ETag, err | |
} | |
func putObject(ctx context.Context, c objectstorage.ObjectStorageClient, namespace, bucketname, objectname string, contentLen int64, content io.ReadCloser, metadata map[string]string, etag *string, logger logger.Logger) error { | |
request := objectstorage.PutObjectRequest{ | |
NamespaceName: &namespace, | |
BucketName: &bucketname, | |
ObjectName: &objectname, | |
ContentLength: &contentLen, | |
PutObjectBody: content, | |
OpcMeta: metadata, | |
IfMatch: etag, | |
} | |
_, err := c.PutObject(ctx, request) | |
logger.Debugf("Put object ", objectname, " in bucket ", bucketname) | |
return err | |
} | |
// bucketname needs to be unique within compartment. there is no concept of "child" buckets. | |
// the value returned is the bucket's OCID | |
func ensureBucketExists(ctx context.Context, client objectstorage.ObjectStorageClient, namespace string, name string, compartmentOCID string, logger logger.Logger) error { | |
req := objectstorage.GetBucketRequest{ | |
NamespaceName: &namespace, | |
BucketName: &name, | |
} | |
// verify if bucket exists | |
response, err := client.GetBucket(context.Background(), req) | |
if err != nil { | |
if 404 == response.RawResponse.StatusCode { | |
err = createBucket(ctx, client, namespace, name, compartmentOCID) | |
if err == nil { | |
logger.Debugf("Created OCI Object Storage Bucket %s as State Store", name) | |
} | |
return err | |
} | |
return err | |
} | |
return nil | |
} | |
// bucketname needs to be unique within compartment. there is no concept of "child" buckets. | |
func createBucket(ctx context.Context, client objectstorage.ObjectStorageClient, namespace string, name string, compartmentOCID string) error { | |
request := objectstorage.CreateBucketRequest{ | |
NamespaceName: &namespace, | |
} | |
request.CompartmentId = &compartmentOCID | |
request.Name = &name | |
request.Metadata = make(map[string]string) | |
request.PublicAccessType = objectstorage.CreateBucketDetailsPublicAccessTypeNopublicaccess | |
_, err := client.CreateBucket(ctx, request) | |
return err | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package objectstorage | |
/* | |
for example in ~/dapr-dev/components-contrib | |
go test -v github.com/dapr/components-contrib/state/oci/objectstorage | |
*/ | |
import ( | |
"fmt" | |
"testing" | |
"github.com/stretchr/testify/assert" | |
"github.com/dapr/components-contrib/state" | |
"github.com/dapr/kit/logger" | |
) | |
const ( | |
// namespace is the value TestInit expects the initialized store to report.
// NOTE(review): presumably specific to the tenancy the tests run against - confirm.
namespace = "idtwlqf2hanz" | |
) | |
// ociObjectStorageConfiguration is the set of metadata properties the tests
// pass to Init.
// NOTE(review): the OCIDs, fingerprint and key look like shortened
// placeholders; the tests presumably need real credentials here - confirm.
var ociObjectStorageConfiguration = map[string]string{ | |
"bucketName": "myBuck", | |
"tenancyOCID": "ocid1.tenancy.oc1..aaaaaaaa", | |
"userOCID": "ocid1.user.oc1..aaaaaaaaby4oyyt2l4dmxliv5q", | |
"compartmentOCID": "ocid1.compartment.oc1..aaaaarr5pxmt3zrq", | |
"fingerPrint": "02:91:6c:49:d8:04:4f", | |
"privateKey": "-----BEGIN RSA PRIVATE KEY-----\nXDSDuqjo=\n-----END RSA PRIVATE KEY-----", | |
"region": "us-ashburn-1", | |
} | |
func TestInit(t *testing.T) { | |
m := state.Metadata{} | |
s := NewOCIObjectStorageStore(logger.NewLogger("logger")) | |
t.Run("Init with valid metadata", func(t *testing.T) { | |
m.Properties = ociObjectStorageConfiguration | |
err := s.Init(m) | |
assert.Nil(t, err) | |
assert.Equal(t, "myBuck", s.objectStorageMetadata.bucketName) | |
assert.Equal(t, namespace, s.objectStorageMetadata.namespace) | |
}) | |
t.Run("Init with missing metadata", func(t *testing.T) { | |
m.Properties = map[string]string{ | |
"invalidValue": "a", | |
} | |
err := s.Init(m) | |
assert.NotNil(t, err) | |
assert.Equal(t, err, fmt.Errorf("missing or empty bucketName field from metadata"), "Lacking configuration property should be spotted") | |
}) | |
} | |
func TestGet(t *testing.T) { | |
m := state.Metadata{} | |
s := NewOCIObjectStorageStore(logger.NewLogger("logger")) | |
t.Run("Get an non-existing key", func(t *testing.T) { | |
m.Properties = ociObjectStorageConfiguration | |
err := s.Init(m) | |
assert.Nil(t, err) | |
getResponse, err := s.Get(&state.GetRequest{Key: "xyzq"}) | |
assert.Equal(t, &state.GetResponse{}, getResponse, "Response must be empty") | |
assert.NoError(t, err, "Non-existing key must not be treated as error") | |
}) | |
t.Run("Get an existing key", func(t *testing.T) { | |
m.Properties = ociObjectStorageConfiguration | |
err := s.Init(m) | |
assert.Nil(t, err) | |
err = s.Set(&state.SetRequest{Key: "test-key", Value: []byte("test-value")}) | |
getResponse, err := s.Get(&state.GetRequest{Key: "test-key"}) | |
assert.Equal(t, "test-value", string(getResponse.Data), "Value retrieved should be equal to value set") | |
assert.NotNil(t, *getResponse.ETag, "ETag should be set") | |
}) | |
} | |
func TestSet(t *testing.T) { | |
m := state.Metadata{} | |
s := NewOCIObjectStorageStore(logger.NewLogger("logger")) | |
t.Run("Set without a key", func(t *testing.T) { | |
m.Properties = ociObjectStorageConfiguration | |
err := s.Init(m) | |
assert.Nil(t, err) | |
err = s.Set(&state.SetRequest{Value: []byte("test-value")}) | |
assert.Equal(t, err, fmt.Errorf("Key for value to set was missing from request"), "Lacking Key results in error") | |
// getResponse, err := s.Get(&state.GetRequest{Key: "test-key"}) | |
// assert.Equal(t, "test-value", string(getResponse.Data), "Value retrieved should be equal to value set") | |
// assert.NotNil(t, *getResponse.ETag, "ETag should be set") | |
}) | |
t.Run("Regular Set Operation", func(t *testing.T) { | |
testKey := "test-key" | |
m.Properties = ociObjectStorageConfiguration | |
err := s.Init(m) | |
assert.Nil(t, err) | |
err = s.Set(&state.SetRequest{Key: testKey, Value: []byte("test-value")}) | |
assert.Nil(t, err, "Setting a value with a proper key should be errorfree") | |
getResponse, err := s.Get(&state.GetRequest{Key: testKey}) | |
assert.Equal(t, "test-value", string(getResponse.Data), "Value retrieved should be equal to value set") | |
assert.NotNil(t, *getResponse.ETag, "ETag should be set") | |
}) | |
t.Run("Testing Set & Concurrency (ETags)", func(t *testing.T) { | |
testKey := "etag-test-key" | |
m.Properties = ociObjectStorageConfiguration | |
err := s.Init(m) | |
assert.Nil(t, err) | |
err = s.Set(&state.SetRequest{Key: testKey, Value: []byte("test-value")}) | |
assert.Nil(t, err, "Setting a value with a proper key should be errorfree") | |
getResponse, err := s.Get(&state.GetRequest{Key: testKey}) | |
etag := getResponse.ETag | |
err = s.Set(&state.SetRequest{Key: testKey, Value: []byte("overwritten-value"), ETag: etag, Options: state.SetStateOption{ | |
Concurrency: state.FirstWrite, | |
}}) | |
assert.Nil(t, err, "Updating value with proper etag should go fine") | |
err = s.Set(&state.SetRequest{Key: testKey, Value: []byte("more-overwritten-value"), ETag: etag, Options: state.SetStateOption{ | |
Concurrency: state.FirstWrite, | |
}}) | |
assert.NotNil(t, err, "Updating value with the old etag should be refused") | |
// retrieve the latest etag - assigned by the previous set operation | |
getResponse, err = s.Get(&state.GetRequest{Key: testKey}) | |
assert.NotNil(t, *getResponse.ETag, "ETag should be set") | |
etag = getResponse.ETag | |
err = s.Set(&state.SetRequest{Key: testKey, Value: []byte("more-overwritten-value"), ETag: etag, Options: state.SetStateOption{ | |
Concurrency: state.FirstWrite, | |
}}) | |
assert.Nil(t, err, "Updating value with the latest etag should be accepted") | |
}) | |
} | |
func TestDelete(t *testing.T) { | |
m := state.Metadata{} | |
s := NewOCIObjectStorageStore(logger.NewLogger("logger")) | |
t.Run("Delete without a key", func(t *testing.T) { | |
m.Properties = ociObjectStorageConfiguration | |
err := s.Init(m) | |
assert.Nil(t, err) | |
err = s.Delete(&state.DeleteRequest{}) | |
assert.Equal(t, err, fmt.Errorf("Key for value to delete was missing from request"), "Lacking Key results in error") | |
}) | |
t.Run("Regular Delete Operation", func(t *testing.T) { | |
testKey := "test-key" | |
m.Properties = ociObjectStorageConfiguration | |
err := s.Init(m) | |
assert.Nil(t, err) | |
err = s.Set(&state.SetRequest{Key: testKey, Value: []byte("test-value")}) | |
assert.Nil(t, err, "Setting a value with a proper key should be errorfree") | |
err = s.Delete(&state.DeleteRequest{Key: testKey}) | |
assert.Nil(t, err, "Deleting an existing value with a proper key should be errorfree") | |
}) | |
t.Run("Testing Delete & Concurrency (ETags)", func(t *testing.T) { | |
testKey := "etag-test-delete-key" | |
m.Properties = ociObjectStorageConfiguration | |
err := s.Init(m) | |
assert.Nil(t, err) | |
// create document | |
err = s.Set(&state.SetRequest{Key: testKey, Value: []byte("test-value")}) | |
assert.Nil(t, err, "Setting a value with a proper key should be errorfree") | |
getResponse, err := s.Get(&state.GetRequest{Key: testKey}) | |
etag := getResponse.ETag | |
incorrectETag := "notTheCorrectETag" | |
err = s.Delete(&state.DeleteRequest{Key: testKey, ETag: &incorrectETag, Options: state.DeleteStateOption{ | |
Concurrency: state.FirstWrite, | |
}}) | |
assert.NotNil(t, err, "Deleting value with an incorrect etag should be prevented") | |
err = s.Delete(&state.DeleteRequest{Key: testKey, ETag: etag, Options: state.DeleteStateOption{ | |
Concurrency: state.FirstWrite, | |
}}) | |
assert.Nil(t, err, "Deleting value with proper etag should go fine") | |
}) | |
} | |
func TestFeatures(t *testing.T) { | |
m := state.Metadata{} | |
s := NewOCIObjectStorageStore(logger.NewLogger("logger")) | |
t.Run("Test contents of Features", func(t *testing.T) { | |
m.Properties = ociObjectStorageConfiguration | |
features := s.Features() | |
assert.Contains(t, features, state.FeatureETag) | |
}) | |
} | |
func TestPing(t *testing.T) { | |
m := state.Metadata{} | |
s := NewOCIObjectStorageStore(logger.NewLogger("logger")) | |
t.Run("Ping", func(t *testing.T) { | |
m.Properties = ociObjectStorageConfiguration | |
err := s.Init(m) | |
assert.Nil(t, err) | |
err = s.Ping() | |
assert.Nil(t, err, "Ping should be successful") | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment