Created
April 26, 2024 22:45
-
-
Save maxsei/f761f038c60fef5366be969d3c4f4ff0 to your computer and use it in GitHub Desktop.
crappy generic event sourcing with materialized views in go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package matview | |
// I have a Go data structure that stores a buffer of events. The events are
// tuples of (K, V, OP), where K is a unique identifier for the event, V is other
// data associated with the event, and OP is an operation that is either PUT or
// DEL. For an event with a "PUT" OP, the event is appended to the buffer of
// events; if the event's K already exists, the stored event is overwritten
// instead. For an event with a "DEL" OP, the event is found and removed from
// the buffer. Consumers of this data structure subscribe and get the full
// state of the events stored, followed by all subsequent events.
// Really, what this does is replicate state and then update each replica, based
// on the events received, using the same logic. (code + data) needs to be
// consistent across concurrent processes.
import "sync" | |
// Event is a single change that can be applied to a MaterializedView. It
// carries an identifier, a payload of type T, and a flag saying whether the
// change adds/overwrites the payload or removes it.
type Event[T any] interface {
	// ID returns the 16-byte identifier of the event. Update compares IDs
	// with == to decide whether an event is already stored.
	ID() [16]byte

	// Redacted reports whether the event removes the entry with this ID.
	// False means added.
	Redacted() bool

	// Value returns the data associated with the event.
	Value() T
}
func NewMaterializedView[T any]() *MaterializedView[T] { | |
return &MaterializedView[T]{} | |
} | |
type MaterializedView[T any] struct { | |
events []Event[T] | |
subs []chan Event[T] | |
sync.Mutex | |
} | |
func (mv *MaterializedView[T]) Update(e Event[T]) { | |
mv.Lock() | |
defer mv.Unlock() | |
// Update. | |
i := -1 | |
for j := range mv.events { | |
if mv.events[j].ID() == e.ID() { | |
i = j | |
break | |
} | |
} | |
if e.Redacted() { | |
if i == -1 { | |
panic("redacted event that doesn't exist") | |
} else { | |
mv.events = append(mv.events[i+1:], mv.events[:i]...) | |
} | |
} else { | |
if i == -1 { | |
mv.events = append(mv.events, e) | |
} else { | |
mv.events[i] = e | |
} | |
} | |
// Notify. | |
for _, sub := range mv.subs { | |
// TODO: Need to find a way make this not block as much as possible... <26-04-24, Max Schulte> // | |
sub <- e | |
} | |
} | |
func (mv *MaterializedView[T]) Replicate() ([]Event[T], chan Event[T]) { | |
sub := make(chan Event[T]) | |
mv.Lock() | |
defer mv.Unlock() | |
mv.subs = append(mv.subs, sub) | |
// TODO: this clone needs to guaranteed to be deep enough <26-04-24, Max Schulte> // | |
replica := make([]Event[T], len(mv.events)) | |
copy(replica, mv.events) | |
return replica, sub | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment