Skip to content

Instantly share code, notes, and snippets.

@olegfedoseev
Created April 8, 2013 06:34
Show Gist options
  • Save olegfedoseev/5334690 to your computer and use it in GitHub Desktop.
Save olegfedoseev/5334690 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"time"
"runtime"
"compress/zlib"
"io"
"os"
"os/signal"
"bytes"
"sort"
"encoding/json"
zmq "github.com/alecthomas/gozmq"
)
// RawRequest is one request record, data straight from pinba2zmq.
// Its fields are filled in by parse_json from a single decoded JSON object.
type RawRequest struct {
// Copied from the "hostname" JSON key.
hostname string
// Copied from the "server" JSON key.
server string
// Copied from the "script" JSON key.
script string
// Copied from the "time" JSON key.
// NOTE(review): presumably the request duration; units are not visible here.
time float64
// Sum of the "ru_utime" and "ru_stime" JSON keys.
cpu_time float64
// One entry per element of the "timers" JSON array.
timers []Timer
}
// Message is one decoded payload that main hands to every collector.
type Message struct {
// First element of the decoded payload, converted to int (see read_raw).
// NOTE(review): presumably a Unix timestamp in seconds; confirm against pinba2zmq.
ts int
// Requests parsed from the second element of the payload.
requests []RawRequest
}
// Timer is one entry of a request's "timers" array as parsed by parse_json.
type Timer struct{
// Copied from the "tags" JSON object; every tag value must be a string.
tags map[string]string
// Copied from the "hits" JSON key.
hits float64
// Copied from the "time" JSON key.
time float64
}
// GroupedData is one aggregated group produced by group_requests or
// group_timers.
type GroupedData struct {
// Tags identifying the group: "host", "server" and "script", plus the
// timer's own tags when the group comes from group_timers.
tags map[string]string
// Number of requests in the group (group_requests) or the summed timer
// hits truncated to int (group_timers).
count int
// Statistics over the group's time values, as returned by aggregate.
time Time
// Statistics over the group's cpu_time slice; set by group_requests only,
// group_timers leaves it at the zero value.
cpu_time Time
}
// Time holds the statistics that aggregate computes over a list of values.
type Time struct {
// Element at index len/2 of the sorted values.
med float64
// Largest value.
max float64
// Arithmetic mean of all values.
avg float64
// Mean of the sorted values from index int(0.15*len) upward, as computed
// by aggregate.
// NOTE(review): the name suggests an 85th percentile; confirm the intent.
p85 float64
// Never assigned anywhere in this file, so it is always zero.
// NOTE(review): presumably meant to hold a deviation; confirm.
dev float64
}
// parse_json converts the generically decoded JSON request list into
// RawRequest values.
//
// raw must hold a []interface{} whose elements are JSON objects with the
// keys "hostname", "server", "script" (strings), "time", "ru_utime",
// "ru_stime" (numbers) and "timers" (an array of objects with "hits",
// "time" and a "tags" object of string values). No validation is done: a
// missing key or a value of another type makes the matching type assertion
// panic.
//
// The result is nil when the list is empty.
func parse_json(raw interface{}) (requests []RawRequest) {
	items := raw.([]interface{})
	for i := range items {
		fields := items[i].(map[string]interface{})

		var request RawRequest
		request.hostname = fields["hostname"].(string)
		request.server = fields["server"].(string)
		request.script = fields["script"].(string)
		request.time = fields["time"].(float64)
		// CPU time is the sum of user and system time.
		request.cpu_time = fields["ru_utime"].(float64) + fields["ru_stime"].(float64)

		raw_timers := fields["timers"].([]interface{})
		for j := range raw_timers {
			entry := raw_timers[j].(map[string]interface{})

			var timer Timer
			timer.hits = entry["hits"].(float64)
			timer.time = entry["time"].(float64)
			timer.tags = make(map[string]string)
			for name, value := range entry["tags"].(map[string]interface{}) {
				timer.tags[name] = value.(string)
			}

			request.timers = append(request.timers, timer)
		}

		requests = append(requests, request)
	}
	return requests
}
// group_requests groups requests by (hostname, server, script) and returns
// one GroupedData per group with the request count and the aggregated wall
// and CPU time statistics.
//
// requests must be non-nil. The order of the returned groups is unspecified
// because it follows map iteration order.
func group_requests(requests *[]RawRequest) (result []GroupedData) {
	tags := make(map[string]map[string]string)
	times := make(map[string][]float64)
	cpu_times := make(map[string][]float64)

	for _, r := range *requests {
		// A NUL separator keeps the key unambiguous: without it
		// ("ab", "c", ...) and ("a", "bc", ...) would share one group.
		uniqid := r.hostname + "\x00" + r.server + "\x00" + r.script
		if _, seen := tags[uniqid]; !seen {
			tags[uniqid] = map[string]string{"host": r.hostname, "server": r.server, "script": r.script}
		}
		times[uniqid] = append(times[uniqid], r.time)
		// Append to the CPU-time slice itself. Appending to the wall-time
		// slice here mixed both series and could alias their storage.
		cpu_times[uniqid] = append(cpu_times[uniqid], r.cpu_time)
	}

	for uniqid, vals := range tags {
		result = append(result, GroupedData{
			tags:     vals,
			count:    len(times[uniqid]),
			time:     aggregate(times[uniqid]),
			cpu_time: aggregate(cpu_times[uniqid]),
		})
	}
	return result
}
// group_timers groups every timer of every request by (hostname, server,
// script) plus the timer's full tag set. It returns one GroupedData per
// group, with count set to the summed hits (truncated to int) and time set
// to the aggregated timer durations. cpu_time is left at its zero value.
//
// requests must be non-nil. The order of the returned groups is unspecified
// because it follows map iteration order.
func group_timers(requests *[]RawRequest) (result []GroupedData) {
	tags := make(map[string]map[string]string)
	times := make(map[string][]float64)
	hits := make(map[string]float64)

	for _, r := range *requests {
		for _, t := range r.timers {
			// Sort the tag names so that equal tag sets always produce the
			// same key regardless of map iteration order.
			keys := make([]string, 0, len(t.tags))
			for k := range t.tags {
				keys = append(keys, k)
			}
			sort.Strings(keys)

			// NUL separators keep the key unambiguous: with bare
			// concatenation {"a": "bc"} and {"ab": "c"} would collide.
			uniqid := r.hostname + "\x00" + r.server + "\x00" + r.script
			for _, key := range keys {
				uniqid += "\x00" + key + "\x00" + t.tags[key]
			}

			// The tag map is identical for every timer of a group, so build
			// it only once. Timer tags are written last and therefore
			// override "host"/"server"/"script" on a name clash.
			if _, seen := tags[uniqid]; !seen {
				group_tags := map[string]string{"host": r.hostname, "server": r.server, "script": r.script}
				for _, key := range keys {
					group_tags[key] = t.tags[key]
				}
				tags[uniqid] = group_tags
			}

			times[uniqid] = append(times[uniqid], t.time)
			hits[uniqid] += t.hits
		}
	}

	for uniqid, vals := range tags {
		result = append(result, GroupedData{
			tags:  vals,
			count: int(hits[uniqid]),
			time:  aggregate(times[uniqid]),
		})
	}
	return result
}
// aggregate computes summary statistics over values and returns them as a
// Time. An empty or nil input yields the zero Time.
//
// The input slice is not modified: sorting happens on a private copy, so
// callers keep their original ordering.
//
// Fields set:
//   - med: element at index len/2 of the sorted values
//   - max: largest value
//   - avg: arithmetic mean
//   - p85: mean of the sorted values from index int(0.15*len) upward
//
// dev is not computed and stays zero.
// NOTE(review): p85 is a mean over the upper part of the data rather than a
// percentile; confirm this is the intended metric.
func aggregate(values []float64) Time {
	if len(values) == 0 {
		return Time{}
	}

	sorted := make([]float64, len(values))
	copy(sorted, values)
	sort.Float64s(sorted)

	n := len(sorted)
	// int(0.15*n) is always below n for n >= 1, so tail is never empty.
	tail := sorted[int(0.15*float64(n)):]

	return Time{
		med: sorted[n/2],
		max: sorted[n-1],
		avg: sum(sorted) / float64(n),
		p85: sum(tail) / float64(len(tail)),
	}
}
// sum adds up values from first to last and returns the total.
// A nil or empty slice yields 0.
func sum(values []float64) (sum float64) {
	for i := 0; i < len(values); i++ {
		sum = sum + values[i]
	}
	return sum
}
// listenForSignals blocks until the process receives a signal, prints
// "stopping" and then sends true on exit_channel. The send blocks until the
// other side receives it.
//
// No signal list is passed to signal.Notify, so every incoming signal is
// relayed.
func listenForSignals(exit_channel chan bool) {
	// The channel must be buffered: the signal package never blocks when
	// delivering, so a signal arriving while nobody is receiving on an
	// unbuffered channel would be dropped.
	signal_channel := make(chan os.Signal, 1)
	signal.Notify(signal_channel)
	<-signal_channel
	fmt.Println("stopping")
	exit_channel <- true
}
// read_raw decodes one pinba2zmq payload: zlib-compressed JSON holding a
// two-element array [timestamp, requests].
//
// It returns the timestamp converted to int and all requests of that
// message, without any grouping. content must be non-nil.
//
// A payload that cannot be inflated or parsed, or whose top-level shape is
// wrong, is reported on stdout and yields (0, nil) instead of a panic.
// Malformed individual request objects are still handled (or not) by
// parse_json.
func read_raw(content *[]byte) (int, []RawRequest) {
	zr, err := zlib.NewReader(bytes.NewReader(*content))
	if err != nil {
		fmt.Println("Decode Error:", err.Error())
		return 0, nil
	}
	defer zr.Close()

	var out bytes.Buffer
	if _, err := io.Copy(&out, zr); err != nil {
		fmt.Println("Decode Error:", err.Error())
		return 0, nil
	}

	var in [2]interface{}
	if err := json.Unmarshal(out.Bytes(), &in); err != nil {
		fmt.Println("Decode Error:", err.Error())
		return 0, nil
	}

	ts, ok := in[0].(float64)
	if !ok {
		fmt.Println("Decode Error: payload timestamp is not a number")
		return 0, nil
	}
	if _, ok := in[1].([]interface{}); !ok {
		fmt.Println("Decode Error: payload request list is not an array")
		return 0, nil
	}

	// All requests for the message, without grouping.
	return int(ts), parse_json(in[1])
}
// collector buffers the requests of incoming messages. After every
// `interval` messages it hands the buffered batch to a background goroutine
// that groups the requests and timers and prints a "PROCESSED" summary
// line, then starts over with an empty buffer. A status line is printed for
// every message received. The function never returns.
//
// start_time is accepted but not used.
func collector(interval int, start_time time.Time, in <-chan Message) {
	var (
		received int
		pending  = []RawRequest{}
	)

	for {
		message := <-in
		pending = append(pending, message.requests...)
		received++

		if received == interval {
			// Give the goroutine its own reference to the batch; pending is
			// replaced by a fresh slice right below.
			batch := pending
			go func() {
				started := time.Now()
				// Requests grouped over the collected window.
				requests := group_requests(&batch)
				// Timers grouped over the collected window.
				timers := group_timers(&batch)
				fmt.Printf("PROCESSED [%v] RawRequests: %v (requests: %v, timers: %v) in %v\n",
					message.ts, len(batch), len(requests), len(timers), time.Since(started))
			}()
			pending = []RawRequest{}
			received = 0
		}

		fmt.Printf("interval: %v, ts: %v, len(requests): %v, len(buffer): %v\n",
			interval, message.ts, len(message.requests), len(pending))
	}
}
// main subscribes to the pinba2zmq publisher, decodes every received
// message and fans it out to one collector per aggregation interval. It
// runs until listenForSignals reports a signal. A setup failure is printed
// and turns into exit status 1.
func main() {
	fmt.Println("Boo!")
	runtime.GOMAXPROCS(runtime.NumCPU())
	if err := run(); err != nil {
		fmt.Println("Fatal Error:", err.Error())
		os.Exit(1)
	}
}

// run holds the body of main so that the deferred socket and context
// cleanup executes before main chooses the process exit status.
func run() error {
	const endpoint = "tcp://172.16.5.130:5000"

	exit := make(chan bool)
	go listenForSignals(exit)

	context, err := zmq.NewContext()
	if err != nil {
		return fmt.Errorf("creating zmq context: %v", err)
	}
	defer context.Close()

	subscriber, err := context.NewSocket(zmq.SUB)
	if err != nil {
		return fmt.Errorf("creating SUB socket: %v", err)
	}
	defer subscriber.Close()

	if err := subscriber.Connect(endpoint); err != nil {
		return fmt.Errorf("connecting to %v: %v", endpoint, err)
	}
	// An empty prefix subscribes to every message.
	if err := subscriber.SetSockOptString(zmq.SUBSCRIBE, ""); err != nil {
		return fmt.Errorf("subscribing: %v", err)
	}

	data := make(chan []byte)
	recv_errors := make(chan error)
	go func() {
		for {
			// Receive raw data from pinba2zmq.
			msgbytes, err := subscriber.Recv(0)
			if err != nil {
				recv_errors <- err
				continue
			}
			data <- msgbytes
		}
	}()

	// Data is wanted grouped per second, per 10 seconds and per minute.
	intervals := [3]int{1, 10, 60}
	channels := make(map[int]chan Message, len(intervals))
	for _, interval := range intervals {
		channels[interval] = make(chan Message, 1)
		go collector(interval, time.Now(), channels[interval])
	}

	for {
		select {
		case <-exit:
			fmt.Println("W: interrupt received, killing server...")
			return nil
		case err := <-recv_errors:
			fmt.Println("Receive Error:", err.Error())
		case msgbytes := <-data:
			t := time.Now()
			ts, raw_requests := read_raw(&msgbytes)
			m := Message{ts: ts, requests: raw_requests}
			for _, interval := range intervals {
				channels[interval] <- m
			}
			fmt.Printf("RECV [%v] RawRequests: %v in %v\n", ts, len(raw_requests), time.Now().Sub(t))
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment