Created
March 27, 2019 05:09
-
-
Save gerhardqux/2a21bd4eb700cbf2242089a7052ec183 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Funneld - Receive sales events and store accumelated data periodically | |
// | |
// This daemon receives realtime sales events and stores them in memory. | |
// Periodically, like hourly/daily/weekly, the accumelated data is | |
// flushed to a json file which is stored in a directory based on | |
// the filestamp. | |
// | |
// E.g. 2014/koekjes.json | |
// 2014/w52/koekjes.json | |
// 2014/04/koekjes.json | |
// 2014/04/28/koekjes.json | |
// 2014/04/28/23/koekjes.json | |
// | |
// These files can be made accessible by a webserver to allow for | |
// external graphing. | |
// | |
// Funneld is based on go-statsd, but stores to json | |
// | |
// To Do | |
// - create json using jsonlib | |
// - pcre | |
// - separate file for policy | |
// - gflag | |
// - objectstore | |
package main | |
import ( | |
"bytes" | |
"flag" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"net" | |
"os" | |
"regexp" | |
"strconv" | |
"time" | |
) | |
type Packet struct { | |
Bucket string | |
Value int | |
} | |
var ( | |
serviceAddress = flag.String("p", ":8125", "port to listen on") | |
publishDir = flag.String("o", "htdocs", "outdir, where to publish json files") | |
flushInterval = flag.Int64("i", 5, "Flush interval") | |
) | |
// Globals, when packets are processed, they are stored in the In-buffer | |
// queue up a maximum of 10000 packets while writing jsons, then | |
// have the kernel queue up udp messages | |
var ( | |
In = make(chan Packet, 10000) | |
counters = make(map[string]int) | |
) | |
func monitor() { | |
var err error | |
if err != nil { | |
log.Println(err) | |
} | |
t := time.NewTicker( | |
time.Duration(*flushInterval) * time.Second, | |
) | |
for { | |
select { | |
case <-t.C: | |
submit() | |
case s := <-In: | |
_, ok := counters[s.Bucket] | |
if !ok { | |
counters[s.Bucket] = 0 | |
} | |
counters[s.Bucket] += int(s.Value) | |
} | |
} | |
} | |
func submit() { | |
// mkdir htdocs/2014/04/31 | |
dir := fmt.Sprintf( | |
"%s/%04d/%02d/%02d", | |
*publishDir, | |
time.Now().Year(), | |
time.Now().Month(), | |
time.Now().Day(), | |
) | |
err := os.MkdirAll(dir, 0755) | |
if err != nil { | |
log.Printf(err.Error()) | |
} | |
// build json: { "Answer": "42" } | |
buffer := bytes.NewBufferString("") | |
fmt.Fprintf(buffer, "{\n") | |
for s, c := range counters { | |
fmt.Fprintf(buffer, "\t\"%s\": \"%d\",\n", s, c) | |
counters[s] = 0 | |
} | |
fmt.Fprintf(buffer, "\t\"Answer\": \"42\"\n") | |
fmt.Fprintf(buffer, "}\n") | |
// write the json file | |
fname := fmt.Sprintf("%s/%s", dir, "koekjes.json") | |
err = ioutil.WriteFile(fname, buffer.Bytes(), 0644) | |
if err != nil { | |
log.Printf(err.Error()) | |
} | |
} | |
func handleMessage(conn *net.UDPConn, remaddr net.Addr, buf *bytes.Buffer) { | |
var packet Packet | |
var sanitizeRegexp = regexp.MustCompile("[^a-zA-Z0-9\\-_\\.:\\|@]") | |
var packetRegexp = regexp.MustCompile("([a-zA-Z0-9_\\.]+):([0-9]+)\\|(c)(\\|@([0-9\\.]+))?") | |
s := sanitizeRegexp.ReplaceAllString(buf.String(), "") | |
for _, item := range packetRegexp.FindAllStringSubmatch(s, -1) { | |
value, err := strconv.Atoi(item[2]) | |
if err != nil { | |
value = 1 | |
} | |
packet.Bucket = item[1] | |
packet.Value = value | |
In <- packet | |
} | |
} | |
func udpListener() { | |
address, _ := net.ResolveUDPAddr("udp", *serviceAddress) | |
listener, err := net.ListenUDP("udp", address) | |
if err != nil { | |
log.Fatalf("ListenAndServe: %s", err.Error()) | |
} | |
for { | |
message := make([]byte, 512) | |
n, remaddr, error := listener.ReadFrom(message) | |
if error != nil { | |
continue | |
} | |
buf := bytes.NewBuffer(message[0:n]) | |
handleMessage(listener, remaddr, buf) | |
} | |
listener.Close() | |
} | |
func main() { | |
flag.Parse() | |
go udpListener() | |
monitor() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment