Skip to content

Instantly share code, notes, and snippets.

@speps
Created September 18, 2016 09:06
Show Gist options
  • Save speps/ce645a5ca2d2cb9a81e52c7311f38677 to your computer and use it in GitHub Desktop.
Save speps/ce645a5ca2d2cb9a81e52c7311f38677 to your computer and use it in GitHub Desktop.
Golang: server streaming data using channels broadcasting (one source, multiple destinations)
package main
import (
"fmt"
"log"
"net/http"
"sync"
)
// worker is one consumer of the broadcast stream; main creates one per
// request to /stream.
type worker struct {
// source carries the values the broadcaster delivers to this worker.
source chan int
}
// threadSafeSlice is a list of workers guarded by an embedded mutex.
type threadSafeSlice struct {
// Mutex is embedded, so Lock and Unlock are promoted onto threadSafeSlice.
sync.Mutex
// workers is the registered list; Push and Iter access it while holding
// the lock.
workers []*worker
}
// Push registers w at the end of the worker list. The list is only touched
// while the embedded mutex is held.
func (s *threadSafeSlice) Push(w *worker) {
	s.Mutex.Lock()
	s.workers = append(s.workers, w)
	s.Mutex.Unlock()
}
// Iter invokes routine once per registered worker, in insertion order.
// The mutex is held for the entire traversal, so routine must not call
// Push or Iter on the same slice.
func (s *threadSafeSlice) Iter(routine func(*worker)) {
	s.Lock()
	defer s.Unlock()

	// Snapshot the slice header once, as a range loop would.
	registered := s.workers
	for i := 0; i < len(registered); i++ {
		routine(registered[i])
	}
}
// broadcaster forwards every value received on ch to each worker currently
// registered in bcast. It returns once ch is closed and drained.
//
// Each delivery is a blocking send performed while bcast.Iter holds the
// lock, so a worker that stops receiving stalls the whole broadcast (and any
// concurrent Push) until it receives again.
func broadcaster(bcast *threadSafeSlice, ch chan int) {
	// Ranging over the channel ends the loop when ch is closed; a bare
	// receive in an infinite loop would instead spin forever broadcasting
	// zero values.
	for msg := range ch {
		bcast.Iter(func(w *worker) { w.source <- msg })
	}
}
// generator emits the sequence 0, 1, 2, ... on ch without end. Every send
// blocks until a receiver takes the value, and the function never returns.
func generator(ch chan int) {
	for next := 0; ; next++ {
		ch <- next
	}
}
// main wires one integer generator to every client connected to /stream and
// serves HTTP on :8080.
//
// Each request registers its own worker with the broadcaster and writes every
// value it receives as a line of the response. The handler returns, and the
// worker is deregistered, when the client goes away, a write fails, or true
// is received on quit.
func main() {
	bcast := &threadSafeSlice{
		workers: make([]*worker, 0, 1),
	}
	// Nothing in this file sends on quit; it is kept as a hook for stopping
	// handlers by sending true.
	quit := make(chan bool)

	http.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) {
		wk := &worker{
			source: make(chan int),
		}
		bcast.Push(wk)

		// Deregister the worker when the handler exits so the broadcaster
		// stops sending to a client that is no longer there.
		defer func() {
			// The broadcaster may be blocked sending to wk.source while it
			// holds the lock we need; keep receiving until removal is done
			// so it can make progress and release the lock.
			removed := make(chan struct{})
			go func() {
				for {
					select {
					case <-wk.source:
					case <-removed:
						return
					}
				}
			}()

			bcast.Lock()
			for i, cur := range bcast.workers {
				if cur != wk {
					continue
				}
				last := len(bcast.workers) - 1
				copy(bcast.workers[i:], bcast.workers[i+1:])
				bcast.workers[last] = nil // drop the stale pointer
				bcast.workers = bcast.workers[:last]
				break
			}
			bcast.Unlock()
			close(removed)
		}()

		// The request context is cancelled when the client disconnects.
		gone := r.Context().Done()
		for {
			select {
			case v := <-wk.source:
				if _, err := fmt.Fprintf(w, "%v\n", v); err != nil {
					// The connection is unusable; stop streaming.
					return
				}
			case <-gone:
				return
			case stop := <-quit:
				if stop {
					return
				}
			}
		}
	})

	values := make(chan int)
	go broadcaster(bcast, values)
	go generator(values)
	log.Fatal(http.ListenAndServe(":8080", nil))
}
@jagandecapri
Copy link

@speps Where is a value sent on the quit channel so that the case statement case q = <-quit: gets executed?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment