Skip to content

Instantly share code, notes, and snippets.

@davidbalbert
Last active April 22, 2023 15:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davidbalbert/a58261c3851d3ca383334b776c2c97d2 to your computer and use it in GitHub Desktop.
Save davidbalbert/a58261c3851d3ca383334b776c2c97d2 to your computer and use it in GitHub Desktop.
Broadcast notifier sketches
// Lightly modified from the slides for "Rethinking Classical Concurrency Patterns"
package sync
import "context"
type state struct {
seq int64
changed chan struct{} // closed upon notify
}
type Notifier struct {
st chan state
}
func NewNotifier() *Notifier {
st := make(chan state, 1)
st <- state{
seq: 0,
changed: make(chan struct{}),
}
return &Notifier{st: st}
}
func (n *Notifier) NotifyChange() {
st := <-n.st
close(st.changed)
n.st <- state{
seq: st.seq + 1,
changed: make(chan struct{}),
}
}
// If you call AwaitChange() with a wrong seq, it'll immediately notify you
// with the current one.
func (n *Notifier) AwaitChange(ctx context.Context, seq int64) (newSeq int64) {
st := <-n.st
n.st <- st
if st.seq != seq {
return st.seq
}
select {
case <-ctx.Done():
return seq
case <-st.changed:
return seq + 1
}
}
// Calling Seq is usually unnecessary. You can just start with 0, and AwaitChange
// will give you the correct seq. But if getting signaled twice is expensive, you
// can limit the likelyhood of getting signaled twice by calling Seq() first.
func (n *Notifier) Seq() int64 {
st := <-n.st
n.st <- st
return st.seq
}
// Same guarantees as ReliableNotifier (I think, none of this is tested), but
// with a similar interface to Notifier. Queue is copied verbatim from "Rethinking
// Classical Concurrency Primatives"
package sync
import (
"context"
)
type Queue[T any] struct {
items chan []T // contains 0 or 1 non-empty slices
empty chan bool // contains true if items is empty
}
func NewQueue[T any]() *Queue[T] {
items := make(chan []T, 1)
empty := make(chan bool, 1)
empty <- true
return &Queue[T]{items, empty}
}
func (q *Queue[T]) Put(item T) {
var items []T
select {
case items = <-q.items:
case <-q.empty:
}
items = append(items, item)
q.items <- items
}
func (q *Queue[T]) Get(ctx context.Context) T {
var items []T
select {
case <-ctx.Done():
var zero T
return zero
case items = <-q.items:
}
item := items[0]
items = items[1:]
if len(items) == 0 {
q.empty <- true
} else {
q.items <- items
}
return item
}
type Token struct {
t chan struct{}
}
// A struct that facilitates one-to-many broadcast notifications. All listeners are guaranteed
// to be notified of every change, and once you've registered, you won't miss any changes.
//
// St is a buffered channel that acts as a mutex for the notifier's state. The state is a map
// from arbitrary unique values (in this case a channelvalue ) to queues. The queues function
// similarly to a buffered channel, except they can grow infinitely, which means a slow listener
// will never block the notifier. Essentially, you're trading potentially unbounded memory growth
// for the guarantee that you won't miss any messages and no goroutine can slow another down.
//
// The channel is wrapped in a Token struct to hide its implementation details. No values are
// ever sent on the channel, it's just used a unique value.
//
// One thing this is missing right now: when you register a listener, it's sometimes useful to
// immediately receive the most recent value. Right now, you can't do that, but it would be
// pretty easy to add "lastValue" to the state.
type QueuedNotifier[T any] struct {
st chan map[chan struct{}]*Queue[T]
}
func NewQueuedNotifier[T any]() *QueuedNotifier[T] {
state := make(chan map[chan struct{}]*Queue[T], 1)
state <- make(map[chan struct{}]*Queue[T])
return &QueuedNotifier[T]{
st: state,
}
}
func (n *QueuedNotifier[T]) Register() Token {
q := NewQueue[T]()
t := make(chan struct{})
st := <-n.st
st[t] = q
n.st <- st
return Token{t}
}
func (n *QueuedNotifier[T]) Unregister(t Token) {
st := <-n.st
delete(st, t.t)
n.st <- st
}
func (n *QueuedNotifier[T]) NotifyChange(v T) {
st := <-n.st
for _, q := range st {
q.Put(v)
}
n.st <- st
}
func (n *QueuedNotifier[T]) AwaitChange(ctx context.Context, t Token) (T, bool) {
st := <-n.st
q := st[t.t]
n.st <- st
if q == nil {
var zero T
return zero, false
}
return q.Get(ctx), true
}
// Ensures you don't miss a message. Interface based on
// signal.Notify(). Overly complicated for my tastes.
package sync
import "context"
type waiter[T any] struct {
q *Queue[T]
cancel context.CancelFunc
}
type ReliableNotifier[T any] struct {
st chan map[chan<- T]waiter[T]
}
func NewReliableNotifier[T any]() *ReliableNotifier[T] {
state := make(chan map[chan<- T]waiter[T], 1)
state <- make(map[chan<- T]waiter[T])
return &ReliableNotifier[T]{
st: state,
}
}
func (n *ReliableNotifier[T]) Broadcast(v T) {
st := <-n.st
for _, w := range st {
w.q.Put(v)
}
n.st <- st
}
func (n *ReliableNotifier[T]) Notify(c chan<- T) {
ctx, cancel := context.WithCancel(context.Background())
w := waiter[T]{
q: NewQueue[T](),
cancel: cancel,
}
st := <-n.st
st[c] = w
n.st <- st
go func() {
for {
v := w.q.Get(ctx)
select {
case <-ctx.Done():
return
default:
}
c <- v
}
}()
}
func (n *ReliableNotifier[T]) Stop(c chan<- T) {
st := <-n.st
w, ok := st[c]
if ok {
w.cancel()
delete(st, c)
}
n.st <- st
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment