Skip to content

Instantly share code, notes, and snippets.

@stanroze
Created May 17, 2016 21:39
Show Gist options
  • Save stanroze/8815ba7d8a5010b457adb12961e5f2cc to your computer and use it in GitHub Desktop.
Save stanroze/8815ba7d8a5010b457adb12961e5f2cc to your computer and use it in GitHub Desktop.
Blocking Queue
package queue
// Element is one node in the chain of values held by a Queue.
type Element struct {
// value is the payload carried by this node (nil for the placeholder node
// that New creates as the initial tail).
value interface{}
// next is the channel on which the following Element is delivered; the
// code in New always creates it with a buffer of one.
next chan *Element
}
// Queue is a blocking queue: Append hands values to a goroutine started by
// New, which links them into a chain of Elements that Next consumes in order.
type Queue struct{
// elements is the receive side of the channel that will carry the next
// unread Element; Next replaces it after every read.
elements <-chan *Element
// appender is the send side of the channel through which Append passes
// values to the goroutine started by New.
appender chan<- interface{}
// shutdown, once closed, makes the goroutine started by New return and
// lets a blocked Append return without sending.
shutdown chan struct{}
}
// New creates a new blocking queue and starts the goroutine that feeds it.
//
// The goroutine receives values sent by Append, wraps each one in an Element
// and links it onto the tail of the chain that Next walks. It runs until the
// queue's shutdown channel is closed; call Close once the queue is no longer
// needed so that the goroutine is not leaked.
func New() *Queue {
	shutdown := make(chan struct{})
	// One slot of buffering lets a single Append complete before the
	// goroutine below has picked the value up.
	appender := make(chan interface{}, 1)
	// Every link channel has capacity one and is sent to exactly once, so
	// the linking send in the goroutine never blocks.
	initial := make(chan *Element, 1)

	go func() {
		// Placeholder tail: its next channel is the one the first Next reads.
		last := &Element{next: initial}
		for {
			select {
			case item := <-appender:
				e := &Element{value: item, next: make(chan *Element, 1)}
				last.next <- e
				last = e
			case <-shutdown:
				return
			}
		}
	}()

	return &Queue{elements: initial, appender: appender, shutdown: shutdown}
}

// Close shuts the queue down by closing its shutdown channel, which makes the
// goroutine started by New return and unblocks any Append waiting to send.
//
// Elements that were already linked into the chain stay readable; values
// handed to Append around or after Close are not guaranteed to be delivered.
// Calling Close again from the same goroutine is a no-op, but concurrent
// calls to Close are not synchronized and may panic on a double close.
func (q *Queue) Close() {
	select {
	case <-q.shutdown:
		// Already closed; closing a closed channel would panic.
	default:
		close(q.shutdown)
	}
}
// Append hands v to the queue's appender channel so that the queue's
// background goroutine can link it onto the chain. It blocks only while that
// channel cannot accept the value, and gives up without enqueueing v if the
// shutdown channel is closed while it is waiting.
func (q *Queue) Append(v interface{}) {
	select {
	case <-q.shutdown:
		// The queue was shut down before v could be handed over; v is dropped.
	case q.appender <- v:
		// v is now waiting for the appender goroutine.
	}
}
// Next blocks until there is an item on the queue and then returns it to the
// consumer.
//
// If no item is available and the queue's shutdown channel is closed, Next
// returns nil instead of blocking forever (this is indistinguishable from an
// appended nil value). Items that are already linked are always returned
// before shutdown is reported.
//
// Next reads and replaces q.elements without any synchronization, so it must
// not be called from more than one goroutine at a time.
func (q *Queue) Next() interface{} {
	var e *Element
	select {
	case e = <-q.elements:
		// Fast path: an element is already waiting, so hand it out even if
		// the queue has been shut down in the meantime.
	default:
		// Nothing ready yet: wait for either the next element or shutdown.
		select {
		case e = <-q.elements:
		case <-q.shutdown:
			return nil
		}
	}
	// Advance to the channel that will carry the element after this one.
	q.elements = e.next
	return e.value
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment