Skip to content

Instantly share code, notes, and snippets.

@juanpabloaj
Created July 2, 2023 22:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save juanpabloaj/8855566497175255a1fc6f4b6febed74 to your computer and use it in GitHub Desktop.
Save juanpabloaj/8855566497175255a1fc6f4b6febed74 to your computer and use it in GitHub Desktop.
heartbeats example
// Command main is a heartbeat example based on the book "Concurrency in Go" by Katherine Cox-Buday.
// It includes the following patterns: or-channel (or), orDone, take, and bridge.
// newSteward monitors a goroutine and restarts it if it stops sending heartbeats.
package main
import (
"log"
"time"
)
// or merges any number of signal channels into one: the returned channel is
// closed as soon as any of the input channels is closed or delivers a value.
//
// With no channels it returns nil, and with exactly one channel it returns
// that channel unchanged. Otherwise it starts a goroutine that waits on up to
// three channels directly and recurses for the remainder.
func or(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}

	merged := make(chan interface{})
	go func() {
		defer close(merged)

		if len(channels) == 2 {
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
			return
		}

		// merged is handed to the recursive call so that the goroutines
		// started further down also exit once this level has fired.
		//
		// The full slice expression caps the tail's capacity at its length,
		// which forces append to copy. A plain channels[3:] would let append
		// write merged into spare capacity of the caller's backing array when
		// the function is invoked as or(slice...).
		rest := append(channels[3:len(channels):len(channels)], merged)
		select {
		case <-channels[0]:
		case <-channels[1]:
		case <-channels[2]:
		case <-or(rest...):
		}
	}()
	return merged
}
// orDone relays values from c onto the returned channel until either c is
// closed or done fires, at which point the returned channel is closed. It
// lets callers range over a channel without repeating the done check.
func orDone(done, c <-chan interface{}) <-chan interface{} {
	out := make(chan interface{})

	forward := func() {
		defer close(out)
		for {
			var (
				item interface{}
				open bool
			)

			// Wait for the next value, giving up when done fires.
			select {
			case <-done:
				return
			case item, open = <-c:
			}
			if !open {
				return
			}

			// Hand the value on; if done fires first the value is dropped
			// and the next iteration decides whether to stop.
			select {
			case out <- item:
			case <-done:
			}
		}
	}

	go forward()
	return out
}
// bridge flattens a channel of channels into a single channel. Inner streams
// are consumed one at a time, in the order they arrive on chanStream, and
// their values are forwarded to the returned channel. The returned channel is
// closed when chanStream is closed or done fires.
func bridge(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
	out := make(chan interface{})

	// next waits for the following inner stream; ok is false when chanStream
	// has been closed or done has fired.
	next := func() (inner <-chan interface{}, ok bool) {
		select {
		case s, open := <-chanStream:
			return s, open
		case <-done:
			return nil, false
		}
	}

	// drain forwards every value of one inner stream until that stream ends
	// or done fires.
	drain := func(inner <-chan interface{}) {
		for item := range orDone(done, inner) {
			select {
			case out <- item:
			case <-done:
			}
		}
	}

	go func() {
		defer close(out)
		for {
			inner, ok := next()
			if !ok {
				return
			}
			drain(inner)
		}
	}()
	return out
}
// startGoroutineFn is the signature shared by functions that start a
// monitored goroutine. It receives a done channel and a pulse interval and
// returns a heartbeat channel. The implementations in this file stop when
// done is closed and attempt a non-blocking send on the heartbeat channel at
// every pulse.
type startGoroutineFn func(
done <-chan interface{},
pulseInterval time.Duration,
) (heartbeat <-chan interface{})
// newSteward returns a startGoroutineFn that supervises the goroutine (the
// "ward") started by startGoroutine.
//
// The ward is started with a done channel that fires when either the
// steward's done channel or an internal per-ward channel fires, and with a
// pulse interval of timeout/2. Whenever no heartbeat arrives from the ward
// within timeout, the steward logs a message, closes the per-ward channel to
// stop the old ward, and starts a new one.
//
// The returned function is itself monitorable: it attempts a non-blocking
// send on its own heartbeat channel every pulseInterval and closes that
// channel when done is closed.
func newSteward(
	timeout time.Duration,
	startGoroutine startGoroutineFn,
) startGoroutineFn {
	return func(
		done <-chan interface{},
		pulseInterval time.Duration,
	) <-chan interface{} {
		heartbeat := make(chan interface{})
		go func() {
			defer close(heartbeat)

			var wardDone chan interface{}
			var wardHeartbeat <-chan interface{}
			startWard := func() {
				wardDone = make(chan interface{})
				wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2)
			}
			startWard()

			// Use an explicit ticker so it can be stopped when the steward
			// exits; time.Tick offers no way to release it. A non-positive
			// interval leaves pulse nil, which never fires, matching what
			// time.Tick returns for such intervals.
			var pulse <-chan time.Time
			if pulseInterval > 0 {
				ticker := time.NewTicker(pulseInterval)
				defer ticker.Stop()
				pulse = ticker.C
			}

		monitorLoop:
			for {
				// A fresh timeout is armed after every ward heartbeat and
				// after every restart.
				timeoutSignal := time.After(timeout)
				for {
					select {
					case <-pulse:
						// Non-blocking: skip the pulse if nobody is listening.
						select {
						case heartbeat <- struct{}{}:
						default:
						}
					case <-wardHeartbeat:
						continue monitorLoop
					case <-timeoutSignal:
						log.Println("steward: ward unhealthy, restarting")
						close(wardDone)
						startWard()
						continue monitorLoop
					case <-done:
						return
					}
				}
			}
		}()
		return heartbeat
	}
}
// take forwards at most num values from valueStream to the returned channel
// and then closes it. It stops early, closing the returned channel, when done
// fires or when valueStream is closed.
//
// Receiving and sending are separate selects on purpose: in a combined
// `case takeStream <- <-valueStream`, the receive is evaluated before the
// select can consider done, so the goroutine would block on an idle source
// regardless of cancellation and would forward nil values from a closed one.
func take(
	done <-chan interface{},
	valueStream <-chan interface{},
	num int,
) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ {
			var val interface{}
			select {
			case <-done:
				return
			case v, ok := <-valueStream:
				if !ok {
					// Source exhausted: nothing more to take.
					return
				}
				val = v
			}

			select {
			case <-done:
				return
			case takeStream <- val:
			}
		}
	}()
	return takeStream
}
// main wires a worker goroutine to a steward and prints the first six values
// the worker produces. The worker emits the values of its integer list and
// exits when it meets a negative value; the steward then notices the missing
// heartbeats and restarts it.
func main() {
	log.SetFlags(log.Ltime | log.Lmicroseconds)

	// doWorkFn builds a restartable worker over intList. It returns the
	// function that starts the worker together with a single output stream
	// that bridges the per-run streams of every worker started by it.
	doWorkFn := func(
		done <-chan interface{},
		intList ...int,
	) (startGoroutineFn, <-chan interface{}) {
		intChanStream := make(chan (<-chan interface{}))
		intStreamOut := bridge(done, intChanStream)
		doWork := func(
			done <-chan interface{},
			pulseInterval time.Duration,
		) <-chan interface{} {
			intStream := make(chan interface{})
			heartbeat := make(chan interface{})
			go func() {
				defer close(intStream)

				// Register this run's stream with the bridge before
				// producing any values.
				select {
				case intChanStream <- intStream:
				case <-done:
					return
				}

				// The worker is restarted repeatedly, so its ticker must be
				// stopped on exit; time.Tick offers no way to release it. A
				// non-positive interval leaves pulse nil, which never fires,
				// matching what time.Tick returns for such intervals.
				var pulse <-chan time.Time
				if pulseInterval > 0 {
					ticker := time.NewTicker(pulseInterval)
					defer ticker.Stop()
					pulse = ticker.C
				}

				for {
				valueLoop:
					for _, intVal := range intList {
						if intVal < 0 {
							// Simulate a failing worker: exit without
							// notifying anyone, so heartbeats stop.
							log.Printf("negative value: %v\n", intVal)
							return
						}
						for {
							select {
							case <-pulse:
								// Non-blocking: skip the pulse if nobody is
								// listening.
								select {
								case heartbeat <- struct{}{}:
								default:
								}
							case intStream <- intVal:
								continue valueLoop
							case <-done:
								return
							}
						}
					}
				}
			}()
			return heartbeat
		}
		return doWork, intStreamOut
	}

	done := make(chan interface{})
	defer close(done)

	doWork, intStream := doWorkFn(done, 1, 2, -1, 3, 4, 5)
	doWorkWithSteward := newSteward(1*time.Millisecond, doWork)
	doWorkWithSteward(done, 1*time.Hour)

	for intVal := range take(done, intStream, 6) {
		log.Printf("received %v\n", intVal)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment