Skip to content

Instantly share code, notes, and snippets.

@gerhardqux
Created March 27, 2019 05:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gerhardqux/2a21bd4eb700cbf2242089a7052ec183 to your computer and use it in GitHub Desktop.
Save gerhardqux/2a21bd4eb700cbf2242089a7052ec183 to your computer and use it in GitHub Desktop.
// Funneld - Receive sales events and store accumelated data periodically
//
// This daemon receives realtime sales events and stores them in memory.
// Periodically, like hourly/daily/weekly, the accumelated data is
// flushed to a json file which is stored in a directory based on
// the filestamp.
//
// E.g. 2014/koekjes.json
// 2014/w52/koekjes.json
// 2014/04/koekjes.json
// 2014/04/28/koekjes.json
// 2014/04/28/23/koekjes.json
//
// These files can be made accessible by a webserver to allow for
// external graphing.
//
// Funneld is based on go-statsd, but stores to json
//
// To Do
// - create json using jsonlib
// - pcre
// - separate file for policy
// - gflag
// - objectstore
package main
import (
"bytes"
"flag"
"fmt"
"io/ioutil"
"log"
"net"
"os"
"regexp"
"strconv"
"time"
)
type Packet struct {
Bucket string
Value int
}
var (
serviceAddress = flag.String("p", ":8125", "port to listen on")
publishDir = flag.String("o", "htdocs", "outdir, where to publish json files")
flushInterval = flag.Int64("i", 5, "Flush interval")
)
// Globals, when packets are processed, they are stored in the In-buffer
// queue up a maximum of 10000 packets while writing jsons, then
// have the kernel queue up udp messages
var (
In = make(chan Packet, 10000)
counters = make(map[string]int)
)
func monitor() {
var err error
if err != nil {
log.Println(err)
}
t := time.NewTicker(
time.Duration(*flushInterval) * time.Second,
)
for {
select {
case <-t.C:
submit()
case s := <-In:
_, ok := counters[s.Bucket]
if !ok {
counters[s.Bucket] = 0
}
counters[s.Bucket] += int(s.Value)
}
}
}
func submit() {
// mkdir htdocs/2014/04/31
dir := fmt.Sprintf(
"%s/%04d/%02d/%02d",
*publishDir,
time.Now().Year(),
time.Now().Month(),
time.Now().Day(),
)
err := os.MkdirAll(dir, 0755)
if err != nil {
log.Printf(err.Error())
}
// build json: { "Answer": "42" }
buffer := bytes.NewBufferString("")
fmt.Fprintf(buffer, "{\n")
for s, c := range counters {
fmt.Fprintf(buffer, "\t\"%s\": \"%d\",\n", s, c)
counters[s] = 0
}
fmt.Fprintf(buffer, "\t\"Answer\": \"42\"\n")
fmt.Fprintf(buffer, "}\n")
// write the json file
fname := fmt.Sprintf("%s/%s", dir, "koekjes.json")
err = ioutil.WriteFile(fname, buffer.Bytes(), 0644)
if err != nil {
log.Printf(err.Error())
}
}
func handleMessage(conn *net.UDPConn, remaddr net.Addr, buf *bytes.Buffer) {
var packet Packet
var sanitizeRegexp = regexp.MustCompile("[^a-zA-Z0-9\\-_\\.:\\|@]")
var packetRegexp = regexp.MustCompile("([a-zA-Z0-9_\\.]+):([0-9]+)\\|(c)(\\|@([0-9\\.]+))?")
s := sanitizeRegexp.ReplaceAllString(buf.String(), "")
for _, item := range packetRegexp.FindAllStringSubmatch(s, -1) {
value, err := strconv.Atoi(item[2])
if err != nil {
value = 1
}
packet.Bucket = item[1]
packet.Value = value
In <- packet
}
}
func udpListener() {
address, _ := net.ResolveUDPAddr("udp", *serviceAddress)
listener, err := net.ListenUDP("udp", address)
if err != nil {
log.Fatalf("ListenAndServe: %s", err.Error())
}
for {
message := make([]byte, 512)
n, remaddr, error := listener.ReadFrom(message)
if error != nil {
continue
}
buf := bytes.NewBuffer(message[0:n])
handleMessage(listener, remaddr, buf)
}
listener.Close()
}
func main() {
flag.Parse()
go udpListener()
monitor()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment