Last active
December 12, 2023 01:41
-
-
Save SpareShade/d38ecc0202a87ec7439a00cf0d0f2577 to your computer and use it in GitHub Desktop.
GetUp - Event Sourcing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package caption provides the implementation of the CaptionAggregate, an aggregate representing the state | |
// of captions for artworks within the artwork domain. This package handles the registration, modification, | |
// and tracking of caption details for both unique and editioned artworks. The CaptionAggregate ensures data | |
// integrity by applying events in a specific order and emitting new events for changes in caption details. | |
// It also includes functions for transitioning the aggregate state, tracking changes, and handling various | |
// commands related to caption management. | |
// | |
// The CaptionAggregate is designed to work within a larger context of an artwork service, where it interacts | |
// with other components to maintain a consistent and reliable state of artwork captions. The code includes | |
// comments and documentation to guide developers through the functionality and usage of the CaptionAggregate. | |
package caption | |
import ( | |
"context" | |
"google.golang.org/protobuf/proto" | |
"google.golang.org/protobuf/types/known/timestamppb" | |
proto_domainv1 "bleyk.org/genproto/services/artwork/domain/caption/v1" | |
proto_rsrcv1 "bleyk.org/genproto/services/artwork/resources/v1" | |
pkg_domain "bleyk.org/pkg/domain" | |
"bleyk.org/pkg/errors" | |
domain "bleyk.org/services/artwork/internal/domain" | |
) | |
// StreamName is the event-stream name used for CaptionAggregate events, both
// when new event messages are created and when the stream is read back.
const StreamName = "bleyk.services.artwork.domain.caption.CaptionAggregate"
// CaptionAggregate is an aggregate representing the state of captions for artworks. | |
type CaptionAggregate struct { | |
id domain.ArtworkID | |
registeredByAccount string | |
registeredByUser string | |
version int | |
changes []pkg_domain.EventMessage | |
// props | |
uniqueArtworkCaption *proto_rsrcv1.Caption | |
editionsCaptions []*editionCaption | |
} | |
// New creates a new instance of CaptionAggregate. | |
func New() CaptionAggregate { | |
return CaptionAggregate{} | |
} | |
// FromHistory loads the current aggregate root state by applying all events in order. | |
func FromHistory(ctx context.Context, events []pkg_domain.EventMessage) (CaptionAggregate, error) { | |
aggregate := New() | |
for _, em := range events { | |
if err := aggregate.transition(ctx, em.GetPayload()); err != nil { | |
return aggregate, errors.Wrap(ctx, err) | |
} | |
aggregate.version++ | |
} | |
return aggregate, nil | |
} | |
// ... (public methods) | |
// Version returns the current aggregate root version. | |
func (a CaptionAggregate) Version() int { | |
return a.version | |
} | |
// Changes returns all newly applied events. | |
func (a CaptionAggregate) Changes() []pkg_domain.EventMessage { | |
return a.changes | |
} | |
// RegisterEditionedArtworkWithCaption registers an editioned artwork with a caption. | |
func (a *CaptionAggregate) RegisterEditionedArtworkWithCaption( | |
ctx context.Context, | |
command *proto_domainv1.RegisterEditionedArtworkWithCaption, | |
commandVerification domain.CommandVerifier, | |
eventVerification domain.EventVerifier, | |
) error { | |
// verify command before processing | |
err = commandVerification.VerifyRegisterEditionedArtworkWithCaption(ctx, command) | |
if err != nil { | |
return err | |
} | |
event := &proto_domainv1.EditionedArtworkRegisteredWithCaption{ | |
AccountId: command.GetAccountId(), | |
UserId: command.GetUserId(), | |
ArtworkId: command.GetArtworkId(), | |
Editions: []*proto_domainv1.EditionedArtworkRegisteredWithCaption_Edition{}, | |
EventTime: timestamppb.Now(), | |
} | |
for _, v := range command.GetEditions() { | |
event.Editions = append(event.Editions, &proto_domainv1.EditionedArtworkRegisteredWithCaption_Edition{ | |
EditionId: v.GetEditionId(), | |
Type: v.GetType(), | |
Number: v.GetNumber(), | |
Caption: v.GetCaption(), | |
}) | |
} | |
// verify event before dispatching | |
err = eventVerification.VerifyArtEventRegistered(ctx, event) | |
if err != nil { | |
return err | |
} | |
if err := a.transition(ctx, event); err != nil { | |
return err | |
} | |
domainEvent := pkg_domain.NewEventMessageFromProtoMessage( | |
ctx, | |
command.GetArtworkId(), | |
StreamName, | |
a.version, | |
event, | |
) | |
if err := a.trackChange(ctx, domainEvent); err != nil { | |
return err | |
} | |
return nil | |
} | |
// ModifyGranularCaptionDetail modifies the caption details for editioned or unique artwork. | |
func (a *CaptionAggregate) ModifyGranularCaptionDetail( | |
ctx context.Context, | |
command *proto_domainv1.ModifyGranularCaptionDetail, | |
commandVerification domain.CommandVerifier, | |
eventVerification domain.EventVerifier, | |
) error { | |
// check that we are dealing with the correct artwork | |
if len(a.editionsCaptions) > 0 && len(command.GetEditionIds()) == 0 { | |
return domain.ErrWrongArtworkType | |
} | |
if len(a.editionsCaptions) == 0 && len(command.GetEditionIds()) > 0 { | |
return domain.ErrWrongArtworkType | |
} | |
// verify command before processing | |
err = commandVerification.VerifyModifyGranularCaptionDetail(ctx, command) | |
if err != nil { | |
return err | |
} | |
// Editioned | |
if len(command.GetEditionIds()) > 0 { | |
modifiedEditions := []*proto_domainv1.GranularCaptionDetailModified_ModifiedEdition{} | |
for _, edId := range command.GetEditionIds() { | |
for _, v := range a.editionsCaptions { | |
// Check modification if valid modifications | |
if v.editionID == edId { | |
isModified, modifiedCaption := getGranularCaptionDetailModified(command.GetModificationPayload(), v.caption) | |
if isModified { | |
modifiedEdition := &proto_domainv1.GranularCaptionDetailModified_ModifiedEdition{ | |
EditionId: edId, | |
ModifiedCaptionDetail: modifiedCaption, | |
} | |
modifiedEditions = append(modifiedEditions, modifiedEdition) | |
} | |
} | |
} | |
} | |
// only emit event if there are actual modification | |
if len(modifiedEditions) > 0 { | |
event := &proto_domainv1.GranularCaptionDetailModified{ | |
AccountId: command.GetAccountId(), | |
UserId: command.GetUserId(), | |
ArtworkId: a.id.Value(), | |
ModifiedArtwork: &proto_domainv1.GranularCaptionDetailModified_ModifiedEditioned{ | |
ModifiedEditioned: &proto_domainv1.GranularCaptionDetailModified_ModifiedEditions{ | |
Editions: modifiedEditions, | |
}, | |
}, | |
EventTime: timestamppb.Now(), | |
} | |
if err := a.transition(ctx, event); err != nil { | |
return err | |
} | |
domainEvent := pkg_domain.NewEventMessageFromProtoMessage( | |
ctx, | |
a.id.Value(), | |
StreamName, | |
a.version, | |
event, | |
) | |
// verify event before dispatching | |
err = eventVerification.VerifyGranularCaptionDetailModified(ctx, event) | |
if err != nil { | |
return err | |
} | |
if err := a.trackChange(ctx, domainEvent); err != nil { | |
return err | |
} | |
} | |
} else { // Unique | |
isModified, modifiedCaption := getGranularCaptionDetailModified(command.GetModificationPayload(), a.uniqueArtworkCaption) | |
// only emit event if there are actual modification | |
if isModified { | |
event := &proto_domainv1.GranularCaptionDetailModified{ | |
AccountId: command.GetAccountId(), | |
UserId: command.GetUserId(), | |
ArtworkId: a.id.Value(), | |
ModifiedArtwork: &proto_domainv1.GranularCaptionDetailModified_ModifiedUnique{ | |
ModifiedUnique: modifiedCaption, | |
}, | |
EventTime: timestamppb.Now(), | |
} | |
if err := a.transition(ctx, event); err != nil { | |
return err | |
} | |
domainEvent := pkg_domain.NewEventMessageFromProtoMessage( | |
ctx, | |
a.id.Value(), | |
StreamName, | |
a.version, | |
event, | |
) | |
// verify event before dispatching | |
err = eventVerification.VerifyGranularCaptionDetailModified(ctx, event) | |
if err != nil { | |
return err | |
} | |
if err := a.trackChange(ctx, domainEvent); err != nil { | |
return err | |
} | |
} | |
} | |
return nil | |
} | |
// transition applies the events to the aggregate to update its state. | |
func (a *CaptionAggregate) transition(ctx context.Context, payload proto.Message) error { | |
payloadType := payload.ProtoReflect().Descriptor().FullName() | |
switch payloadType { | |
// EditionedArtworkRegisteredWithCaption | |
case (&proto_domainv1.EditionedArtworkRegisteredWithCaption{}).ProtoReflect().Descriptor().FullName(): | |
event := payload.(*proto_domainv1.EditionedArtworkRegisteredWithCaption) | |
err := event.ValidateAll() | |
if err != nil { | |
return err | |
} | |
a.id = domain.ArtworkID(event.GetArtworkId()) | |
a.registeredByAccount = event.GetAccountId() | |
a.registeredByUser = event.GetUserId() | |
a.editionsCaptions = []*editionCaption{} | |
for _, v := range event.GetEditions() { | |
a.editionsCaptions = append(a.editionsCaptions, &editionCaption{ | |
editionID: v.GetEditionId(), | |
caption: v.GetCaption(), | |
}) | |
} | |
// ... (other event handlers) | |
// GranularCaptionDetailModified | |
case (&proto_domainv1.GranularCaptionDetailModified{}).ProtoReflect().Descriptor().FullName(): | |
event := payload.(*proto_domainv1.GranularCaptionDetailModified) | |
err := event.ValidateAll() | |
if err != nil { | |
return err | |
} | |
// if editioned artwork | |
if event.GetModifiedEditioned() != nil && len(event.GetModifiedEditioned().GetEditions()) > 0 { | |
for _, modifiedEdition := range event.GetModifiedEditioned().GetEditions() { | |
for _, v := range a.editionsCaptions { | |
if modifiedEdition.EditionId == v.editionID { | |
applyModifiedCaptionDetailToCaption(v.caption, modifiedEdition.GetModifiedCaptionDetail()) | |
} | |
} | |
} | |
} | |
// if unique artwork | |
if event.GetModifiedUnique() != nil { | |
modifiedCaptionDetails := event.GetModifiedUnique() | |
applyModifiedCaptionDetailToCaption(a.uniqueArtworkCaption, modifiedCaptionDetails) | |
} | |
// ... (other event handlers) | |
// UnhandledDomainEventError is returned if there were no handlers for the given event | |
default: | |
return &pkg_domain.UnhandledDomainEventError{EventType: string(payloadType)} | |
} | |
return nil | |
} | |
// trackChange appends a new event to the aggregate's changes. | |
func (a *CaptionAggregate) trackChange(ctx context.Context, event pkg_domain.EventMessage) error { | |
a.changes = append(a.changes, event) | |
a.version++ | |
return nil | |
} | |
// editionCaption represents the caption details for an edition of an artwork. | |
type editionCaption struct { | |
editionID string | |
caption *proto_rsrcv1.Caption | |
} | |
// getModifiedCaptionOpts defines options for calculating modified caption details. | |
type getModifiedCaptionOpts struct { | |
newCaption *proto_rsrcv1.Caption | |
oldCaption *proto_rsrcv1.Caption | |
} | |
// getModifiedCaption calculates if there are modifications in the caption details. | |
func getModifiedCaption(opts getModifiedCaptionOpts) (bool, *proto_domainv1.ModifiedCaptionDetail) { | |
var ( | |
newCaption = opts.newCaption | |
oldCaption = opts.oldCaption | |
) | |
isModified := false | |
modifiedCaption := &proto_domainv1.ModifiedCaptionDetail{} | |
if newCaption.GetTitle() != oldCaption.GetTitle() { | |
isModified = true | |
modifiedCaption.Title = &proto_domainv1.ModifiedCaptionDetail_Title{ | |
Value: newCaption.GetTitle(), | |
} | |
} | |
if newCaption.GetYear() != oldCaption.GetYear() { | |
isModified = true | |
modifiedCaption.Year = &proto_domainv1.ModifiedCaptionDetail_Year{ | |
Value: &proto_rsrcv1.ArtworkYear{ | |
Raw: newCaption.GetYear(), | |
Commenced: newCaption.GetYearCommenced(), | |
Completed: newCaption.GetYearCompleted(), | |
}, | |
} | |
} | |
if newCaption.GetMedium() != oldCaption.GetMedium() { | |
isModified = true | |
modifiedCaption.Medium = &proto_domainv1.ModifiedCaptionDetail_Medium{ | |
Value: newCaption.GetMedium(), | |
} | |
} | |
// ... (check other modification) | |
return isModified, modifiedCaption | |
} | |
// getGranularCaptionDetailModified calculates if there are modifications in granular caption details. | |
func getGranularCaptionDetailModified(modificationPayload *proto_rsrcv1.GranularCaptionModificationPayload, | |
caption *proto_rsrcv1.Caption, | |
) (bool, *proto_domainv1.ModifiedCaptionDetail) { | |
isModified := false | |
modifiedCaption := &proto_domainv1.ModifiedCaptionDetail{} | |
if modificationPayload.GetTitle() != nil && modificationPayload.GetTitle().GetValue() != caption.GetTitle() { | |
isModified = true | |
modifiedCaption.Title = &proto_domainv1.ModifiedCaptionDetail_Title{ | |
Value: modificationPayload.GetTitle().GetValue(), | |
} | |
} | |
if modificationPayload.GetYear() != nil && modificationPayload.GetYear().GetValue() != nil && | |
(modificationPayload.GetYear().GetValue().GetRaw() != caption.GetYear() || | |
modificationPayload.GetYear().GetValue().GetCommenced() != caption.GetYearCommenced() || | |
modificationPayload.GetYear().GetValue().GetCompleted() != caption.GetYearCompleted()) { | |
isModified = true | |
modifiedCaption.Year = &proto_domainv1.ModifiedCaptionDetail_Year{ | |
Value: &proto_rsrcv1.ArtworkYear{ | |
Raw: modificationPayload.GetYear().GetValue().GetRaw(), | |
Commenced: modificationPayload.GetYear().GetValue().GetCommenced(), | |
Completed: modificationPayload.GetYear().GetValue().GetCompleted(), | |
}, | |
} | |
} | |
// ... (check other modification) | |
return isModified, modifiedCaption | |
} | |
// applyModifiedCaptionDetailToCaption applies the modified caption details to the current caption. | |
func applyModifiedCaptionDetailToCaption(caption *proto_rsrcv1.Caption, modification *proto_domainv1.ModifiedCaptionDetail) { | |
if modification.GetTitle() != nil && modification.GetTitle().GetValue() != caption.GetTitle() { | |
caption.Title = modification.GetTitle().GetValue() | |
} | |
if modification.GetYear() != nil && modification.GetYear().GetValue() != nil && | |
modification.GetYear().GetValue().GetRaw() != caption.GetYear() { | |
caption.Year = modification.GetYear().GetValue().GetRaw() | |
caption.YearCommenced = modification.GetYear().GetValue().GetCommenced() | |
caption.YearCompleted = modification.GetYear().GetValue().GetCompleted() | |
} | |
if modification.GetMedium() != nil && modification.GetMedium().GetValue() != caption.GetMedium() { | |
caption.Medium = modification.GetMedium().GetValue() | |
} | |
// ... (apply other props) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package eventsourcing provides an implementation of the caption repository using event sourcing. | |
package eventsourcing | |
import ( | |
"context" | |
"errors" | |
pkg_domain "bleyk.org/pkg/domain" | |
"bleyk.org/pkg/metadata" | |
"bleyk.org/services/artwork/internal/domain" | |
domain_caption "bleyk.org/services/artwork/internal/domain/caption" | |
) | |
// captionRepository is an implementation of the domain_caption.Repository interface using event sourcing. | |
type captionRepository struct { | |
eventStore pkg_domain.EventStore | |
} | |
// NewCaptionRepository creates a new caption repository with the given event store. | |
func NewCaptionRepository(eventStore pkg_domain.EventStore) domain_caption.Repository { | |
return &captionRepository{ | |
eventStore: eventStore, | |
} | |
} | |
// Save saves changes made to the caption aggregate to the event store within the provided session. | |
func (r *captionRepository) Save(ctx context.Context, artwork domain_caption.CaptionAggregate, sess pkg_domain.EventStoreSession) error { | |
// Check if there are changes to be saved. | |
if len(artwork.Changes()) > 0 { | |
var meta metadata.EventMessageMetadata | |
// Iterate through each change in the aggregate's changes. | |
for _, ev := range artwork.Changes() { | |
// Retrieve metadata from the context. | |
meta = metadata.EventMessageMetadataFromContext(ctx) | |
// If metadata is present, associate it with the event. | |
if !meta.IsEmpty() { | |
ev.WithMetadata(&meta) | |
} | |
} | |
} | |
// Save changes to the event store within the provided session. | |
return sess.Save(artwork.Changes()) | |
} | |
// Get retrieves the caption aggregate from the event store based on the provided stream ID. | |
func (r *captionRepository) Get(ctx context.Context, streamID domain.ArtworkID) (domain_caption.CaptionAggregate, error) { | |
var agg domain_caption.CaptionAggregate | |
// Retrieve events from the event store based on stream ID and stream name. | |
events, err := r.eventStore.GetStream(ctx, streamID.Value(), domain_caption.StreamName) | |
if err != nil { | |
// Handle errors, checking if the stream is not found. | |
switch { | |
case errors.Is(err, pkg_domain.ErrEventStoreStreamNotFound): | |
// do we need to modify this? | |
} | |
return agg, err | |
} | |
// Reconstruct the caption aggregate from the retrieved events. | |
return domain_caption.FromHistory(ctx, events) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* pkg/domain - EventStore interface*/ | |
package domain | |
import ( | |
"context" | |
) | |
// EventStore represents an interface for an event store responsible for managing domain events. | |
type EventStore interface { | |
// NewStoreSession creates a new event store session that holds all events to be committed in a single session. | |
NewStoreSession() EventStoreSession | |
// Get retrieves a specific event by ID. | |
Get(ctx context.Context, id string) (EventMessage, error) | |
// FindAll retrieves all stored events. | |
FindAll(ctx context.Context) ([]EventMessage, error) | |
// GetStream retrieves all events for a specific stream (aggregate) by ID and name. | |
GetStream(ctx context.Context, streamID string, streamName string) ([]EventMessage, error) | |
// GetStreamEventsByType retrieves events for a specific stream by type. | |
GetStreamEventsByType(ctx context.Context, streamID string, streamName string, eventType string) ([]EventMessage, error) | |
// Commit persists the "stored events" in the session. | |
Commit(ctx context.Context, session EventStoreSession) error | |
// SubscribeTransactionalEventHandlers subscribes transactional event handlers for processing events. | |
SubscribeTransactionalEventHandlers(...TransactionalEventHandler) error | |
// SubscribeIntegrationEventMessageAdapters subscribes integration event message adapters. | |
SubscribeIntegrationEventMessageAdapters(adapters ...IntegrationEventMessageAdapter) error | |
} | |
// EventStoreSession represents a session for saving events in aggregates. | |
type EventStoreSession interface { | |
ID() string // ID returns the session ID. | |
Save(events []EventMessage) error // Save saves the provided events in the session. | |
Get() []EventMessage // Get retrieves all events in the session. | |
IsEmpty() bool // IsEmpty checks if the session is empty. | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package postgres | |
// EventStore with transactional support ensures data integrity within the bounded context. | |
// It facilitates the persistence of projections, read models, etc., within a transaction, | |
// while the aggregate events are being stored in the events store. | |
// It is crucial to use event handlers specific to the bounded context. For instance, when | |
// provisioning a new user, ensure that user aggregate events are processed by the relevant | |
// handlers before committing them to the event store via the user repository. | |
// Any failure in either the event handling or persistence of projections will result in the | |
// failure of both, emphasizing the importance of maintaining data consistency. | |
import ( | |
"context" | |
"encoding/json" | |
"errors" | |
"fmt" | |
"time" | |
"github.com/jackc/pgx/v4" | |
"github.com/jackc/pgx/v4/pgxpool" | |
"go.uber.org/zap" | |
"go.uber.org/zap/zapcore" | |
"google.golang.org/protobuf/proto" | |
domain "bleyk.org/pkg/domain" | |
apperrors "bleyk.org/pkg/errors" | |
"bleyk.org/pkg/integration" | |
"bleyk.org/pkg/logger" | |
"bleyk.org/pkg/metadata" | |
"bleyk.org/pkg/protobuf" | |
) | |
// eventStore is a PostgreSQL implementation of the EventStore interface. | |
type eventStore struct { | |
domainEventHandlers []domain.TransactionalEventHandler | |
integrationEventsAdapters []domain.IntegrationEventMessageAdapter | |
integrationEventsStore integration.EventStore | |
queuedIntegrationEventMessages []integration.EventMessage | |
db *pgxpool.Pool | |
logger logger.Logger | |
} | |
// eventMsgDB mirrors one row of eventstore.domain_events as it is scanned
// from the database.
type eventMsgDB struct {
	id      int64  // id column; result sets are ordered by it
	eventID string // event_id column
	// userID string
	streamID      string      // stream_id column
	streamName    string      // stream_name column
	streamSeqNo   int64       // stream_seq_no column
	eventType     string      // event_type column; used to decode binaryPayload
	payload       interface{} // payload column
	binaryPayload []byte      // binary_payload column holding the marshalled protobuf message
	metadata      []byte      // metadata column
	eventTime     time.Time   // event_time column
}
// NewEventStore creates a new PostgreSQL event store. | |
func NewEventStore( | |
integrationEventsStore integration.EventStore, | |
db *pgxpool.Pool, | |
logger logger.Logger, | |
) domain.EventStore { | |
return &eventStore{ | |
integrationEventsStore: integrationEventsStore, | |
domainEventHandlers: []domain.TransactionalEventHandler{}, | |
queuedIntegrationEventMessages: []integration.EventMessage{}, | |
db: db, | |
logger: logger, | |
} | |
} | |
// NewStoreSession creates a new event store session. | |
func (s *eventStore) NewStoreSession() domain.EventStoreSession { | |
return NewEventStoreSession() | |
} | |
// Commit saves the changes made during the current session to the event store. | |
func (s *eventStore) Commit(ctx context.Context, | |
eventstoreSession domain.EventStoreSession, | |
) error { | |
var err error | |
commitLogErrMsg := "commit failed" | |
if eventstoreSession.IsEmpty() { | |
s.logger.Warn(ctx, "empty event store session provided") | |
return domain.ErrEventStoreContextWithoutSession | |
} | |
eventsNames := []string{} | |
for _, em := range eventstoreSession.Get() { | |
eventsNames = append(eventsNames, string(em.GetPayload().ProtoReflect().Descriptor().FullName())) | |
} | |
logFields := []zapcore.Field{ | |
zap.Strings("events", eventsNames), | |
zap.String("transaction_session", eventstoreSession.ID()), | |
} | |
tx, err := s.db.Begin(ctx) | |
if err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "transaction error"), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
// Rollback is safe to call even if the tx is already closed; | |
// if the tx commits successfully, this is a no-op | |
defer tx.Rollback(ctx) | |
// Run through events | |
// each event MUST be handled by `transactional handler` | |
for _, em := range eventstoreSession.Get() { | |
messageName := em.GetPayload().ProtoReflect().Descriptor().FullName() | |
// Handle event by transactional event handlers | |
hasTransHandler := false | |
for _, h := range s.domainEventHandlers { | |
if h.Handles(messageName) { | |
hasTransHandler = true | |
err = h.Handle(ctx, tx, messageName, em) | |
if err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "domain event handler failed"), | |
zap.String("event_type", string(messageName)), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
} | |
} | |
// err if no `transactional handler` was provided for the event | |
if !hasTransHandler { | |
reason := "domaineventstore: no transactional handler" | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("error", reason), | |
zap.String("event_type", string(messageName)), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.New(ctx, reason) | |
} | |
// adapt domain event to an integration event if an adapter for it is registered | |
if len(s.integrationEventsAdapters) > 0 { | |
for _, adapter := range s.integrationEventsAdapters { | |
if adapter.CanAdapt(em) { | |
integrationEvent, err := adapter.Adapt(ctx, em) | |
if err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "integration event adapter failed"), | |
zap.String("event_type", string(messageName)), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
// Set "queued for publishing" status | |
integrationEvent.WithStatus(integration.MessageStatus_PUBLISH_QUEUED) | |
// append message to the queue | |
s.queuedIntegrationEventMessages = append(s.queuedIntegrationEventMessages, integrationEvent) | |
} | |
} | |
} | |
} | |
// persist domain events | |
if err := s.persistToDB(ctx, tx, eventstoreSession.Get()...); err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "failed to persist domain event messages"), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
// persist integration events | |
if len(s.queuedIntegrationEventMessages) > 0 { | |
if err := s.integrationEventsStore.Save(ctx, tx, s.queuedIntegrationEventMessages...); err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "failed to persist integration event messages"), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
} | |
// Commit tx | |
err = tx.Commit(ctx) | |
if err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "pg transaction failed"), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
// once the transaction is commited succesfully, we can publish the integration events (if any) | |
if len(s.queuedIntegrationEventMessages) > 0 { | |
s.integrationEventsStore.Publish(ctx, s.queuedIntegrationEventMessages...) | |
// IMPORTANT! must clear queued integration events from the slice | |
s.queuedIntegrationEventMessages = []integration.EventMessage{} | |
} | |
s.logger.Debug(ctx, "commit completed", logFields...) | |
return nil | |
} | |
// Get retrieves a single event by its ID. | |
func (s *eventStore) Get(ctx context.Context, id string) (domain.EventMessage, error) { | |
return nil, nil | |
} | |
// FindAll retrieves all events. | |
func (s *eventStore) FindAll(ctx context.Context) ([]domain.EventMessage, error) { | |
query := ` | |
SELECT | |
id, | |
event_id, | |
stream_id, | |
stream_name, | |
stream_seq_no, | |
event_type, | |
payload, | |
binary_payload, | |
metadata, | |
event_time | |
FROM eventstore.domain_events | |
ORDER BY | |
id ASC | |
LIMIT 1 | |
` | |
rows, err := s.db.Query( | |
context.Background(), | |
query, | |
) | |
if err != nil { | |
if errors.Is(err, pgx.ErrNoRows) { | |
return nil, domain.ErrEventStoreStreamNotFound | |
} | |
s.logger.Error(ctx, "FindAll failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
defer rows.Close() | |
events := []domain.EventMessage{} | |
// Iterate through the result set | |
for rows.Next() { | |
eventMsg := eventMsgDB{} | |
err = rows.Scan( | |
&eventMsg.id, | |
&eventMsg.eventID, | |
&eventMsg.streamID, | |
&eventMsg.streamName, | |
&eventMsg.streamSeqNo, | |
&eventMsg.eventType, | |
&eventMsg.payload, | |
&eventMsg.binaryPayload, | |
&eventMsg.metadata, | |
&eventMsg.eventTime, | |
) | |
if err != nil { | |
s.logger.Error(ctx, "FindAll failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
pmsg, err := protobuf.ProtoMessageFromBytes(eventMsg.eventType, eventMsg.binaryPayload) | |
if err != nil { | |
s.logger.Error(ctx, "FindAll failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
em := domain.NewEventMessageFromProtoMessage(ctx, eventMsg.streamID, eventMsg.streamName, int(eventMsg.streamSeqNo), pmsg) | |
// set time | |
em.WithEventTime(eventMsg.eventTime) | |
// set metadata | |
md, err := metadata.EventMessageMetadataFromPayload(eventMsg.metadata) | |
if err == nil { | |
em.WithMetadata(md) | |
} | |
events = append(events, em) | |
} | |
return events, nil | |
} | |
// GetStream retrieves events for a specific stream. | |
func (s *eventStore) GetStream(ctx context.Context, streamID string, streamName string) ([]domain.EventMessage, error) { | |
query := ` | |
SELECT | |
id, | |
event_id, | |
stream_id, | |
stream_name, | |
stream_seq_no, | |
event_type, | |
binary_payload, | |
metadata, | |
event_time | |
FROM eventstore.domain_events | |
WHERE | |
stream_id=$1 AND stream_name=$2 | |
ORDER BY | |
id ASC | |
` | |
rows, err := s.db.Query( | |
context.Background(), | |
query, | |
streamID, | |
streamName, | |
) | |
// TODO check error type | |
if err != nil { | |
switch { | |
case errors.Is(err, pgx.ErrNoRows): | |
return nil, domain.ErrEventStoreStreamNotFound | |
} | |
s.logger.Error(ctx, "GetStream failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
defer rows.Close() | |
events := []domain.EventMessage{} | |
// Iterate through the result set | |
for rows.Next() { | |
eventMsg := eventMsgDB{} | |
err = rows.Scan( | |
&eventMsg.id, | |
&eventMsg.eventID, | |
&eventMsg.streamID, | |
&eventMsg.streamName, | |
&eventMsg.streamSeqNo, | |
&eventMsg.eventType, | |
// &eventMsg.payload, | |
&eventMsg.binaryPayload, | |
&eventMsg.metadata, | |
&eventMsg.eventTime, | |
) | |
// TODO check error | |
if err != nil { | |
s.logger.Error(ctx, "GetStream failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
pmsg, err := protobuf.ProtoMessageFromBytes(eventMsg.eventType, eventMsg.binaryPayload) | |
if err != nil { | |
s.logger.Error(ctx, "GetStream failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
em := domain.NewEventMessageFromProtoMessage(ctx, eventMsg.streamID, eventMsg.streamName, int(eventMsg.streamSeqNo), pmsg) | |
// set time | |
em.WithEventTime(eventMsg.eventTime) | |
// set metadata | |
md, err := metadata.EventMessageMetadataFromPayload(eventMsg.metadata) | |
if err == nil { | |
em.WithMetadata(md) | |
} | |
events = append(events, em) | |
} | |
if len(events) == 0 { | |
return nil, domain.ErrEventStoreStreamNotFound | |
} | |
return events, nil | |
} | |
// GetStreamEventsByType retrieves events of a specific type for a stream. | |
func (s *eventStore) GetStreamEventsByType(ctx context.Context, streamID string, streamName string, eventType string) ([]domain.EventMessage, error) { | |
return nil, nil | |
} | |
// SubscribeTransactionalEventHandlers subscribes transactional event handlers. | |
func (s *eventStore) SubscribeTransactionalEventHandlers(handlers ...domain.TransactionalEventHandler) error { | |
for _, handler := range handlers { | |
// check if it was already susbcribed | |
alreadyExists := false | |
// | |
for _, v := range s.domainEventHandlers { | |
if v.Name() == handler.Name() { | |
s.logger.Warn(context.TODO(), | |
fmt.Sprintf("transactional handler '%s' already subscribed", v.Name()), | |
) | |
} | |
} | |
if !alreadyExists { | |
s.domainEventHandlers = append(s.domainEventHandlers, handler) | |
} | |
} | |
return nil | |
} | |
// SubscribeIntegrationEventMessageAdapters subscribes integration event message adapters. | |
func (s *eventStore) SubscribeIntegrationEventMessageAdapters(adapters ...domain.IntegrationEventMessageAdapter) error { | |
s.integrationEventsAdapters = append(s.integrationEventsAdapters, adapters...) | |
// check if duplicates, then ... | |
// for _, adapter := range adapters { | |
// s.integrationEventsAdapters = append(s.integrationEventsAdapters, adapter) | |
// } | |
return nil | |
} | |
// HasAccountId is satisfied by event payload messages that expose an account
// ID through a GetAccountId method.
type HasAccountId interface {
	// GetAccountId returns the account ID carried by the message.
	GetAccountId() string
}
// persistToDB persists events to the PostgreSQL database. | |
func (s *eventStore) persistToDB(ctx context.Context, tx pgx.Tx, eventMessages ...domain.EventMessage) error { | |
rows := [][]interface{}{} | |
if len(eventMessages) == 0 { | |
s.logger.Warn(ctx, "persit called with no events to persist", | |
s.logger.WithStackTrace("stacktrace"), | |
) | |
return nil | |
} | |
// for each event | |
for _, event := range eventMessages { | |
binPayload, err := proto.Marshal(event.GetPayload()) | |
if err != nil { | |
return apperrors.Wrap(ctx, err) | |
} | |
metadata, err := json.Marshal(event.GetMetadata()) | |
if err != nil { | |
return apperrors.Wrap(ctx, err) | |
} | |
// t, ok := (event.GetPayload()).(WithAccountId) | |
// if ok { | |
// util.DebugJSON(t.GetAccountId()) | |
// } | |
rows = append(rows, []interface{}{ | |
event.GetId(), | |
event.GetStreamId(), | |
event.GetStreamName(), | |
event.GetStreamSeqenceNo(), | |
event.GetEventType(), | |
event.GetPayload(), | |
binPayload, | |
metadata, | |
event.GetEventTime().UTC(), | |
}) | |
} | |
tableIdentifier := pgx.Identifier{ | |
"eventstore", "domain_events", | |
} | |
// copyCount | |
_, err := tx.CopyFrom( | |
ctx, | |
tableIdentifier, | |
[]string{ | |
"event_id", | |
"stream_id", | |
"stream_name", | |
"stream_seq_no", | |
"event_type", | |
"payload", | |
"binary_payload", | |
"metadata", | |
"event_time", | |
}, | |
pgx.CopyFromRows(rows), | |
) | |
return err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment