Instantly share code, notes, and snippets.

@shaunlee /queue.go
Last active Mar 2, 2016

Embed
What would you like to do?
Recycle Queue
package main
import (
"fmt"
"time"
"sync/atomic"
)
// ITEM_DATA_SIZE is the size, in bytes, of the buffer allocated for every
// Item created by NewItem.
const ITEM_DATA_SIZE = 4096
// Lifecycle states stored in Item.status. Each state occupies its own bit.
const (
	// ITEM_STATUS_WRITING (value 1) is the state of an item handed out for
	// writing; Commit on such an item passes it to the queue as ready.
	ITEM_STATUS_WRITING = 1 << 0
	// ITEM_STATUS_READING (value 2) is the state of an item handed out for
	// reading; Commit on such an item returns it to the queue for reuse.
	ITEM_STATUS_READING = 1 << 1
)
// Item is a single queue entry: a byte buffer plus the bookkeeping needed to
// move it between the producing and consuming sides of a RecyleQueue.
type Item struct {
// index is the sequence number assigned by RecyleQueue.Produce.
index uint64
// status is ITEM_STATUS_WRITING or ITEM_STATUS_READING and selects what
// Commit does with the item.
status uint8
// data is the payload buffer; NewItem allocates it with ITEM_DATA_SIZE bytes.
data []byte
// length is the number of bytes of data that hold the current payload, as
// recorded by SetData; Data returns data[:length].
length int
// queue is the RecyleQueue this item is handed back to on Commit.
queue *RecyleQueue
}
// NewItem allocates an Item that belongs to queue p.
//
// The returned item is in the writing state, owns a fresh buffer of
// ITEM_DATA_SIZE bytes, and has index 0 and an empty payload (length 0).
func NewItem(p *RecyleQueue) *Item {
	item := new(Item) // index and length start at their zero values
	item.status = ITEM_STATUS_WRITING
	item.data = make([]byte, ITEM_DATA_SIZE)
	item.queue = p
	return item
}
// Data returns the item's current payload: the first p.length bytes of the
// internal buffer. The result is a sub-slice of that buffer, not a copy, so
// writes through it modify the item.
func (p *Item) Data() []byte {
	end := p.length
	return p.data[0:end]
}
// SetData copies data into the item's buffer and records its length as the
// current payload size.
//
// If data is longer than the current buffer, the buffer is replaced by one
// large enough to hold it. Without this, length would exceed the buffer while
// the copy was silently truncated, and the next Data call would panic with a
// slice-bounds error. When data fits, the existing buffer is reused and its
// size is left unchanged.
func (p *Item) SetData(data []byte) {
	if len(data) > len(p.data) {
		p.data = make([]byte, len(data))
	}
	// copy reports how many bytes were written, so length can never exceed
	// the buffer.
	p.length = copy(p.data, data)
}
// Commit finishes the item's current phase and hands it back to its queue.
//
// An item in the writing state is passed to the queue as ready to be read.
// An item in the reading state is passed to the queue for reuse. Any other
// status is ignored.
func (p *Item) Commit() {
	if p.status == ITEM_STATUS_WRITING {
		// persist to file
		p.queue.ready(p)
		return
	}
	if p.status == ITEM_STATUS_READING {
		// delete from file
		p.queue.reuse(p)
	}
}
// setWriting marks the item as being written (ITEM_STATUS_WRITING), so that a
// subsequent Commit passes it to the queue as ready.
func (p *Item) setWriting() {
p.status = ITEM_STATUS_WRITING
}
// setReading marks the item as being read (ITEM_STATUS_READING), so that a
// subsequent Commit returns it to the queue for reuse.
func (p *Item) setReading() {
p.status = ITEM_STATUS_READING
}
// RecyleQueue is a bounded queue of Items that recycles consumed items
// instead of allocating a new one for every Produce call.
type RecyleQueue struct {
	// capacity is the buffer size of both channels and the queue length at
	// which Produce refuses to hand out another item.
	capacity uint64
	// index is the running counter from which Produce assigns item indexes;
	// it is updated atomically.
	index uint64
	// recyle holds items returned after reading, waiting to be handed out
	// again by Produce.
	recyle chan *Item
	// queue holds committed items waiting to be picked up by Consume.
	queue chan *Item
}
// NewRecyleQueue creates an empty queue whose ready channel and recycle
// channel are each buffered to hold capacity items. The item index counter
// starts at zero.
func NewRecyleQueue(capacity uint64) *RecyleQueue {
	q := new(RecyleQueue)
	q.capacity = capacity
	q.recyle = make(chan *Item, capacity)
	q.queue = make(chan *Item, capacity)
	// restore queue from file
	return q
}
// ready places a committed item on the queue channel, where Consume picks it
// up. The send blocks while the channel's buffer is full.
func (p *RecyleQueue) ready(item *Item) {
	p.queue <- item
}
// reuse returns a consumed item to the recycle pool so that Produce can hand
// it out again.
//
// The send is non-blocking. Produce allocates a new item whenever the pool is
// empty, so more items than the pool can hold may be in circulation. A
// blocking send on a full pool would hang the committing goroutine, since
// only Produce drains the pool. Surplus items are dropped and left to the
// garbage collector.
func (p *RecyleQueue) reuse(item *Item) {
	select {
	case p.recyle <- item:
	default:
		// Pool is full: discard the item rather than block.
	}
}
// Produce hands out an item for writing.
//
// It returns an error, and no item, when the ready queue already holds
// capacity items. Otherwise it takes an item from the recycle pool if one is
// available, or allocates a new one with NewItem. The item is in the writing
// state, has an empty payload, and carries the next sequence number from the
// queue's atomic counter (starting at 0).
func (p *RecyleQueue) Produce() (*Item, error) {
	if uint64(len(p.queue)) == p.capacity {
		return nil, fmt.Errorf("There are no writable item yet")
	}
	var item *Item
	select {
	case item = <-p.recyle:
		item.setWriting()
		// A recycled item still records the length of its previous payload.
		// Clear it so Data() is empty until SetData is called, exactly as
		// for a freshly allocated item.
		item.length = 0
	default:
		item = NewItem(p)
	}
	// AddUint64 returns the incremented value; subtract one to get the slot
	// this call reserved.
	item.index = atomic.AddUint64(&p.index, 1) - 1
	return item, nil
}
// Consume hands out the next committed item for reading without blocking.
//
// If an item is waiting on the ready queue it is switched to the reading
// state and returned. Otherwise Consume returns a nil item and an error.
func (p *RecyleQueue) Consume() (*Item, error) {
	var (
		next     *Item
		received bool
	)
	select {
	case next = <-p.queue:
		received = true
	default:
	}
	if !received {
		return nil, fmt.Errorf("There are no readable entry yet")
	}
	next.setReading()
	return next, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment