Skip to content

Instantly share code, notes, and snippets.

@vitan
Last active January 23, 2022 08:51
Show Gist options
  • Save vitan/aedb628a40478cf8b6a33dc87a5ff52f to your computer and use it in GitHub Desktop.
Save vitan/aedb628a40478cf8b6a33dc87a5ff52f to your computer and use it in GitHub Desktop.
A PriorityQueue in Go that supports multiple producers and multiple consumers, with an example of how to use it.
package main
import (
"errors"
"fmt"
"math"
"reflect"
"sync"
)
// Constants shared by the queue implementation and the producer/consumer demo.
const (
// ITEM_COUNT is the number of values each producer goroutine enqueues.
ITEM_COUNT = 5
// EMPTY_VAL is the placeholder value Dequeue returns together with its
// error; it carries no meaning of its own.
EMPTY_VAL = math.MaxInt64
// ERROR_QUEUE_CLOSED is the message of the error Dequeue returns when it has
// nothing left to deliver; consumer compares against this text to stop.
ERROR_QUEUE_CLOSED = "error-closed-queue"
)
// PriorityQueue is a queue of ints with a fixed number of priority levels,
// backed by one buffered channel per level.
type PriorityQueue struct {
// queues holds one channel per priority level. Enqueue stores a value of
// priority p in queues[len(queues)-p-1].
queues []chan int
// capacity is the buffer size each channel in queues was created with.
capacity int
// opening_q_counts is initialised by NewPriorityQueue to the number of
// priority levels.
opening_q_counts int
// mutex is allocated by NewPriorityQueue.
mutex *sync.Mutex
}
// NewPriorityQueue (re)initialises the receiver with prioritys priority levels,
// each backed by a channel buffered to capacity, and returns the receiver so
// the call can be chained.
//
// A prioritys value of zero or less yields a queue with no levels.
func (pQ *PriorityQueue) NewPriorityQueue(prioritys int, capacity int) *PriorityQueue {
	pQ.capacity, pQ.opening_q_counts = capacity, prioritys
	pQ.mutex = new(sync.Mutex)

	// Start from an empty (non-nil) slice and add one buffered channel per level.
	pQ.queues = []chan int{}
	for remaining := prioritys; remaining > 0; remaining-- {
		level := make(chan int, capacity)
		pQ.queues = append(pQ.queues, level)
	}
	return pQ
}
// Enqueue places val on the queue for the given priority level.
//
// Valid priorities are 0 through len(queues)-1; anything else yields an
// "out of index" error and nothing is queued. Priority levels are stored in
// reverse, so the largest priority number occupies slot 0. The send blocks
// while the target channel's buffer is full.
func (pQ *PriorityQueue) Enqueue(priority int, val int) error {
	levels := len(pQ.queues)
	if priority < 0 || priority >= levels {
		return errors.New("out of index")
	}

	slot := levels - 1 - priority
	pQ.queues[slot] <- val
	return nil
}
// Dequeue removes and returns the next value, preferring higher priority
// levels (lower slots in pQ.queues) over lower ones.
//
// It first polls every queue in priority order without blocking and returns
// the first value found. If nothing is buffered it blocks until any queue that
// is still open delivers a value or is closed. Once every queue is closed and
// drained it returns EMPTY_VAL and an error whose message is
// ERROR_QUEUE_CLOSED.
//
// Which queues are still open is tracked per call rather than in state shared
// between calls, so concurrent consumers cannot make each other report
// "closed" while values are still buffered.
func (pQ *PriorityQueue) Dequeue() (int, error) {
	cases := make([]reflect.SelectCase, len(pQ.queues))
	for {
		open := 0

		// Fast path: a plain select would pick among ready channels at random,
		// so walk the queues explicitly to honour priority order.
		for i, q := range pQ.queues {
			select {
			case val, ok := <-q:
				if ok {
					return val, nil
				}
				// Closed and drained: a zero Chan makes reflect.Select ignore
				// this case.
				cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv}
			default:
				// Open but currently empty: wait on it below.
				cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(q)}
				open++
			}
		}

		if open == 0 {
			return EMPTY_VAL, errors.New(ERROR_QUEUE_CLOSED)
		}

		// Slow path: nothing was buffered, so block until some open queue
		// receives a value or gets closed.
		if _, value, ok := reflect.Select(cases); ok {
			return int(value.Int()), nil
		}
		// A queue was closed while waiting; re-poll to pick up anything that
		// arrived in the meantime and to recount the open queues.
	}
}
// producer is the demo producer: it enqueues ITEM_COUNT values on the given
// priority level of pQ and reports each outcome on stdout. It signals wg when
// it returns.
//
// A failed Enqueue is reported as an error only; the success message is
// printed solely for values that were actually queued.
func producer(wg *sync.WaitGroup, priority int, pQ *PriorityQueue) {
	defer wg.Done()
	for i := 0; i < ITEM_COUNT; i++ {
		// priority*10+i keeps the consumer output readable: the tens digit
		// shows the priority, the units digit the item index. Change if needed.
		value := priority*10 + i
		if err := pQ.Enqueue(priority, value); err != nil {
			fmt.Printf("ERROR: %s\n", err.Error())
			// Nothing was queued, so do not claim the item was produced.
			continue
		}
		fmt.Printf("Produced item: %d on priority %d\n", i, priority)
	}
}
// consumer is the demo consumer: it keeps dequeuing from pQ and prints every
// value it receives. It stops, signalling wg, once Dequeue fails with the
// ERROR_QUEUE_CLOSED message; any other error is ignored and the loop retries.
func consumer(wg *sync.WaitGroup, pQ *PriorityQueue) {
	defer wg.Done()
	for {
		val, err := pQ.Dequeue()
		switch {
		case err == nil:
			fmt.Printf("Dequeue value: %d\n", val)
		case err.Error() == ERROR_QUEUE_CLOSED:
			return
		}
	}
}
// SpawnProducer starts the sample producers: eight goroutines, one for each of
// the priority levels 0 through 7, all registered on wg before they start.
func SpawnProducer(wg *sync.WaitGroup, pQ *PriorityQueue) {
	const producers = 8
	for priority := 0; priority < producers; priority++ {
		wg.Add(1)
		go producer(wg, priority, pQ)
	}
}
// SpawnConsumer starts the sample consumers: two goroutines reading from pQ,
// each registered on wg before it starts.
func SpawnConsumer(wg *sync.WaitGroup, pQ *PriorityQueue) {
	for remaining := 2; remaining > 0; remaining-- {
		wg.Add(1)
		go consumer(wg, pQ)
	}
}
// main runs the demo in two phases: the producers fill a queue with 10
// priority levels of capacity 10 each and finish, every channel is then
// closed, and finally the consumers read until Dequeue reports the queue as
// closed.
func main() {
	fmt.Println("Starting Producer, Consumer")

	pQ := (&PriorityQueue{}).NewPriorityQueue(10, 10)

	// Phase 1: wait until every producer has finished enqueuing.
	var producers sync.WaitGroup
	SpawnProducer(&producers, pQ)
	producers.Wait()

	// No more sends will happen, so close all the queues to let the
	// consumers terminate.
	for i := range pQ.queues {
		close(pQ.queues[i])
	}

	// Phase 2: consume until the consumers stop.
	var consumers sync.WaitGroup
	SpawnConsumer(&consumers, pQ)
	consumers.Wait()

	fmt.Println("Exited successfully")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment