Skip to content

Instantly share code, notes, and snippets.

@maxsei
Created April 26, 2024 22:45
Show Gist options
  • Save maxsei/f761f038c60fef5366be969d3c4f4ff0 to your computer and use it in GitHub Desktop.
Save maxsei/f761f038c60fef5366be969d3c4f4ff0 to your computer and use it in GitHub Desktop.
crappy generic event sourcing with materialized views in go
package matview
// I have a Go data structure that stores a buffer of events. The events are
// tuples of (K, V, OP) where K is a unique identifier for the event, V is other
// data associated with the event, and OP is an operation that is either PUT or
// DEL. For an event with a "PUT" OP, the event is appended to the buffer of
// events; if the event's K already exists, the stored value is overwritten
// instead. For an event with a "DEL" OP, the event is found and removed from
// the buffer. Consumers of this data structure subscribe and get the full
// state of the events stored and all subsequent events.
// Really, what this does is replicate state and then update each replica
// based on the events received, using the same logic. (code + data) needs to
// be consistent across concurrent processes.
import "sync"
// Event is one change to a MaterializedView: either an upsert of a value or
// the removal (redaction) of a previously stored event.
type Event[T any] interface {
// ID returns the 16-byte key used to match this event against events
// already stored in the view.
ID() [16]byte
// Value returns the data carried by the event.
Value() T
// Redacted reports whether this event removes the stored event with the
// same ID (true) or adds/overwrites it (false).
Redacted() bool // False means added
}
// NewMaterializedView returns a pointer to a fresh, zero-valued
// MaterializedView: no stored events and no subscribers.
func NewMaterializedView[T any]() *MaterializedView[T] {
	return new(MaterializedView[T])
}
// MaterializedView holds the current set of events, keyed by Event.ID, and
// the channels of every subscriber registered through Replicate. The embedded
// mutex is taken by Update and Replicate around all access to both slices.
type MaterializedView[T any] struct {
// events is the current state: at most one stored event per ID.
events []Event[T]
// subs holds one channel per Replicate call; Update sends every event it
// processes to each of them.
subs []chan Event[T]
sync.Mutex
}
// Update applies e to the view and then forwards e to every subscriber.
//
// A non-redacted event is an upsert: if an event with the same ID is already
// stored it is overwritten in place, otherwise e is appended. A redacted event
// removes the stored event with the same ID; the relative order of the
// remaining events is preserved.
//
// Update panics if e is redacted and no stored event has its ID.
//
// The view's lock is held for the whole call, including the sends to
// subscribers, so Update does not return until every send has completed.
func (mv *MaterializedView[T]) Update(e Event[T]) {
	mv.Lock()
	defer mv.Unlock()

	// Locate the stored event with the same ID, if any.
	id := e.ID()
	i := -1
	for j := range mv.events {
		if mv.events[j].ID() == id {
			i = j
			break
		}
	}

	redacted := e.Redacted()
	switch {
	case redacted && i == -1:
		panic("redacted event that doesn't exist")
	case redacted:
		// Shift the tail left over the removed slot so the survivors keep
		// their order, then clear the vacated last slot so the backing array
		// does not keep a stale reference alive.
		last := len(mv.events) - 1
		copy(mv.events[i:], mv.events[i+1:])
		mv.events[last] = nil
		mv.events = mv.events[:last]
	case i == -1:
		mv.events = append(mv.events, e)
	default:
		mv.events[i] = e
	}

	// Notify.
	for _, sub := range mv.subs {
		// TODO: Need to find a way make this not block as much as possible... <26-04-24, Max Schulte> //
		sub <- e
	}
}
// Replicate subscribes the caller to the view. It returns a snapshot of the
// events stored at the time of the call together with a newly created
// unbuffered channel on which Update delivers every later event.
//
// The snapshot is a new slice, so changing its elements does not touch the
// view's own buffer; the Event values themselves are shared, not cloned.
func (mv *MaterializedView[T]) Replicate() ([]Event[T], chan Event[T]) {
	mv.Lock()
	defer mv.Unlock()

	// Register the subscriber under the same lock that guards the snapshot,
	// so no event can fall between the two.
	ch := make(chan Event[T])
	mv.subs = append(mv.subs, ch)

	// TODO: this clone needs to guaranteed to be deep enough <26-04-24, Max Schulte> //
	snapshot := append(make([]Event[T], 0, len(mv.events)), mv.events...)
	return snapshot, ch
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment