Skip to content

Instantly share code, notes, and snippets.

@threeaccents
Last active January 9, 2018 01:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save threeaccents/b6d6f77deb491d0179ed0f7b42893ce8 to your computer and use it in GitHub Desktop.
Save threeaccents/b6d6f77deb491d0179ed0f7b42893ce8 to your computer and use it in GitHub Desktop.
Simple Message Queue
package smq
// SMQ is a simple in-process message queue. Payloads enter through Push and
// are handed to consumers, wrapped in a Message, on the channel returned by
// Consume.
type SMQ struct {
	// jobQueue holds payloads between Push and the listen loop; its
	// capacity is the queue size given to New.
	jobQueue chan []byte
	// consumer carries Messages from the listen loop to whoever reads the
	// channel returned by Consume.
	consumer chan Message
	// worker holds one token for every Message that has been dispatched
	// and not yet finished. listen adds a token before each delivery and
	// Message.Finish removes one, so the channel's capacity bounds the
	// number of Messages in flight.
	worker chan int
}
// New creates an SMQ and starts its dispatch goroutine.
//
// maxQueue is the number of payloads Push can buffer before it blocks.
// maxWorkers is the number of Messages that may be outstanding (delivered
// but not yet finished) at the same time.
//
// Out-of-range sizes are clamped rather than passed straight to make:
// a negative maxQueue becomes 0 (Push then hands each payload over
// synchronously) and a maxWorkers below 1 becomes 1. A negative size would
// make the channel allocation panic, and zero worker slots would leave the
// dispatch loop blocked before it could deliver its first Message.
//
// The goroutine started here keeps running for as long as the job queue
// stays open.
func New(maxQueue, maxWorkers int) *SMQ {
	if maxQueue < 0 {
		maxQueue = 0
	}
	if maxWorkers < 1 {
		maxWorkers = 1
	}

	q := &SMQ{
		jobQueue: make(chan []byte, maxQueue),
		consumer: make(chan Message),
		worker:   make(chan int, maxWorkers),
	}
	go q.listen()
	return q
}
// Push enqueues payload for delivery to a consumer. The slice is sent as-is
// (it is not copied), and the call blocks while the job queue has no room.
func (q *SMQ) Push(payload []byte) {
	jobs := q.jobQueue
	jobs <- payload
}
// Consume returns the receive-only channel on which queued payloads arrive,
// each wrapped in a Message. Every call returns the same channel. A worker
// slot is taken for each delivered Message and is released by calling
// Finish on it.
func (q *SMQ) Consume() <-chan Message {
	var deliveries <-chan Message = q.consumer
	return deliveries
}
// listen is the dispatch loop started by New. For every payload taken off
// the job queue it first claims a worker slot, then hands the payload to a
// consumer. It returns once the job queue has been closed and drained.
func (q *SMQ) listen() {
	for {
		job, open := <-q.jobQueue
		if !open {
			return
		}

		// Claim a slot before delivering; this blocks while the maximum
		// number of Messages is still unfinished.
		q.worker <- 1

		msg := Message{
			Payload: job,
			done:    q.worker,
		}
		q.consumer <- msg
	}
}
// Message is a single payload handed to a consumer, paired with the channel
// used to tell the queue that processing of it is complete.
type Message struct {
	// Payload is the data that was passed to Push.
	Payload []byte
	// done is the queue's worker-slot channel; receiving one value from it
	// frees a slot for another Message.
	done chan int
}

// Finish lets the queue know the consumer is done with the message, so the
// queue is ready to dispatch another one. It frees a worker slot by taking
// one token off the done channel.
//
// The receive is non-blocking. A slot token is always placed on the channel
// before the Message is delivered, so in normal use one is available. When
// none is (a zero-value Message whose done channel is nil, or an extra
// Finish call with no slot held) Finish returns immediately instead of
// blocking the caller forever.
func (m Message) Finish() {
	select {
	case <-m.done:
	default:
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment