Skip to content

Instantly share code, notes, and snippets.

@gvergnaud
Last active May 8, 2024 21:25
Show Gist options
  • Save gvergnaud/44603c686a952c9836694d6c92548635 to your computer and use it in GitHub Desktop.
Save gvergnaud/44603c686a952c9836694d6c92548635 to your computer and use it in GitHub Desktop.
package main
import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)
// interval starts a background goroutine that sends an ever-increasing
// counter (0, 1, 2, ...) on the returned unbuffered channel, sleeping for one
// second before each send. The goroutine never exits and the channel is never
// closed.
func interval() chan float64 {
	ticks := make(chan float64)
	go func() {
		for n := 0.0; ; n++ {
			time.Sleep(time.Second)
			ticks <- n
		}
	}()
	return ticks
}
// MapChannel returns a channel that yields f(v) for every value v received
// from c, in the order the values were received.
//
// The output channel is created with the same buffer capacity as c. A
// background goroutine does the forwarding; once c is closed and drained the
// goroutine exits and the output channel is closed, so consumers that range
// over the result terminate. If c is never closed the goroutine keeps waiting
// on it.
func MapChannel[A any, B any](c <-chan A, f func(A) B) <-chan B {
	// cap, not len: len reports how many elements are queued right now,
	// whereas cap is the buffer size chosen when c was created.
	chanB := make(chan B, cap(c))
	go func() {
		// Propagate end-of-stream to whoever consumes the mapped channel.
		defer close(chanB)
		// range stops when c is closed. A bare `<-c` in an endless loop would
		// keep receiving zero values and flood the output with f(zero).
		for a := range c {
			chanB <- f(a)
		}
	}()
	return chanB
}
// Msg is a message exchanged over the request and response channels.
type Msg struct {
	msgType string  // kind of message; this file uses "Request", "Success" and "Error"
	id      float64 // identifier used to pair a response with its request
	content string  // free-form payload text
}

// String implements fmt.Stringer and renders the message as
// "Msg{<msgType>, <id>, <content>}", with id printed in %f notation.
func (m Msg) String() string {
	const layout = "Msg{%s, %f, %s}"
	return fmt.Sprintf(layout, m.msgType, m.id, m.content)
}
// toResponseMsg builds a simulated response carrying id i. The outcome is
// random: when rand.Float64() exceeds 0.5 the result is a "Success" message
// with content "yes"; otherwise it is an "Error" message with content "No".
func toResponseMsg(i float64) Msg {
	response := Msg{msgType: "Error", id: i, content: "No"}
	if rand.Float64() > 0.5 {
		response.msgType = "Success"
		response.content = "yes"
	}
	return response
}
// Request publishes msg on requests and returns a promise-like channel that
// receives exactly one Msg: the first message read from responses whose id
// equals msg.id, or an "Error" Msg with content "Timeout error!" when no such
// message has been read within 10 seconds.
//
// The send on requests happens synchronously, so the call blocks until
// requests has buffer space or a receiver; the 10-second window starts once
// that send has completed. Messages read from responses that carry a
// different id are discarded, so responses should not be shared with another
// consumer that needs those messages.
//
// The returned channel has a buffer of one, which lets the background
// goroutine finish even if the caller never reads the result.
func Request(msg Msg, requests chan<- Msg, responses <-chan Msg) chan Msg {
	requests <- msg
	promise := make(chan Msg, 1)
	// An explicit timer (rather than time.After) can be stopped as soon as a
	// matching response arrives instead of lingering for the full 10 seconds.
	timer := time.NewTimer(10 * time.Second)
	go func() {
		defer timer.Stop()
		for {
			select {
			case <-timer.C:
				promise <- Msg{msgType: "Error", id: msg.id, content: "Timeout error!"}
				return
			case res, ok := <-responses:
				if !ok {
					// A closed channel is always ready and yields zero
					// values: without this guard the loop would spin until
					// the timeout, and a zero Msg would falsely match a
					// request whose id is 0. A nil channel is never ready,
					// so from here on only the timeout case can fire.
					responses = nil
					continue
				}
				if res.id == msg.id {
					promise <- res
					return
				}
			}
		}
	}()
	return promise
}
// EventEmitter fans values out to any number of subscribers, each of which
// receives every emitted value on its own channel.
//
// The zero value is ready to use. An EventEmitter contains a mutex and must
// not be copied after first use; share it by pointer.
type EventEmitter[A any] struct {
	// mu guards subscribers: emit typically runs on a producer goroutine
	// (see Multiplex) while listen and unsubscribe are called by consumers.
	mu          sync.Mutex
	subscribers [](chan A)
}

// listen registers a new subscriber. It returns the subscriber's channel
// (buffered for 10 values) and an unsubscribe function that removes that
// channel from the emitter. Unsubscribing does not close the channel, and
// calling unsubscribe more than once is harmless.
func (e *EventEmitter[A]) listen() (<-chan A, func()) {
	channel := make(chan A, 10)

	e.mu.Lock()
	e.subscribers = append(e.subscribers, channel)
	e.mu.Unlock()

	unsubscribe := func() {
		e.mu.Lock()
		defer e.mu.Unlock()
		// Keep every subscriber except the one created by this listen call.
		remaining := make([]chan A, 0, len(e.subscribers))
		for _, sub := range e.subscribers {
			if sub != channel {
				remaining = append(remaining, sub)
			}
		}
		e.subscribers = remaining
	}
	return channel, unsubscribe
}

// emit delivers value to every current subscriber without blocking the
// caller. Each send runs in its own goroutine so that a subscriber whose
// buffer is full cannot delay delivery to the others. The trade-offs are that
// values emitted in quick succession may arrive out of order, and that a send
// to a subscriber that has stopped reading stays blocked once its buffer is
// full.
func (e *EventEmitter[A]) emit(value A) {
	e.mu.Lock()
	defer e.mu.Unlock()
	// Starting a goroutine never blocks, so the lock is held only briefly.
	for _, subscriber := range e.subscribers {
		go func(sub chan A) {
			sub <- value
		}(subscriber)
	}
}
// Multiplex turns a single input channel into an EventEmitter so that several
// consumers can each observe every value. A background goroutine passes each
// value received from input to emit and exits once input is closed.
func Multiplex[A any](input <-chan A) *EventEmitter[A] {
	emitter := new(EventEmitter[A])
	go func() {
		for {
			value, open := <-input
			if !open {
				return
			}
			emitter.emit(value)
		}
	}()
	return emitter
}
// Pair holds two values of independent types.
type Pair[A any, B any] struct {
	left  A
	right B
}

// Both waits for one value from chan1 and then one value from chan2, and
// delivers the two together as a Pair on the returned channel. The returned
// channel has a buffer of one, so the background goroutine can finish even if
// the result is never read.
func Both[A any, B any](chan1 <-chan A, chan2 <-chan B) chan Pair[A, B] {
	result := make(chan Pair[A, B], 1)
	go func() {
		// Receive in a fixed order: chan1 first, then chan2.
		first := <-chan1
		second := <-chan2
		result <- Pair[A, B]{left: first, right: second}
	}()
	return result
}
// main wires the demo together: a simulated response stream is multiplexed to
// several subscribers, two requests are issued, and the loop prints requests
// and responses until both request promises have been fulfilled.
func main() {
	// requests must be buffered: Request sends on it synchronously, and the
	// only receiver is the select loop below, which has not started yet when
	// Request is called. With an unbuffered channel that first send would
	// block forever and the program would deadlock.
	requests := make(chan Msg, 5)

	emitter := Multiplex(MapChannel(interval(), toResponseMsg))

	// Each Request gets its own subscription. Request discards responses
	// whose id does not match its own, so two requests reading from one
	// shared channel would consume each other's responses and then time out
	// even though the matching response had been emitted.
	responsesForPromise1, _ := emitter.listen()
	responsesForPromise2, _ := emitter.listen()
	responsesForMonitoring, _ := emitter.listen()

	promise1 := Request(
		Msg{msgType: "Request", content: "hello", id: 4.0},
		requests,
		responsesForPromise1)
	promise2 := Request(
		Msg{msgType: "Request", content: "heyy", id: 7.0},
		requests,
		responsesForPromise2)
	pairPromise := Both(promise1, promise2)

	for {
		select {
		case x := <-requests:
			fmt.Println("new request", x)
		case x := <-responsesForMonitoring:
			fmt.Println("new response", x)
		case pair := <-pairPromise:
			fmt.Println("both promises are fulfilled!", pair.left, pair.right)
			return
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment