Skip to content

Instantly share code, notes, and snippets.

@johncoder
Created January 17, 2018 15:59
Show Gist options
  • Save johncoder/25f42c1b1abdd16bedc6085602ecb678 to your computer and use it in GitHub Desktop.
Save johncoder/25f42c1b1abdd16bedc6085602ecb678 to your computer and use it in GitHub Desktop.
event store in go
package main
import (
	"bufio"
	"bytes"
	"encoding/binary"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"sort"
	"strconv"
	"strings"
	"time"
	"unsafe"
)
// TODO(john): Compression
// TODO(john): Encryption
// TODO(john): Content-types?
// TODO(john): Network interface
// TODO(john): server design ideas: 1) use a goroutine to synchronize writes to a stream, ensuring optimistic concurrency; 2) use a single goroutine to achieve a single writer, fanout/fanin
// TODO(john): figure out how to properly update Next pointer of an atom
// NOTE(john): This approach will probably require some kind of cache or "head" file to optimize startup time. It'll contain a byte position reference to the latest event in a particular stream. This can utilize the same format as the normal log file.
// NOTE(john): Sharding seems doable, based on stream Id?
// NOTE(john): Obscure thought, but what if there were three files: epochs, events, heads?
// Atom is one record of the event log. appendEntry writes, and readAtom
// reads, the fields from Type through ContentType as a fixed-size header in
// declaration order; Value holds the Length payload bytes that follow it.
type Atom struct {
	Type        uint8
	StreamId    int64
	DateTime    [len(EventDateTimeFormat)]uint8
	Length      int64
	Next        int64
	SequenceId  int64
	EventType   uint16
	ContentType uint8
	Value       []byte
}

// AtomMarker is a node of the in-memory index built by applyAtom. Prev and
// Next link the markers of a single stream; PrevOrthogonal and
// NextOrthogonal link markers across all streams in the order they were
// applied.
type AtomMarker struct {
	StreamId       int64
	SequenceId     int64
	Next           *AtomMarker
	Prev           *AtomMarker
	NextOrthogonal *AtomMarker
	PrevOrthogonal *AtomMarker
	Value          *Atom
}

// WriteAtomResult is writerReplyer's answer to a WriteAtom request. Success
// is set when the atom was appended (SequenceId is then its new sequence
// id); Fail is set when the request's expected sequence id did not match
// (SequenceId is then the stream's current sequence id).
type WriteAtomResult struct {
	Success    bool
	Fail       bool
	SequenceId int64
}

// WriteAtom asks writerReplyer to append Atom and send the outcome on Reply.
type WriteAtom struct {
	Reply chan WriteAtomResult
	Atom  Atom
}

// AtomResult is the JSON view of an atom returned by the HTTP handlers
// (built by atomResult).
type AtomResult struct {
	DateTime    string
	StreamId    int64
	SequenceId  int64
	EventType   uint16
	ContentType string
	Value       string
}
// TODO(john): Tighten up the idea of atom types (lists of atoms?)
const (
	// Atom type tags. Only AtomType is typed (int8): ListType and
	// LiteralType carry their own "= iota" expression, so they are untyped
	// constants.
	AtomType    int8 = iota
	ListType         = iota
	LiteralType      = iota

	// EventDateTimeFormat is the fixed-width layout stored in Atom.DateTime,
	// based on time.RFC3339 with a literal "Z" (callers format UTC times).
	// The hour must be "15" (24-hour clock): "03" is the 12-hour clock with
	// no AM/PM marker, so 15:59 was recorded as 03:59. Both layouts are 20
	// bytes wide, so the on-disk header size is unchanged, and old records
	// still parse (to their morning hour, as before).
	EventDateTimeFormat = "2006-01-02T15:04:05Z"
)
// Command-line options; the flags are registered and parsed in init.
var (
	tail        bool
	follow      bool
	port        string
	logFileName string
)

// AtomHeaderByteLength is the encoded size of an atom header, computed in init.
var AtomHeaderByteLength int

// In-memory index maintained by applyAtom: LastAtom is the most recently
// applied marker across all streams, and Streams maps each stream id to the
// latest marker of that stream.
var (
	LastAtom *AtomMarker
	Streams  = make(map[int64]*AtomMarker)
)

// Endianness is the byte order used for every encoded header field.
var Endianness = binary.LittleEndian

// ContentTypes maps a request Content-Type to the id stored in Atom.ContentType.
var ContentTypes = map[string]uint8{
	"binary":           0,
	"text/plain":       1,
	"application/json": 2,
	"application/xml":  3,
}

// ContentTypesById is the inverse of ContentTypes, used when rendering atoms.
var ContentTypesById = map[uint8]string{
	0: "binary",
	1: "text/plain",
	2: "application/json",
	3: "application/xml",
}
// sumSizes returns the sum of its arguments (0 when called with none).
func sumSizes(nums ...int) int {
	total := 0
	for i := 0; i < len(nums); i++ {
		total += nums[i]
	}
	return total
}
// TODO(john): add short versions of each option
// TODO(john): consider using a configuration file
// init registers every command-line flag under both a short and a long name
// bound to the same variable, parses the command line, and computes
// AtomHeaderByteLength as the sum of the sizes of Atom's fixed-size header
// fields (everything except Value).
//
// NOTE(review): parsing flags during package init can break `go test` for
// this package (Go 1.13+ registers the -test.* flags after package init);
// consider moving flag.Parse into main.
func init() {
	for _, name := range []string{"t", "tail"} {
		flag.BoolVar(&tail, name, false, "Read the file and print each atom value")
	}
	for _, name := range []string{"f", "follow"} {
		flag.BoolVar(&follow, name, false, "Continue reading the file")
	}
	for _, name := range []string{"p", "port"} {
		flag.StringVar(&port, name, "", "Port to listen on a web server")
	}
	for _, name := range []string{"o", "output"} {
		flag.StringVar(&logFileName, name, "./test.log", "Log file name")
	}
	flag.Parse()

	var header Atom
	fieldSizes := []int{
		int(unsafe.Sizeof(header.Type)),
		int(unsafe.Sizeof(header.StreamId)),
		int(unsafe.Sizeof(header.DateTime)),
		int(unsafe.Sizeof(header.Length)),
		int(unsafe.Sizeof(header.Next)),
		int(unsafe.Sizeof(header.SequenceId)),
		int(unsafe.Sizeof(header.EventType)),
		int(unsafe.Sizeof(header.ContentType)),
	}
	AtomHeaderByteLength = sumSizes(fieldSizes...)
}
// readAtom reads one atom from the current offset of f: a header of
// AtomHeaderByteLength bytes followed by the atom's Length value bytes.
//
// It returns the decoded atom and the number of bytes the atom occupies in
// the file. When no complete atom is available (end of file, a record that
// is only partly written, or a header with a negative Length), it returns
// (nil, 0), so the caller neither skips past nor half-consumes the record.
// TODO(john): Make this return an error object
func readAtom(f *os.File) (atom *Atom, totalBytesRead int) {
	headerBytes := make([]byte, AtomHeaderByteLength)
	// io.ReadFull rather than f.Read: a single Read may legally return fewer
	// bytes than requested without an error, which would decode a garbage
	// header and advance the caller by the wrong amount.
	if _, err := io.ReadFull(f, headerBytes); err != nil {
		// TODO(john): Return the error here
		return nil, 0
	}
	header := bytes.NewReader(headerBytes)
	atom = &Atom{}
	// headerBytes holds one complete header (AtomHeaderByteLength is computed
	// in init from these same fields), so these fixed-size reads have enough
	// input and their errors are ignored.
	binary.Read(header, Endianness, &atom.Type)
	binary.Read(header, Endianness, &atom.StreamId)
	binary.Read(header, Endianness, &atom.DateTime)
	binary.Read(header, Endianness, &atom.Length)
	binary.Read(header, Endianness, &atom.Next)
	binary.Read(header, Endianness, &atom.SequenceId)
	binary.Read(header, Endianness, &atom.EventType)
	binary.Read(header, Endianness, &atom.ContentType)
	if atom.Length < 0 {
		// Only a corrupt header can carry a negative length; make would panic on it.
		return nil, 0
	}
	valueBytes := make([]byte, atom.Length)
	if _, err := io.ReadFull(f, valueBytes); err != nil {
		return nil, 0
	}
	atom.Value = valueBytes
	return atom, len(headerBytes) + len(valueBytes)
}
// applyAtom updates the in-memory index of atoms. This affects what the
// HTTP handlers return when querying events.
//
// The first atom of a stream, or an atom whose SequenceId is greater than
// the stream's current head, becomes the stream's new head:
//   - it is linked after the previous head through Prev/Next;
//   - it is appended to the cross-stream chain through
//     PrevOrthogonal/NextOrthogonal;
//   - LastAtom is set to it.
//
// Atoms that are not newer than the stream's head are ignored.
//
// NOTE(review): Streams and LastAtom are mutated here without any locking,
// while HTTP handlers may read them from other goroutines; this needs
// synchronization.
func applyAtom(atom *Atom) {
	head := Streams[atom.StreamId]
	if head != nil && atom.SequenceId <= head.SequenceId {
		// Already applied, e.g. re-read from the log after writerReplyer
		// applied it directly. Re-linking the existing head into the
		// cross-stream chain would overwrite its PrevOrthogonal and can form
		// a cycle that walks of that chain never escape.
		return
	}
	marker := &AtomMarker{
		StreamId:   atom.StreamId,
		SequenceId: atom.SequenceId,
		Prev:       head,
		Value:      atom,
	}
	if head != nil {
		head.Next = marker
	}
	Streams[atom.StreamId] = marker

	marker.PrevOrthogonal = LastAtom
	if LastAtom != nil {
		LastAtom.NextOrthogonal = marker
	}
	LastAtom = marker
}
// apply returns a consumer loop that passes every Atom received on its
// queue to applyAtom, returning once a value arrives on done.
func apply(done chan int) func(chan Atom) {
	consume := func(queue chan Atom) {
		for {
			select {
			case <-done:
				return
			case received := <-queue:
				applyAtom(&received)
			}
		}
	}
	return consume
}
// appendEntry encodes a as one log record and writes it to f, returning any
// encoding or write error. The record is the fixed-size header fields in
// Endianness byte order, followed by the Value bytes. Existing call sites
// that ignore the result still compile.
//
// The header's Length field is written as len(a.Value) rather than
// a.Length. Readers use Length to locate the next record, so a stale
// a.Length would corrupt every record after this one.
func appendEntry(a *Atom, f io.Writer) error {
	var record bytes.Buffer
	header := []interface{}{
		a.Type,
		a.StreamId,
		a.DateTime,
		int64(len(a.Value)),
		a.Next,
		a.SequenceId,
		a.EventType,
		a.ContentType,
	}
	for _, field := range header {
		if err := binary.Write(&record, Endianness, field); err != nil {
			return err
		}
	}
	record.Write(a.Value)
	// A single Write keeps header and value together instead of issuing two
	// separate writes for one record.
	_, err := f.Write(record.Bytes())
	return err
}
// startReader reads logFileName from the beginning and calls handle for
// each complete atom, in file order.
//
// Without the -follow flag it returns at the first point where no complete
// atom can be read. With -follow it keeps polling every 200ms for atoms
// appended later. The file is created if it does not exist; if it cannot be
// opened, the error is printed to stderr and startReader returns.
func startReader(logFileName string, handle func(*Atom)) {
	logFile, err := os.OpenFile(logFileName, os.O_RDONLY|os.O_CREATE, 0777)
	if err != nil {
		fmt.Fprintln(os.Stderr, "Unable to open log file:", err)
		return
	}
	defer logFile.Close()
	position := int64(0)
	for {
		// Re-seek every pass: an unsuccessful readAtom may have moved the offset.
		if _, err := logFile.Seek(position, io.SeekStart); err != nil {
			fmt.Fprintln(os.Stderr, "Unable to seek log file:", err)
			return
		}
		atom, bytesRead := readAtom(logFile)
		if atom != nil && bytesRead != 0 {
			handle(atom)
			position += int64(bytesRead)
			continue
		}
		// Missing or incomplete record. Stop unless following; otherwise a
		// truncated trailing record would be retried forever.
		if !follow {
			break
		}
		time.Sleep(200 * time.Millisecond)
	}
}
// writerReplyer handles WriteAtom requests from queue one at a time until a
// value arrives on done.
//
// For each request it compares the request's Atom.SequenceId with the
// target stream's latest sequence id (0 for a stream not yet in Streams).
//   - On a match, the atom is stamped with the next sequence id, appended to
//     f, and applied to the in-memory index; a Success result is then sent
//     on Reply.
//   - Otherwise, a Fail result carrying the current sequence id is sent.
func writerReplyer(done chan int, f *os.File, queue chan WriteAtom) {
	for {
		var item WriteAtom
		select {
		case <-done:
			return
		case item = <-queue:
		}

		current := int64(0)
		if head := Streams[item.Atom.StreamId]; head != nil {
			current = head.SequenceId
		}
		if item.Atom.SequenceId != current {
			item.Reply <- WriteAtomResult{Fail: true, SequenceId: current}
			continue
		}

		next := current + 1
		item.Atom.SequenceId = next
		appendEntry(&item.Atom, f)
		applyAtom(&item.Atom)
		item.Reply <- WriteAtomResult{Success: true, SequenceId: next}
	}
}
// writer appends every Atom received on queue to f, returning once a value
// arrives on done.
func writer(done chan int, f *os.File, queue chan Atom) {
	for {
		var next Atom
		select {
		case <-done:
			return
		case next = <-queue:
		}
		appendEntry(&next, f)
	}
}
// getEventDateTime formats t with EventDateTimeFormat into the fixed-width
// byte array stored in Atom.DateTime. Callers pass UTC times, since the
// layout ends in a literal "Z".
//
// copy truncates any output wider than the array, instead of panicking with
// an index out of range. That happens for a year past 9999, which Format
// renders with extra digits.
func getEventDateTime(t time.Time) [len(EventDateTimeFormat)]uint8 {
	var result [len(EventDateTimeFormat)]uint8
	copy(result[:], t.Format(EventDateTimeFormat))
	return result
}
// readEventDateTime parses a DateTime array produced by getEventDateTime
// back into a time.Time. Malformed input (e.g. an all-zero array) yields
// time.Parse's error.
func readEventDateTime(input [len(EventDateTimeFormat)]uint8) (time.Time, error) {
	// Convert the bytes in one step. Appending one rune at a time
	// reallocated the string on every iteration and re-encoded any byte
	// >= 0x80 as multi-byte UTF-8.
	return time.Parse(EventDateTimeFormat, string(input[:]))
}
// startWriter prompts for lines on stdin and appends each one to
// logFileName as an atom on stream 1, with surrounding whitespace trimmed.
// Sequence ids for this run start at 1. An empty or whitespace-only line,
// or end of input, stops the writer. If the log cannot be opened, the error
// is printed to stderr and startWriter returns.
func startWriter(logFileName string) {
	// O_WRONLY is required: os.O_RDONLY is 0, so O_APPEND|O_CREATE alone
	// opens the file read-only and every append fails.
	logFile, err := os.OpenFile(logFileName, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0755)
	if err != nil {
		fmt.Fprintln(os.Stderr, "Unable to open log file:", err)
		return
	}
	defer logFile.Close()
	streamId := int64(1)
	atomType := uint8(1)
	sequenceId := int64(0)
	// TODO(john): Manage the next pointers! Thought: if an in-memory
	// map is maintained, is it feasible to f.Seek to the next pointer
	// in the log file and simply overwrite it?
	done := make(chan int)
	queue := make(chan Atom)
	go writer(done, logFile, queue)
	// One reader for the whole session. A fresh bufio.Reader per line would
	// discard whatever the previous one had already buffered (e.g. piped input).
	reader := bufio.NewReader(os.Stdin)
	for {
		fmt.Print("Enter text: ")
		// At end of input ReadString returns the remaining text (possibly
		// empty) with io.EOF; an empty result ends the loop below.
		text, _ := reader.ReadString('\n')
		trimmed := strings.TrimSpace(text)
		if trimmed == "" {
			done <- 1
			fmt.Println("All done!?")
			break
		}
		sequenceId++
		value := []byte(trimmed)
		queue <- Atom{
			Type:       atomType,
			StreamId:   streamId,
			DateTime:   getEventDateTime(time.Now().UTC()),
			Length:     int64(len(value)),
			Next:       0,
			SequenceId: sequenceId,
			Value:      value,
		}
	}
}
// atomResult converts a into its JSON-facing AtomResult:
//   - DateTime is parsed with readEventDateTime and re-formatted (a parse
//     error is ignored);
//   - the content type id is looked up in ContentTypesById;
//   - Value is exposed as a string.
func atomResult(a *Atom) *AtomResult {
	when, _ := readEventDateTime(a.DateTime)
	return &AtomResult{
		DateTime:    when.Format(EventDateTimeFormat),
		StreamId:    a.StreamId,
		SequenceId:  a.SequenceId,
		EventType:   a.EventType,
		ContentType: ContentTypesById[a.ContentType],
		Value:       string(a.Value),
	}
}
// getAtomArray collects the atoms reachable from a by following
// PrevOrthogonal links, starting with a itself. The walk stops at a nil
// link or at a marker whose PrevOrthogonal points back to itself. The
// result is never nil; it is empty when a is nil.
func getAtomArray(a *AtomMarker) []*Atom {
	atoms := make([]*Atom, 0)
	for marker := a; marker != nil; {
		atoms = append(atoms, marker.Value)
		prev := marker.PrevOrthogonal
		if prev == marker {
			break
		}
		marker = prev
	}
	return atoms
}
// getAtomResultsArray converts, via atomResult, the atoms reachable from a
// along PrevOrthogonal links (the same walk as getAtomArray). The result is
// never nil, so it marshals to [] rather than null.
func getAtomResultsArray(a *AtomMarker) []*AtomResult {
	atoms := getAtomArray(a)
	results := make([]*AtomResult, 0, len(atoms))
	for _, atom := range atoms {
		results = append(results, atomResult(atom))
	}
	return results
}
// httpGETRoot serves GET /: every atom in the in-memory index, starting
// from LastAtom and walking back across all streams, as a JSON array of
// AtomResult. Any other path gets 404 instead of an empty 200.
func httpGETRoot(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	body, err := json.Marshal(getAtomResultsArray(LastAtom))
	if err != nil {
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(body)
}
// httpGETStreams serves GET /streams: the latest atom of every stream as a
// JSON array of AtomResult, ordered by ascending stream id. Any other path
// gets 404.
func httpGETStreams(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/streams" {
		http.NotFound(w, r)
		return
	}
	atoms := make([]*AtomResult, 0, len(Streams))
	for _, head := range Streams {
		if head != nil {
			atoms = append(atoms, atomResult(head.Value))
		}
	}
	// Map iteration order is randomized; sort so responses are deterministic.
	sort.Slice(atoms, func(i, j int) bool {
		return atoms[i].StreamId < atoms[j].StreamId
	})
	body, err := json.Marshal(atoms)
	if err != nil {
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(body)
}
func httpGETStream(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
return
}
streamId, strConvErr := strconv.ParseInt(r.URL.Path[1:], 10, 64)
if strConvErr != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "Bad Request")
return
}
currentAtom := Streams[streamId]
atoms := getAtomResultsArray(currentAtom)
events := new(bytes.Buffer)
jsonAtom, _ := json.Marshal(atoms[:len(atoms)])
events.Write(jsonAtom)
fmt.Fprintf(w, "%s", events.Bytes())
}
// httpWriteEvent starts a writerReplyer goroutine (stopped via done) that
// owns appends to f, and returns the handler for "/":
//   - GET / is served by httpGETRoot; GET /{streamId} by httpGETStream.
//   - PUT or POST /{streamId}?current=N&type=T appends the request body as a
//     new atom. The Content-Type header selects the content type id; unknown
//     types map to 0. The write succeeds only when N equals the stream's
//     latest sequence id (0 for a new stream), and replies
//     "INCR streamId N->N+1".
//   - A failed precondition or a malformed request gets 400 Bad Request.
//   - Any other method gets 405.
func httpWriteEvent(f *os.File, done chan int) func(http.ResponseWriter, *http.Request) {
	queue := make(chan WriteAtom, 100)
	atomType := uint8(1)
	go writerReplyer(done, f, queue)
	badRequest := func(w http.ResponseWriter) {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "Bad Request")
	}
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method == "GET" {
			if r.URL.Path == "/" {
				httpGETRoot(w, r)
				return
			}
			httpGETStream(w, r)
			return
		}
		if r.Method != "PUT" && r.Method != "POST" {
			http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
			return
		}
		query := r.URL.Query()
		streamId, strConvErr := strconv.ParseInt(strings.TrimPrefix(r.URL.Path, "/"), 10, 64)
		sequenceId, seqParseErr := strconv.ParseInt(query.Get("current"), 10, 64)
		eventType, etParseErr := strconv.ParseUint(query.Get("type"), 10, 16)
		fmt.Printf("%s %s %d %d %d\n", r.Method, r.URL, streamId, sequenceId, eventType)
		if strConvErr != nil || seqParseErr != nil || etParseErr != nil {
			badRequest(w)
			return
		}
		value, readErr := ioutil.ReadAll(r.Body)
		if readErr != nil {
			badRequest(w)
			return
		}
		// Drop media-type parameters such as "; charset=utf-8" so that
		// "application/json; charset=utf-8" still matches ContentTypes.
		mediaType := strings.TrimSpace(strings.SplitN(r.Header.Get("Content-Type"), ";", 2)[0])
		atom := Atom{
			Type:     atomType,
			StreamId: streamId,
			DateTime: getEventDateTime(time.Now().UTC()),
			Length:   int64(len(value)),
			// TODO(john): Manage the next pointers!
			Next:        0,
			SequenceId:  sequenceId,
			EventType:   uint16(eventType),
			ContentType: ContentTypes[mediaType],
			Value:       value,
		}
		// TODO(john): PUT should be the epoch event; until then PUT and POST
		// are handled identically.
		reply := make(chan WriteAtomResult)
		queue <- WriteAtom{Atom: atom, Reply: reply}
		if result := <-reply; result.Success {
			fmt.Fprintf(w, "INCR %d %d->%d", streamId, sequenceId, result.SequenceId)
		} else {
			badRequest(w)
		}
	}
}
// startWebServer opens logFileName for appending, registers the HTTP
// handlers on the default mux, and serves on the configured port until the
// server fails. It then stops the writerReplyer goroutine once and closes
// the log. If the log cannot be opened, the error is printed to stderr and
// the server is not started.
func startWebServer() {
	// O_WRONLY is required: os.O_RDONLY is 0, so O_APPEND|O_CREATE alone
	// opens the file read-only and every append fails.
	logFile, err := os.OpenFile(logFileName, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0777)
	if err != nil {
		fmt.Fprintln(os.Stderr, "Unable to open log file:", err)
		return
	}
	defer logFile.Close()
	fmt.Println("Listening on port", port)
	done := make(chan int)
	writeEvent := httpWriteEvent(logFile, done)
	http.HandleFunc("/", writeEvent)
	http.HandleFunc("/streams", httpGETStreams)
	// ListenAndServe blocks and only returns with an error.
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		fmt.Fprintln(os.Stderr, "Web server stopped:", err)
	}
	// Signal exactly once: writerReplyer returns after the first value, so a
	// second send would block forever.
	done <- 1
}
// printAtomValue prints a one-line "[datetime] stream/sequence: value"
// summary of a to stdout.
func printAtomValue(a *Atom) {
	stamp, _ := readEventDateTime(a.DateTime)
	formatted := stamp.Format(EventDateTimeFormat)
	fmt.Printf("[%s] %d/%d: %s\n", formatted, a.StreamId, a.SequenceId, a.Value)
}
// main dispatches on the parsed flags:
//   - -tail: print every atom in the log (and keep polling with -follow).
//   - -port: rebuild the in-memory index from the log and serve HTTP.
//   - otherwise: append lines typed on stdin to the log.
func main() {
	fmt.Println(time.Now().UTC().Format(EventDateTimeFormat), "Starting Pylon...")
	fmt.Printf("Atom Header Byte Length: %d\n", AtomHeaderByteLength)
	switch {
	case tail:
		fmt.Println(time.Now().UTC().Format(EventDateTimeFormat), "Tailing...")
		startReader(logFileName, printAtomValue)
	case port != "" && !follow:
		// Replay the whole log before serving. Requests then see the complete
		// index, and the replay never mutates Streams while handlers read it.
		startReader(logFileName, applyAtom)
		startWebServer()
	case port != "":
		// With -follow the reader never finishes, so it keeps feeding the
		// apply goroutine in the background while the server runs.
		// NOTE(review): this path mutates Streams concurrently with the HTTP
		// handlers and needs a lock.
		doneReading := make(chan int)
		readAtoms := make(chan Atom)
		go apply(doneReading)(readAtoms)
		go startReader(logFileName, func(a *Atom) { readAtoms <- *a })
		startWebServer()
		doneReading <- 1
	default:
		fmt.Println(time.Now().UTC().Format(EventDateTimeFormat), "Writing...")
		startWriter(logFileName)
	}
	fmt.Println(time.Now().UTC().Format(EventDateTimeFormat), "Shutting down...")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment