Skip to content

Instantly share code, notes, and snippets.

@sharpevo
Created February 26, 2019 03:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sharpevo/3a5af0ef527040e43c9914ce2a9f0278 to your computer and use it in GitHub Desktop.
Save sharpevo/3a5af0ef527040e43c9914ce2a9f0278 to your computer and use it in GitHub Desktop.
A thread-safe blocking queue, stolen from https://github.com/dnaeon
package blockingqueue
import (
"conditionvariable"
"fmt"
"runtime"
"sync"
)
type BlockingQueue struct {
lock sync.Mutex
itemList []interface{}
pushCondition conditionvariable.ChannelCondition
termCondition conditionvariable.ChannelCondition
}
func NewBlockingQueue() *BlockingQueue {
b := &BlockingQueue{}
b.itemList = []interface{}{}
b.Cursor = make(chan int)
b.pushCondition = conditionvariable.NewChannelCondition()
b.termCondition = conditionvariable.NewChannelCondition()
return b
}
func (b *BlockingQueue) Reset() {
b.Lock()
defer b.Unlock()
b.itemList = []interface{}{}
b.termCondition.Broadcast()
}
func (b *BlockingQueue) Push(item interface{}) {
b.Lock()
defer b.Unlock()
b.itemList = append(b.itemList, item)
b.pushCondition.Broadcast()
}
func (b *BlockingQueue) Pop() (interface{}, error) {
b.Lock()
defer b.Unlock()
for len(b.itemList) == 0 {
b.Unlock()
select {
case <-b.pushCondition.Wait():
b.Lock()
continue
case <-b.termCondition.Wait():
b.Lock()
return nil, fmt.Errorf("queue terminated")
}
}
item := b.itemList[0]
b.itemList = b.itemList[1:]
return item, nil
}
func (b *BlockingQueue) Length() int {
b.Lock()
defer b.Unlock()
return len(b.itemList)
}
func (b *BlockingQueue) Get(index int) (interface{}, error) {
b.Lock()
defer b.Unlock()
if index < 0 || index >= len(b.itemList) {
return nil, fmt.Errorf("invalid index")
}
return b.itemList[index], nil
}
func (b *BlockingQueue) Append(item interface{}) {
b.Lock()
defer b.Unlock()
b.itemList = append(b.itemList, item)
}
type Item struct {
Index int
Value interface{}
}
func (b *BlockingQueue) Iter() <-chan Item {
itemc := make(chan Item)
go func() {
defer close(itemc)
b.Lock()
defer b.Unlock()
for k, v := range b.itemList {
itemc <- Item{k, v}
}
}()
return itemc
}
func (b *BlockingQueue) Lock() {
b.lock.Lock()
}
func (b *BlockingQueue) Unlock() {
b.lock.Unlock()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment