Skip to content

Instantly share code, notes, and snippets.

@SpareShade
Last active December 12, 2023 01:41
Show Gist options
  • Save SpareShade/d38ecc0202a87ec7439a00cf0d0f2577 to your computer and use it in GitHub Desktop.
Save SpareShade/d38ecc0202a87ec7439a00cf0d0f2577 to your computer and use it in GitHub Desktop.
GetUp - Event Sourcing
// Package caption provides the implementation of the CaptionAggregate, an aggregate representing the state
// of captions for artworks within the artwork domain. This package handles the registration, modification,
// and tracking of caption details for both unique and editioned artworks. The CaptionAggregate ensures data
// integrity by applying events in a specific order and emitting new events for changes in caption details.
// It also includes functions for transitioning the aggregate state, tracking changes, and handling various
// commands related to caption management.
//
// The CaptionAggregate is designed to work within a larger context of an artwork service, where it interacts
// with other components to maintain a consistent and reliable state of artwork captions. The code includes
// comments and documentation to guide developers through the functionality and usage of the CaptionAggregate.
package caption
import (
"context"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
proto_domainv1 "bleyk.org/genproto/services/artwork/domain/caption/v1"
proto_rsrcv1 "bleyk.org/genproto/services/artwork/resources/v1"
pkg_domain "bleyk.org/pkg/domain"
"bleyk.org/pkg/errors"
domain "bleyk.org/services/artwork/internal/domain"
)
// StreamName represents the stream name for the CaptionAggregate events.
const StreamName = "bleyk.services.artwork.domain.caption.CaptionAggregate"
// CaptionAggregate is an aggregate representing the state of captions for artworks.
type CaptionAggregate struct {
id domain.ArtworkID
registeredByAccount string
registeredByUser string
version int
changes []pkg_domain.EventMessage
// props
uniqueArtworkCaption *proto_rsrcv1.Caption
editionsCaptions []*editionCaption
}
// New creates a new instance of CaptionAggregate.
func New() CaptionAggregate {
return CaptionAggregate{}
}
// FromHistory loads the current aggregate root state by applying all events in order.
func FromHistory(ctx context.Context, events []pkg_domain.EventMessage) (CaptionAggregate, error) {
aggregate := New()
for _, em := range events {
if err := aggregate.transition(ctx, em.GetPayload()); err != nil {
return aggregate, errors.Wrap(ctx, err)
}
aggregate.version++
}
return aggregate, nil
}
// ... (public methods)
// Version returns the current aggregate root version.
func (a CaptionAggregate) Version() int {
return a.version
}
// Changes returns all newly applied events.
func (a CaptionAggregate) Changes() []pkg_domain.EventMessage {
return a.changes
}
// RegisterEditionedArtworkWithCaption registers an editioned artwork with a caption.
func (a *CaptionAggregate) RegisterEditionedArtworkWithCaption(
ctx context.Context,
command *proto_domainv1.RegisterEditionedArtworkWithCaption,
commandVerification domain.CommandVerifier,
eventVerification domain.EventVerifier,
) error {
// verify command before processing
err = commandVerification.VerifyRegisterEditionedArtworkWithCaption(ctx, command)
if err != nil {
return err
}
event := &proto_domainv1.EditionedArtworkRegisteredWithCaption{
AccountId: command.GetAccountId(),
UserId: command.GetUserId(),
ArtworkId: command.GetArtworkId(),
Editions: []*proto_domainv1.EditionedArtworkRegisteredWithCaption_Edition{},
EventTime: timestamppb.Now(),
}
for _, v := range command.GetEditions() {
event.Editions = append(event.Editions, &proto_domainv1.EditionedArtworkRegisteredWithCaption_Edition{
EditionId: v.GetEditionId(),
Type: v.GetType(),
Number: v.GetNumber(),
Caption: v.GetCaption(),
})
}
// verify event before dispatching
err = eventVerification.VerifyArtEventRegistered(ctx, event)
if err != nil {
return err
}
if err := a.transition(ctx, event); err != nil {
return err
}
domainEvent := pkg_domain.NewEventMessageFromProtoMessage(
ctx,
command.GetArtworkId(),
StreamName,
a.version,
event,
)
if err := a.trackChange(ctx, domainEvent); err != nil {
return err
}
return nil
}
// ModifyGranularCaptionDetail modifies the caption details for editioned or unique artwork.
func (a *CaptionAggregate) ModifyGranularCaptionDetail(
ctx context.Context,
command *proto_domainv1.ModifyGranularCaptionDetail,
commandVerification domain.CommandVerifier,
eventVerification domain.EventVerifier,
) error {
// check that we are dealing with the correct artwork
if len(a.editionsCaptions) > 0 && len(command.GetEditionIds()) == 0 {
return domain.ErrWrongArtworkType
}
if len(a.editionsCaptions) == 0 && len(command.GetEditionIds()) > 0 {
return domain.ErrWrongArtworkType
}
// verify command before processing
err = commandVerification.VerifyModifyGranularCaptionDetail(ctx, command)
if err != nil {
return err
}
// Editioned
if len(command.GetEditionIds()) > 0 {
modifiedEditions := []*proto_domainv1.GranularCaptionDetailModified_ModifiedEdition{}
for _, edId := range command.GetEditionIds() {
for _, v := range a.editionsCaptions {
// Check modification if valid modifications
if v.editionID == edId {
isModified, modifiedCaption := getGranularCaptionDetailModified(command.GetModificationPayload(), v.caption)
if isModified {
modifiedEdition := &proto_domainv1.GranularCaptionDetailModified_ModifiedEdition{
EditionId: edId,
ModifiedCaptionDetail: modifiedCaption,
}
modifiedEditions = append(modifiedEditions, modifiedEdition)
}
}
}
}
// only emit event if there are actual modification
if len(modifiedEditions) > 0 {
event := &proto_domainv1.GranularCaptionDetailModified{
AccountId: command.GetAccountId(),
UserId: command.GetUserId(),
ArtworkId: a.id.Value(),
ModifiedArtwork: &proto_domainv1.GranularCaptionDetailModified_ModifiedEditioned{
ModifiedEditioned: &proto_domainv1.GranularCaptionDetailModified_ModifiedEditions{
Editions: modifiedEditions,
},
},
EventTime: timestamppb.Now(),
}
if err := a.transition(ctx, event); err != nil {
return err
}
domainEvent := pkg_domain.NewEventMessageFromProtoMessage(
ctx,
a.id.Value(),
StreamName,
a.version,
event,
)
// verify event before dispatching
err = eventVerification.VerifyGranularCaptionDetailModified(ctx, event)
if err != nil {
return err
}
if err := a.trackChange(ctx, domainEvent); err != nil {
return err
}
}
} else { // Unique
isModified, modifiedCaption := getGranularCaptionDetailModified(command.GetModificationPayload(), a.uniqueArtworkCaption)
// only emit event if there are actual modification
if isModified {
event := &proto_domainv1.GranularCaptionDetailModified{
AccountId: command.GetAccountId(),
UserId: command.GetUserId(),
ArtworkId: a.id.Value(),
ModifiedArtwork: &proto_domainv1.GranularCaptionDetailModified_ModifiedUnique{
ModifiedUnique: modifiedCaption,
},
EventTime: timestamppb.Now(),
}
if err := a.transition(ctx, event); err != nil {
return err
}
domainEvent := pkg_domain.NewEventMessageFromProtoMessage(
ctx,
a.id.Value(),
StreamName,
a.version,
event,
)
// verify event before dispatching
err = eventVerification.VerifyGranularCaptionDetailModified(ctx, event)
if err != nil {
return err
}
if err := a.trackChange(ctx, domainEvent); err != nil {
return err
}
}
}
return nil
}
// transition applies the events to the aggregate to update its state.
func (a *CaptionAggregate) transition(ctx context.Context, payload proto.Message) error {
payloadType := payload.ProtoReflect().Descriptor().FullName()
switch payloadType {
// EditionedArtworkRegisteredWithCaption
case (&proto_domainv1.EditionedArtworkRegisteredWithCaption{}).ProtoReflect().Descriptor().FullName():
event := payload.(*proto_domainv1.EditionedArtworkRegisteredWithCaption)
err := event.ValidateAll()
if err != nil {
return err
}
a.id = domain.ArtworkID(event.GetArtworkId())
a.registeredByAccount = event.GetAccountId()
a.registeredByUser = event.GetUserId()
a.editionsCaptions = []*editionCaption{}
for _, v := range event.GetEditions() {
a.editionsCaptions = append(a.editionsCaptions, &editionCaption{
editionID: v.GetEditionId(),
caption: v.GetCaption(),
})
}
// ... (other event handlers)
// GranularCaptionDetailModified
case (&proto_domainv1.GranularCaptionDetailModified{}).ProtoReflect().Descriptor().FullName():
event := payload.(*proto_domainv1.GranularCaptionDetailModified)
err := event.ValidateAll()
if err != nil {
return err
}
// if editioned artwork
if event.GetModifiedEditioned() != nil && len(event.GetModifiedEditioned().GetEditions()) > 0 {
for _, modifiedEdition := range event.GetModifiedEditioned().GetEditions() {
for _, v := range a.editionsCaptions {
if modifiedEdition.EditionId == v.editionID {
applyModifiedCaptionDetailToCaption(v.caption, modifiedEdition.GetModifiedCaptionDetail())
}
}
}
}
// if unique artwork
if event.GetModifiedUnique() != nil {
modifiedCaptionDetails := event.GetModifiedUnique()
applyModifiedCaptionDetailToCaption(a.uniqueArtworkCaption, modifiedCaptionDetails)
}
// ... (other event handlers)
// UnhandledDomainEventError is returned if there were no handlers for the given event
default:
return &pkg_domain.UnhandledDomainEventError{EventType: string(payloadType)}
}
return nil
}
// trackChange appends a new event to the aggregate's changes.
func (a *CaptionAggregate) trackChange(ctx context.Context, event pkg_domain.EventMessage) error {
a.changes = append(a.changes, event)
a.version++
return nil
}
// editionCaption represents the caption details for an edition of an artwork.
type editionCaption struct {
editionID string
caption *proto_rsrcv1.Caption
}
// getModifiedCaptionOpts defines options for calculating modified caption details.
type getModifiedCaptionOpts struct {
newCaption *proto_rsrcv1.Caption
oldCaption *proto_rsrcv1.Caption
}
// getModifiedCaption calculates if there are modifications in the caption details.
func getModifiedCaption(opts getModifiedCaptionOpts) (bool, *proto_domainv1.ModifiedCaptionDetail) {
var (
newCaption = opts.newCaption
oldCaption = opts.oldCaption
)
isModified := false
modifiedCaption := &proto_domainv1.ModifiedCaptionDetail{}
if newCaption.GetTitle() != oldCaption.GetTitle() {
isModified = true
modifiedCaption.Title = &proto_domainv1.ModifiedCaptionDetail_Title{
Value: newCaption.GetTitle(),
}
}
if newCaption.GetYear() != oldCaption.GetYear() {
isModified = true
modifiedCaption.Year = &proto_domainv1.ModifiedCaptionDetail_Year{
Value: &proto_rsrcv1.ArtworkYear{
Raw: newCaption.GetYear(),
Commenced: newCaption.GetYearCommenced(),
Completed: newCaption.GetYearCompleted(),
},
}
}
if newCaption.GetMedium() != oldCaption.GetMedium() {
isModified = true
modifiedCaption.Medium = &proto_domainv1.ModifiedCaptionDetail_Medium{
Value: newCaption.GetMedium(),
}
}
// ... (check other modification)
return isModified, modifiedCaption
}
// getGranularCaptionDetailModified calculates if there are modifications in granular caption details.
func getGranularCaptionDetailModified(modificationPayload *proto_rsrcv1.GranularCaptionModificationPayload,
caption *proto_rsrcv1.Caption,
) (bool, *proto_domainv1.ModifiedCaptionDetail) {
isModified := false
modifiedCaption := &proto_domainv1.ModifiedCaptionDetail{}
if modificationPayload.GetTitle() != nil && modificationPayload.GetTitle().GetValue() != caption.GetTitle() {
isModified = true
modifiedCaption.Title = &proto_domainv1.ModifiedCaptionDetail_Title{
Value: modificationPayload.GetTitle().GetValue(),
}
}
if modificationPayload.GetYear() != nil && modificationPayload.GetYear().GetValue() != nil &&
(modificationPayload.GetYear().GetValue().GetRaw() != caption.GetYear() ||
modificationPayload.GetYear().GetValue().GetCommenced() != caption.GetYearCommenced() ||
modificationPayload.GetYear().GetValue().GetCompleted() != caption.GetYearCompleted()) {
isModified = true
modifiedCaption.Year = &proto_domainv1.ModifiedCaptionDetail_Year{
Value: &proto_rsrcv1.ArtworkYear{
Raw: modificationPayload.GetYear().GetValue().GetRaw(),
Commenced: modificationPayload.GetYear().GetValue().GetCommenced(),
Completed: modificationPayload.GetYear().GetValue().GetCompleted(),
},
}
}
// ... (check other modification)
return isModified, modifiedCaption
}
// applyModifiedCaptionDetailToCaption applies the modified caption details to the current caption.
func applyModifiedCaptionDetailToCaption(caption *proto_rsrcv1.Caption, modification *proto_domainv1.ModifiedCaptionDetail) {
if modification.GetTitle() != nil && modification.GetTitle().GetValue() != caption.GetTitle() {
caption.Title = modification.GetTitle().GetValue()
}
if modification.GetYear() != nil && modification.GetYear().GetValue() != nil &&
modification.GetYear().GetValue().GetRaw() != caption.GetYear() {
caption.Year = modification.GetYear().GetValue().GetRaw()
caption.YearCommenced = modification.GetYear().GetValue().GetCommenced()
caption.YearCompleted = modification.GetYear().GetValue().GetCompleted()
}
if modification.GetMedium() != nil && modification.GetMedium().GetValue() != caption.GetMedium() {
caption.Medium = modification.GetMedium().GetValue()
}
// ... (apply other props)
}
// Package eventsourcing provides an implementation of the caption repository using event sourcing.
package eventsourcing
import (
"context"
"errors"
pkg_domain "bleyk.org/pkg/domain"
"bleyk.org/pkg/metadata"
"bleyk.org/services/artwork/internal/domain"
domain_caption "bleyk.org/services/artwork/internal/domain/caption"
)
// captionRepository is an implementation of the domain_caption.Repository interface using event sourcing.
type captionRepository struct {
eventStore pkg_domain.EventStore
}
// NewCaptionRepository creates a new caption repository with the given event store.
func NewCaptionRepository(eventStore pkg_domain.EventStore) domain_caption.Repository {
return &captionRepository{
eventStore: eventStore,
}
}
// Save saves changes made to the caption aggregate to the event store within the provided session.
func (r *captionRepository) Save(ctx context.Context, artwork domain_caption.CaptionAggregate, sess pkg_domain.EventStoreSession) error {
// Check if there are changes to be saved.
if len(artwork.Changes()) > 0 {
var meta metadata.EventMessageMetadata
// Iterate through each change in the aggregate's changes.
for _, ev := range artwork.Changes() {
// Retrieve metadata from the context.
meta = metadata.EventMessageMetadataFromContext(ctx)
// If metadata is present, associate it with the event.
if !meta.IsEmpty() {
ev.WithMetadata(&meta)
}
}
}
// Save changes to the event store within the provided session.
return sess.Save(artwork.Changes())
}
// Get retrieves the caption aggregate from the event store based on the provided stream ID.
func (r *captionRepository) Get(ctx context.Context, streamID domain.ArtworkID) (domain_caption.CaptionAggregate, error) {
var agg domain_caption.CaptionAggregate
// Retrieve events from the event store based on stream ID and stream name.
events, err := r.eventStore.GetStream(ctx, streamID.Value(), domain_caption.StreamName)
if err != nil {
// Handle errors, checking if the stream is not found.
switch {
case errors.Is(err, pkg_domain.ErrEventStoreStreamNotFound):
// do we need to modify this?
}
return agg, err
}
// Reconstruct the caption aggregate from the retrieved events.
return domain_caption.FromHistory(ctx, events)
}
/* pkg/domain - EventStore interface*/
package domain
import (
"context"
)
// EventStore represents an interface for an event store responsible for managing domain events.
type EventStore interface {
// NewStoreSession creates a new event store session that holds all events to be committed in a single session.
NewStoreSession() EventStoreSession
// Get retrieves a specific event by ID.
Get(ctx context.Context, id string) (EventMessage, error)
// FindAll retrieves all stored events.
FindAll(ctx context.Context) ([]EventMessage, error)
// GetStream retrieves all events for a specific stream (aggregate) by ID and name.
GetStream(ctx context.Context, streamID string, streamName string) ([]EventMessage, error)
// GetStreamEventsByType retrieves events for a specific stream by type.
GetStreamEventsByType(ctx context.Context, streamID string, streamName string, eventType string) ([]EventMessage, error)
// Commit persists the "stored events" in the session.
Commit(ctx context.Context, session EventStoreSession) error
// SubscribeTransactionalEventHandlers subscribes transactional event handlers for processing events.
SubscribeTransactionalEventHandlers(...TransactionalEventHandler) error
// SubscribeIntegrationEventMessageAdapters subscribes integration event message adapters.
SubscribeIntegrationEventMessageAdapters(adapters ...IntegrationEventMessageAdapter) error
}
// EventStoreSession represents a session for saving events in aggregates.
type EventStoreSession interface {
ID() string // ID returns the session ID.
Save(events []EventMessage) error // Save saves the provided events in the session.
Get() []EventMessage // Get retrieves all events in the session.
IsEmpty() bool // IsEmpty checks if the session is empty.
}
package postgres
// EventStore with transactional support ensures data integrity within the bounded context.
// It facilitates the persistence of projections, read models, etc., within a transaction,
// while the aggregate events are being stored in the events store.
// It is crucial to use event handlers specific to the bounded context. For instance, when
// provisioning a new user, ensure that user aggregate events are processed by the relevant
// handlers before committing them to the event store via the user repository.
// Any failure in either the event handling or persistence of projections will result in the
// failure of both, emphasizing the importance of maintaining data consistency.
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/proto"
domain "bleyk.org/pkg/domain"
apperrors "bleyk.org/pkg/errors"
"bleyk.org/pkg/integration"
"bleyk.org/pkg/logger"
"bleyk.org/pkg/metadata"
"bleyk.org/pkg/protobuf"
)
// eventStore is a PostgreSQL implementation of the EventStore interface.
type eventStore struct {
domainEventHandlers []domain.TransactionalEventHandler
integrationEventsAdapters []domain.IntegrationEventMessageAdapter
integrationEventsStore integration.EventStore
queuedIntegrationEventMessages []integration.EventMessage
db *pgxpool.Pool
logger logger.Logger
}
// eventMsgDB is a representation of the event in the database.
type eventMsgDB struct {
id int64
eventID string
// userID string
streamID string
streamName string
streamSeqNo int64
eventType string
payload interface{}
binaryPayload []byte
metadata []byte
eventTime time.Time
}
// NewEventStore creates a new PostgreSQL event store.
func NewEventStore(
integrationEventsStore integration.EventStore,
db *pgxpool.Pool,
logger logger.Logger,
) domain.EventStore {
return &eventStore{
integrationEventsStore: integrationEventsStore,
domainEventHandlers: []domain.TransactionalEventHandler{},
queuedIntegrationEventMessages: []integration.EventMessage{},
db: db,
logger: logger,
}
}
// NewStoreSession creates a new event store session.
func (s *eventStore) NewStoreSession() domain.EventStoreSession {
return NewEventStoreSession()
}
// Commit saves the changes made during the current session to the event store.
func (s *eventStore) Commit(ctx context.Context,
eventstoreSession domain.EventStoreSession,
) error {
var err error
commitLogErrMsg := "commit failed"
if eventstoreSession.IsEmpty() {
s.logger.Warn(ctx, "empty event store session provided")
return domain.ErrEventStoreContextWithoutSession
}
eventsNames := []string{}
for _, em := range eventstoreSession.Get() {
eventsNames = append(eventsNames, string(em.GetPayload().ProtoReflect().Descriptor().FullName()))
}
logFields := []zapcore.Field{
zap.Strings("events", eventsNames),
zap.String("transaction_session", eventstoreSession.ID()),
}
tx, err := s.db.Begin(ctx)
if err != nil {
logFields = append(logFields, []zapcore.Field{
zap.String("reason", "transaction error"),
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())),
}...)
s.logger.Error(ctx, commitLogErrMsg, logFields...)
return apperrors.Wrap(ctx, err)
}
// Rollback is safe to call even if the tx is already closed;
// if the tx commits successfully, this is a no-op
defer tx.Rollback(ctx)
// Run through events
// each event MUST be handled by `transactional handler`
for _, em := range eventstoreSession.Get() {
messageName := em.GetPayload().ProtoReflect().Descriptor().FullName()
// Handle event by transactional event handlers
hasTransHandler := false
for _, h := range s.domainEventHandlers {
if h.Handles(messageName) {
hasTransHandler = true
err = h.Handle(ctx, tx, messageName, em)
if err != nil {
logFields = append(logFields, []zapcore.Field{
zap.String("reason", "domain event handler failed"),
zap.String("event_type", string(messageName)),
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())),
}...)
s.logger.Error(ctx, commitLogErrMsg, logFields...)
return apperrors.Wrap(ctx, err)
}
}
}
// err if no `transactional handler` was provided for the event
if !hasTransHandler {
reason := "domaineventstore: no transactional handler"
logFields = append(logFields, []zapcore.Field{
zap.String("error", reason),
zap.String("event_type", string(messageName)),
}...)
s.logger.Error(ctx, commitLogErrMsg, logFields...)
return apperrors.New(ctx, reason)
}
// adapt domain event to an integration event if an adapter for it is registered
if len(s.integrationEventsAdapters) > 0 {
for _, adapter := range s.integrationEventsAdapters {
if adapter.CanAdapt(em) {
integrationEvent, err := adapter.Adapt(ctx, em)
if err != nil {
logFields = append(logFields, []zapcore.Field{
zap.String("reason", "integration event adapter failed"),
zap.String("event_type", string(messageName)),
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())),
}...)
s.logger.Error(ctx, commitLogErrMsg, logFields...)
return apperrors.Wrap(ctx, err)
}
// Set "queued for publishing" status
integrationEvent.WithStatus(integration.MessageStatus_PUBLISH_QUEUED)
// append message to the queue
s.queuedIntegrationEventMessages = append(s.queuedIntegrationEventMessages, integrationEvent)
}
}
}
}
// persist domain events
if err := s.persistToDB(ctx, tx, eventstoreSession.Get()...); err != nil {
logFields = append(logFields, []zapcore.Field{
zap.String("reason", "failed to persist domain event messages"),
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())),
}...)
s.logger.Error(ctx, commitLogErrMsg, logFields...)
return apperrors.Wrap(ctx, err)
}
// persist integration events
if len(s.queuedIntegrationEventMessages) > 0 {
if err := s.integrationEventsStore.Save(ctx, tx, s.queuedIntegrationEventMessages...); err != nil {
logFields = append(logFields, []zapcore.Field{
zap.String("reason", "failed to persist integration event messages"),
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())),
}...)
s.logger.Error(ctx, commitLogErrMsg, logFields...)
return apperrors.Wrap(ctx, err)
}
}
// Commit tx
err = tx.Commit(ctx)
if err != nil {
logFields = append(logFields, []zapcore.Field{
zap.String("reason", "pg transaction failed"),
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())),
}...)
s.logger.Error(ctx, commitLogErrMsg, logFields...)
return apperrors.Wrap(ctx, err)
}
// once the transaction is commited succesfully, we can publish the integration events (if any)
if len(s.queuedIntegrationEventMessages) > 0 {
s.integrationEventsStore.Publish(ctx, s.queuedIntegrationEventMessages...)
// IMPORTANT! must clear queued integration events from the slice
s.queuedIntegrationEventMessages = []integration.EventMessage{}
}
s.logger.Debug(ctx, "commit completed", logFields...)
return nil
}
// Get retrieves a single event by its ID.
func (s *eventStore) Get(ctx context.Context, id string) (domain.EventMessage, error) {
return nil, nil
}
// FindAll retrieves all events.
func (s *eventStore) FindAll(ctx context.Context) ([]domain.EventMessage, error) {
query := `
SELECT
id,
event_id,
stream_id,
stream_name,
stream_seq_no,
event_type,
payload,
binary_payload,
metadata,
event_time
FROM eventstore.domain_events
ORDER BY
id ASC
LIMIT 1
`
rows, err := s.db.Query(
context.Background(),
query,
)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, domain.ErrEventStoreStreamNotFound
}
s.logger.Error(ctx, "FindAll failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error())))
return nil, apperrors.Wrap(ctx, err)
}
defer rows.Close()
events := []domain.EventMessage{}
// Iterate through the result set
for rows.Next() {
eventMsg := eventMsgDB{}
err = rows.Scan(
&eventMsg.id,
&eventMsg.eventID,
&eventMsg.streamID,
&eventMsg.streamName,
&eventMsg.streamSeqNo,
&eventMsg.eventType,
&eventMsg.payload,
&eventMsg.binaryPayload,
&eventMsg.metadata,
&eventMsg.eventTime,
)
if err != nil {
s.logger.Error(ctx, "FindAll failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error())))
return nil, apperrors.Wrap(ctx, err)
}
pmsg, err := protobuf.ProtoMessageFromBytes(eventMsg.eventType, eventMsg.binaryPayload)
if err != nil {
s.logger.Error(ctx, "FindAll failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error())))
return nil, apperrors.Wrap(ctx, err)
}
em := domain.NewEventMessageFromProtoMessage(ctx, eventMsg.streamID, eventMsg.streamName, int(eventMsg.streamSeqNo), pmsg)
// set time
em.WithEventTime(eventMsg.eventTime)
// set metadata
md, err := metadata.EventMessageMetadataFromPayload(eventMsg.metadata)
if err == nil {
em.WithMetadata(md)
}
events = append(events, em)
}
return events, nil
}
// GetStream retrieves events for a specific stream.
func (s *eventStore) GetStream(ctx context.Context, streamID string, streamName string) ([]domain.EventMessage, error) {
query := `
SELECT
id,
event_id,
stream_id,
stream_name,
stream_seq_no,
event_type,
binary_payload,
metadata,
event_time
FROM eventstore.domain_events
WHERE
stream_id=$1 AND stream_name=$2
ORDER BY
id ASC
`
rows, err := s.db.Query(
context.Background(),
query,
streamID,
streamName,
)
// TODO check error type
if err != nil {
switch {
case errors.Is(err, pgx.ErrNoRows):
return nil, domain.ErrEventStoreStreamNotFound
}
s.logger.Error(ctx, "GetStream failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error())))
return nil, apperrors.Wrap(ctx, err)
}
defer rows.Close()
events := []domain.EventMessage{}
// Iterate through the result set
for rows.Next() {
eventMsg := eventMsgDB{}
err = rows.Scan(
&eventMsg.id,
&eventMsg.eventID,
&eventMsg.streamID,
&eventMsg.streamName,
&eventMsg.streamSeqNo,
&eventMsg.eventType,
// &eventMsg.payload,
&eventMsg.binaryPayload,
&eventMsg.metadata,
&eventMsg.eventTime,
)
// TODO check error
if err != nil {
s.logger.Error(ctx, "GetStream failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error())))
return nil, apperrors.Wrap(ctx, err)
}
pmsg, err := protobuf.ProtoMessageFromBytes(eventMsg.eventType, eventMsg.binaryPayload)
if err != nil {
s.logger.Error(ctx, "GetStream failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error())))
return nil, apperrors.Wrap(ctx, err)
}
em := domain.NewEventMessageFromProtoMessage(ctx, eventMsg.streamID, eventMsg.streamName, int(eventMsg.streamSeqNo), pmsg)
// set time
em.WithEventTime(eventMsg.eventTime)
// set metadata
md, err := metadata.EventMessageMetadataFromPayload(eventMsg.metadata)
if err == nil {
em.WithMetadata(md)
}
events = append(events, em)
}
if len(events) == 0 {
return nil, domain.ErrEventStoreStreamNotFound
}
return events, nil
}
// GetStreamEventsByType retrieves events of a specific type for a stream.
func (s *eventStore) GetStreamEventsByType(ctx context.Context, streamID string, streamName string, eventType string) ([]domain.EventMessage, error) {
return nil, nil
}
// SubscribeTransactionalEventHandlers subscribes transactional event handlers.
func (s *eventStore) SubscribeTransactionalEventHandlers(handlers ...domain.TransactionalEventHandler) error {
for _, handler := range handlers {
// check if it was already susbcribed
alreadyExists := false
//
for _, v := range s.domainEventHandlers {
if v.Name() == handler.Name() {
s.logger.Warn(context.TODO(),
fmt.Sprintf("transactional handler '%s' already subscribed", v.Name()),
)
}
}
if !alreadyExists {
s.domainEventHandlers = append(s.domainEventHandlers, handler)
}
}
return nil
}
// SubscribeIntegrationEventMessageAdapters subscribes integration event message adapters.
func (s *eventStore) SubscribeIntegrationEventMessageAdapters(adapters ...domain.IntegrationEventMessageAdapter) error {
s.integrationEventsAdapters = append(s.integrationEventsAdapters, adapters...)
// check if duplicates, then ...
// for _, adapter := range adapters {
// s.integrationEventsAdapters = append(s.integrationEventsAdapters, adapter)
// }
return nil
}
// HasAccountId is an interface used to check if the event payload message has an account ID.
type HasAccountId interface {
GetAccountId() string
}
// persistToDB persists events to the PostgreSQL database.
func (s *eventStore) persistToDB(ctx context.Context, tx pgx.Tx, eventMessages ...domain.EventMessage) error {
rows := [][]interface{}{}
if len(eventMessages) == 0 {
s.logger.Warn(ctx, "persit called with no events to persist",
s.logger.WithStackTrace("stacktrace"),
)
return nil
}
// for each event
for _, event := range eventMessages {
binPayload, err := proto.Marshal(event.GetPayload())
if err != nil {
return apperrors.Wrap(ctx, err)
}
metadata, err := json.Marshal(event.GetMetadata())
if err != nil {
return apperrors.Wrap(ctx, err)
}
// t, ok := (event.GetPayload()).(WithAccountId)
// if ok {
// util.DebugJSON(t.GetAccountId())
// }
rows = append(rows, []interface{}{
event.GetId(),
event.GetStreamId(),
event.GetStreamName(),
event.GetStreamSeqenceNo(),
event.GetEventType(),
event.GetPayload(),
binPayload,
metadata,
event.GetEventTime().UTC(),
})
}
tableIdentifier := pgx.Identifier{
"eventstore", "domain_events",
}
// copyCount
_, err := tx.CopyFrom(
ctx,
tableIdentifier,
[]string{
"event_id",
"stream_id",
"stream_name",
"stream_seq_no",
"event_type",
"payload",
"binary_payload",
"metadata",
"event_time",
},
pgx.CopyFromRows(rows),
)
return err
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment