Created
October 9, 2017 15:20
-
-
Save cstockton/9534f55b9872fbb0ac33c39fc1452000 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type Topic interface { | |
On(fn Func) | |
Emit(index int) | |
Wait() | |
} | |
// state is the per-subscriber record that CondTopic keeps as a key of its
// subs set.
type state struct {
	// id is copied from CondTopic.lis at the moment the subscriber is created.
	id int
	// idx is the subscriber's position; it starts at the topic's idx and is
	// advanced by one after every call to the subscriber's Func.
	idx int
}
func NewCondTopic() *CondTopic { | |
ct := &CondTopic{ | |
idx: -1, | |
subs: make(map[*state]struct{}), | |
} | |
ct.publisher.L, ct.sub.L = &ct.mu, &ct.mu | |
return ct | |
} | |
type CondTopic struct { | |
mu sync.Mutex | |
sync bool | |
sub, publisher sync.Cond | |
lis, idx int | |
subs map[*state]struct{} | |
} | |
func (ct *CondTopic) pending() (n int) { | |
for st := range ct.subs { | |
n += ct.idx - st.idx | |
} | |
return | |
} | |
func (ct *CondTopic) wait() { | |
defer func() { | |
ct.sync = false | |
ct.sub.Broadcast() | |
}() | |
if ct.sync == true { | |
ct.synchronize() | |
} | |
ct.sync = true | |
for q := ct.pending(); q > 0; q = ct.pending() { | |
ct.publisher.Wait() | |
} | |
} | |
func (ct *CondTopic) Wait() { | |
ct.mu.Lock() | |
defer ct.mu.Unlock() | |
if ct.idx == -1 { | |
return | |
} | |
ct.wait() | |
} | |
func (ct *CondTopic) Reset() { | |
ct.mu.Lock() | |
defer ct.mu.Unlock() | |
ct.wait() | |
ct.idx = -1 | |
for st := range ct.subs { | |
st.idx = -1 | |
} | |
} | |
func (ct *CondTopic) Emit() { | |
ct.mu.Lock() | |
defer ct.mu.Unlock() | |
if ct.sync { | |
ct.synchronize() | |
} | |
ct.idx++ | |
ct.sub.Broadcast() | |
return | |
} | |
func (ct *CondTopic) subscribe() *state { | |
ct.mu.Lock() | |
st := &state{ | |
id: ct.lis, | |
idx: ct.idx, | |
} | |
ct.subs[st] = struct{}{} | |
ct.lis++ | |
return st | |
} | |
func (ct *CondTopic) unsubscribe(st *state) { | |
// Panics on fn() would cause our mu.Lock to never be reached, triggering a | |
// panic in our defer before the panic caused by fn propagated. We recover | |
// to fix lock state of topic before propagating panic. | |
if r := recover(); r != nil { | |
ct.mu.Lock() | |
panic(r) | |
} | |
delete(ct.subs, st) | |
ct.lis-- | |
ct.mu.Unlock() | |
ct.publisher.Broadcast() | |
} | |
func (ct *CondTopic) synchronize() { | |
for ct.sync { | |
ct.sub.Wait() | |
} | |
} | |
// On will block between calls to fn under the given topic. On will stop calling | |
// fn and return to caller when any of the following states are true: | |
// | |
// - Func returns bool false instead of true. (local ok == false) | |
// - The global context is done: (emitter ok == false) | |
// - The state is closed and we have reached the final index: | |
// (topic state closed && topic state index >= local index) | |
// | |
func (ct *CondTopic) On(fn Func) { | |
st := ct.subscribe() | |
if ct.sync { | |
ct.synchronize() | |
} | |
go func() { | |
defer ct.unsubscribe(st) | |
for ok := true; ok; { | |
if st.idx == ct.idx { | |
ct.publisher.Broadcast() | |
ct.sub.Wait() | |
continue | |
} | |
idx := st.idx | |
ct.mu.Unlock() | |
ok = fn(idx + 1) | |
ct.mu.Lock() | |
st.idx++ | |
} | |
}() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment