Skip to content

Instantly share code, notes, and snippets.

@shaunlee shaunlee/ringbuffer.go
Last active Mar 2, 2016

Embed
What would you like to do?
RingBuffer
package main
import (
"fmt"
"log"
"os"
"io"
"bufio"
"runtime"
"path/filepath"
"encoding/binary"
"code.google.com/p/snappy-go/snappy"
)
const (
MAX_LINES uint64 = 10000
MAX_UINT64 uint64 = 0xffffffffffffffff
)
const (
CAPACITY_1M uint64 = 1 << (20 + iota)
CAPACITY_2M
CAPACITY_4M
CAPACITY_8M
CAPACITY_16M
)
const (
ENTRY_FLAG_COMPRESSED uint8 = 1 << iota
ENTRY_FLAG_DELETED
)
type Entry interface {
Data() []byte
SetData(data []byte)
Commit()
}
type rbentry struct {
index uint64
data []byte
rb *RingBuffer
}
func (p *rbentry) Data() []byte {
return p.data
}
func (p *rbentry) SetData(data []byte) {
p.data = data
}
func (p *rbentry) Commit() {
go p.rb.dealt(p)
}
/**
* 文件格式:
* CURRENT: 记录队列头(读的位置)和尾(写的位置), 16bits(uint64 * 2)
* db00000000.dat:
* Package[index:uint64, flag(ENTRY_FLAG_COMPRESSED):uint8, data length:uint32, string]
* 已处理完成(删除)的数据: Package[index:uint64, flag(ENTRY_FLAG_DELETED):uint8]
*/
type RingBuffer struct {
dbname string
head, tail, capacity, mod uint64
buffer []Entry
gcounter uint64
}
/**
* capacity 必须是 2 的 N 次方, 用位运算代替 mod 计算
*
* 如: index = tail & (capacity - 1) 代替 index = tail % capacity
*/
func NewRingBuffer(dbname string, capacity uint64) (*RingBuffer, error) {
if capacity == 0 || capacity & (capacity - 1) != 0 {
return nil, fmt.Errorf("Capacity must be a power of 2")
}
if _, err := os.Stat(dbname); err != nil {
if !os.IsNotExist(err) { return nil, err } else {
if err = os.MkdirAll(dbname, 0755); err != nil { return nil, err }
}
}
p := &RingBuffer{}
p.dbname = dbname
p.capacity = capacity
p.mod = capacity - 1
p.buffer = make([]Entry, p.capacity)
log.Println("RingBuffer: restoring the buffer ...")
p.head = MAX_UINT64
p.restore()
if p.head == MAX_UINT64 { p.head = 0 }
if p.tail - p.head > capacity { return nil, fmt.Errorf("Capacity is not enough to recover the buffer") }
p.skip()
log.Println("RingBuffer: now i'm ready")
go p.gc()
return p, nil
}
func (p *RingBuffer) restore() {
var isEmpty = true
if files, err := filepath.Glob(fmt.Sprintf("%s/*.dat", p.dbname)); err == nil {
for _, filename := range files {
if fp, err := os.Open(filename); err == nil {
reader := bufio.NewReader(fp)
for {
var (
length uint32
flag uint8
compressed, deleted bool
nread int = 0
entry = &rbentry{rb: p}
)
if err := binary.Read(reader, binary.LittleEndian, &entry.index); err == io.EOF {
break
}
if err := binary.Read(reader, binary.LittleEndian, &flag); err == io.EOF {
break
} else {
compressed = (flag & ENTRY_FLAG_COMPRESSED) == ENTRY_FLAG_COMPRESSED
deleted = (flag & ENTRY_FLAG_DELETED) == ENTRY_FLAG_DELETED
}
if deleted {
p.buffer[entry.index & p.mod] = nil
} else {
isEmpty = false
if err := binary.Read(reader, binary.LittleEndian, &length); err == io.EOF {
break
} else {
data := make([]byte, length)
for nread < int(length) {
if n, err := reader.Read(data[nread:]); err != nil { break } else { nread += n }
}
entry.SetData(data)
if compressed {
if buf, err := snappy.Decode(nil, data); err == nil {
entry.SetData(buf)
}
}
}
p.buffer[entry.index & p.mod] = entry
if entry.index < p.head {
p.head = entry.index
}
if entry.index > p.tail {
p.tail = entry.index
}
}
}
fp.Close()
}
}
}
if !isEmpty { p.tail++ }
}
func (p *RingBuffer) skip() {
if p.tail > p.head {
var capables = make([]uint64, 0)
for index := p.tail - 1; index >= p.head; index-- {
entry := p.buffer[index & p.mod]
if entry == nil {
capables = append(capables, index)
} else if len(capables) > 0 {
//[0, 1, 2, 3, 4]
// delete 1, 3
//[0, -, 2, -, 4]
//[0, -, -, 2, 4]
//[-, -, 0, 2, 4]
entry.(*rbentry).index = capables[0]
p.buffer[capables[0] & p.mod] = entry
capables = append(capables[1:], index)
}
if index == 0 { break }
}
if n := uint64(len(capables)); n > 0 {
p.head = p.head + n
}
}
}
func (p *RingBuffer) gc() {
index := p.head
if index - index % MAX_LINES > 0 {
index = index / MAX_LINES - 1
for index >= 0 {
filename := fmt.Sprintf("%s/db%.8x.dat", p.dbname, index)
if _, err := os.Stat(filename); err != nil {
break
} else {
os.Remove(filename)
index--
}
}
}
}
func (p *RingBuffer) Publish(data []byte) error {
if p.tail - p.head >= p.capacity {
return fmt.Errorf("There are no writable entry yet")
}
entry := &rbentry{index: p.tail, rb: p}
entry.SetData(data)
go p.persist(entry)
p.buffer[p.tail & p.mod] = entry
p.tail++
return nil
}
func (p *RingBuffer) Consume() (Entry, error) {
if p.head - p.tail < 1 {
return nil, fmt.Errorf("There are no readable entry yet")
}
var entry Entry
for ; p.head < p.tail; p.head++ {
entry = p.buffer[p.head & p.mod]
if entry != nil {
break
} else {
runtime.Gosched()
}
}
p.head++
return entry, nil
}
func (p *RingBuffer) persist(entry *rbentry) error {
filename := fmt.Sprintf("%s/db%.8x.dat", p.dbname, entry.index / MAX_LINES)
if fp, err := os.OpenFile(filename, os.O_CREATE | os.O_RDWR | os.O_APPEND, 0644); err != nil {
return err
} else {
var (
flag uint8 = 0
length int
nwrite int = 0
data []byte
)
writer := bufio.NewWriter(fp)
if buf, err := snappy.Encode(nil, entry.data); err != nil {
data = entry.data
} else {
flag |= ENTRY_FLAG_COMPRESSED
data = buf
}
length = len(data)
binary.Write(writer, binary.LittleEndian, uint64(entry.index))
binary.Write(writer, binary.LittleEndian, flag)
binary.Write(writer, binary.LittleEndian, uint32(length))
for nwrite < length {
if n, err := writer.Write(data[nwrite:]); err != nil { break } else { nwrite += n }
}
writer.Flush()
fp.Close()
}
return nil
}
func (p *RingBuffer) dealt(entry *rbentry) error {
filename := fmt.Sprintf("%s/db%.8x.dat", p.dbname, entry.index / MAX_LINES)
if fp, err := os.OpenFile(filename, os.O_CREATE | os.O_RDWR | os.O_APPEND, 0644); err != nil {
return err
} else {
var flag uint8 = ENTRY_FLAG_DELETED
writer := bufio.NewWriter(fp)
binary.Write(writer, binary.LittleEndian, uint64(entry.index))
binary.Write(writer, binary.LittleEndian, flag)
writer.Flush()
fp.Close()
if p.gcounter > MAX_LINES {
go p.gc()
p.gcounter = 0
} else {
p.gcounter++
}
}
return nil
}
func (p *RingBuffer) Status() string {
return fmt.Sprintf("%d/%d, head: %d, tail: %d",
p.tail - p.head, p.capacity, p.head, p.tail)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.