Skip to content

Instantly share code, notes, and snippets.

@cstockton
Created October 9, 2017 15:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cstockton/9534f55b9872fbb0ac33c39fc1452000 to your computer and use it in GitHub Desktop.
Save cstockton/9534f55b9872fbb0ac33c39fc1452000 to your computer and use it in GitHub Desktop.
// Topic is the abstract publish/subscribe contract declared by this file.
//
// NOTE(review): CondTopic in this file defines Emit with no parameters, so
// *CondTopic does not satisfy this interface as written — confirm which
// signature is intended.
type Topic interface {
// On registers fn as a subscriber callback. Func is declared elsewhere;
// CondTopic.On invokes it with an int index and uses its bool result.
On(fn Func)
// Emit publishes an event for the given index.
Emit(index int)
// Wait blocks the caller; CondTopic.Wait blocks until every subscriber has
// caught up with the latest emitted index.
Wait()
}
// state is the per-subscriber bookkeeping record tracked by a CondTopic.
type state struct {
	// id is the value of the topic's listener counter at subscription time.
	id int
	// idx is the index of the last event this subscriber has finished
	// handling; -1 means none.
	idx int
}
// NewCondTopic returns an empty CondTopic ready for use. The topic starts at
// index -1 (nothing emitted yet) with no subscribers, and both of its
// condition variables share the topic's single mutex.
func NewCondTopic() *CondTopic {
	ct := new(CondTopic)
	ct.idx = -1
	ct.subs = map[*state]struct{}{}
	ct.sub.L = &ct.mu
	ct.publisher.L = &ct.mu
	return ct
}
// CondTopic is a publish/subscribe topic coordinated by one mutex and two
// condition variables. Create instances with NewCondTopic; the condition
// variables need their Locker wired to mu before use.
type CondTopic struct {
	// mu guards every field below and is the Locker for both conds.
	mu sync.Mutex
	// sync is true while a wait() call is draining pending events.
	sync bool
	// sub is waited on by subscribers (and by synchronize) and is broadcast
	// when a new index is emitted or a wait() finishes.
	sub sync.Cond
	// publisher is waited on by wait() and is broadcast by subscribers when
	// they catch up or unsubscribe.
	publisher sync.Cond
	// lis counts live subscribers; its current value is also used as the id
	// of the next subscriber.
	lis int
	// idx is the index of the most recently emitted event, -1 before any.
	idx int
	// subs is the set of currently registered subscriber states.
	subs map[*state]struct{}
}
// pending reports the total number of emitted events that subscribers have
// not yet finished handling, summed over all subscribers. The caller must
// hold ct.mu.
func (ct *CondTopic) pending() int {
	backlog := 0
	for sub := range ct.subs {
		backlog += ct.idx - sub.idx
	}
	return backlog
}
// wait blocks until no subscriber has pending events. The caller must hold
// ct.mu; it is released while blocked and re-held on return.
//
// Only one wait may drain at a time: a concurrent wait first blocks in
// synchronize until the active one clears the sync flag.
func (ct *CondTopic) wait() {
	// Registered first so the flag is cleared and waiters are woken however
	// this function exits.
	defer func() {
		ct.sync = false
		ct.sub.Broadcast()
	}()

	// synchronize returns immediately when no other wait is in progress.
	ct.synchronize()
	ct.sync = true

	for ct.pending() > 0 {
		ct.publisher.Wait()
	}
}
// Wait blocks until every subscriber has handled all events emitted so far.
// It returns immediately if nothing has been emitted since the topic was
// created or last Reset.
func (ct *CondTopic) Wait() {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	if ct.idx != -1 {
		ct.wait()
	}
}
// Reset waits for all subscribers to drain their pending events, then rewinds
// the topic and every registered subscriber to index -1.
func (ct *CondTopic) Reset() {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	ct.wait()

	const initial = -1
	for sub := range ct.subs {
		sub.idx = initial
	}
	ct.idx = initial
}
// Emit advances the topic to the next index and wakes all subscribers. If a
// wait is currently draining, Emit first blocks until it completes.
//
// NOTE(review): the Topic interface declares Emit(index int); this method
// takes no argument — confirm which signature is intended.
func (ct *CondTopic) Emit() {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	// Returns immediately when no wait is in progress.
	ct.synchronize()

	ct.idx++
	ct.sub.Broadcast()
}
// subscribe registers a new subscriber positioned at the topic's current
// index and returns its state.
//
// It acquires ct.mu and deliberately returns with it still held; the lock is
// released later by the subscriber (see On and unsubscribe).
func (ct *CondTopic) subscribe() *state {
	ct.mu.Lock()

	sub := new(state)
	sub.id, sub.idx = ct.lis, ct.idx
	ct.lis++
	ct.subs[sub] = struct{}{}
	return sub
}
// unsubscribe removes st from the topic, releases ct.mu and wakes any waiter
// blocked on the publisher condition.
//
// It must be invoked directly via defer from the subscriber goroutine so that
// recover can observe a panic raised by the subscriber callback. On normal
// exit the goroutine holds ct.mu. When the callback panicked, the lock had
// been released before the call, so it is re-acquired here. In both cases the
// subscriber is removed and the lock released before any panic is re-raised,
// so the topic is left consistent and unlocked.
func (ct *CondTopic) unsubscribe(st *state) {
	cause := recover()
	if cause != nil {
		// The callback runs without the lock; take it back so the cleanup
		// below (including Unlock) operates on a held mutex.
		ct.mu.Lock()
	}

	delete(ct.subs, st)
	ct.lis--
	ct.mu.Unlock()
	ct.publisher.Broadcast()

	if cause != nil {
		// Propagate the original panic now that topic state is repaired.
		panic(cause)
	}
}
// synchronize blocks until no wait is in progress, i.e. until the sync flag is
// false. The caller must hold ct.mu; it is released while blocked.
func (ct *CondTopic) synchronize() {
	for {
		if !ct.sync {
			return
		}
		ct.sub.Wait()
	}
}
// On subscribes fn to the topic and starts a goroutine that calls fn once for
// every index emitted after the subscription, in order, passing that index.
// The goroutine blocks between calls until a new index is emitted, and stops
// and unsubscribes when fn returns false.
//
// On itself returns as soon as the goroutine has been started. If a wait is
// in progress, On first blocks until it completes.
//
// NOTE(review): earlier documentation also listed a done global context and a
// closed topic state as stop conditions; neither is checked in this function.
func (ct *CondTopic) On(fn Func) {
	// subscribe returns with ct.mu held; the lock is handed to the goroutine
	// below, which releases it while waiting and around each call to fn.
	sub := ct.subscribe()
	ct.synchronize()

	go func() {
		defer ct.unsubscribe(sub)
		for {
			// Caught up: tell any waiter, then sleep until the next Emit.
			for sub.idx == ct.idx {
				ct.publisher.Broadcast()
				ct.sub.Wait()
			}

			next := sub.idx + 1
			ct.mu.Unlock()
			keep := fn(next)
			ct.mu.Lock()

			// Count the event as handled even when fn asks to stop.
			sub.idx++
			if !keep {
				return
			}
		}
	}()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment