Skip to content

Instantly share code, notes, and snippets.

@powerman
Created June 20, 2021 16:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save powerman/39e802755a78812a560516de6cd02ca8 to your computer and use it in GitHub Desktop.
Save powerman/39e802755a78812a560516de6cd02ca8 to your computer and use it in GitHub Desktop.
Helper for buffering data from non-blocking channel for sending into blocking channel
// Usage example:
func process(in <-chan Msg, outBlocking chan<- Msg) {
out := newQueueMsg(QueueSize, outBlocking)
for {
select {
case msg := <-in:
out.append(msg)
case out.C <- out.Elem:
out.del()
}
}
}
// Helper:
type queueMsg struct {
C chan<- Msg
Elem Msg
Queue []Msg
c chan<- Msg
}
func newQueueMsg(size int, out chan<- Msg) *queueMsg {
return &queueMsg{
c: out,
Queue: make([]Msg, 0, size),
}
}
func (q *queueMsg) add(elem Msg) bool {
if len(q.Queue) == cap(q.Queue) {
return false
}
q.append(elem)
return true
}
func (q *queueMsg) append(elem Msg) {
if len(q.Queue) == 0 {
q.C = q.c
q.Elem = elem
}
q.Queue = append(q.Queue, elem)
}
func (q *queueMsg) del() {
copy(q.Queue, q.Queue[1:])
q.Queue = q.Queue[:len(q.Queue)-1]
if len(q.Queue) == 0 {
q.C = nil
} else {
q.Elem = q.Queue[0]
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment