Skip to content

Instantly share code, notes, and snippets.

@zacbrown
Last active December 21, 2017 05:25
Show Gist options
  • Save zacbrown/2b9ba12142e3adb6b0c1eed7c295846b to your computer and use it in GitHub Desktop.
Save zacbrown/2b9ba12142e3adb6b0c1eed7c295846b to your computer and use it in GitHub Desktop.
golang synthetic event pumping
/*
Compilation & execution instructions:
go get "github.com/satori/go.uuid"
go build
.\<binary name> -cpuprofile cpu.prof -memprofile mem.prof -resultcount 1000000
The -resultcount flag indicates how many elements to hold before
clearing the collection. This is mainly useful in observing the overhead
of a collection of objects.
*/
package main
import (
	"flag"
	"fmt"
	"log"
	"os"
	"runtime"
	"runtime/pprof"
	"sync"
	"time"

	"github.com/satori/go.uuid"
)
// TcpEvent is the synthetic record that flows through the pipeline in this
// program: the producer builds it, consumers copy it, and the result handler
// collects it.
//
// The field order is kept exactly as originally declared so that any unkeyed
// TcpEvent composite literal keeps compiling with the same meaning.
type TcpEvent struct {
	// Endpoint fields, stored as plain ints.
	// NOTE(review): presumably source/destination address and port, going by
	// the names — confirm.
	Saddr, Daddr, Sport, Dport int

	// Timestamp is the time value attached to the event.
	Timestamp time.Time

	// ProviderGuid and ProviderName identify the provider of the event.
	ProviderGuid uuid.UUID
	ProviderName string

	// EventId and EventName describe the event itself.
	EventId   int
	EventName string

	// Level, Flags and Version are additional integer attributes.
	Level, Flags, Version int
}
// producer pumps exactly totalevents synthetic TcpEvent values into ch and
// then signals completion by sending true on done.
//
// Every event carries the same fixed field values, except Timestamp
// (time.Now()) and ProviderGuid (uuid.NewV1()), which are obtained afresh for
// each event. A totalevents of zero or less sends no events at all.
//
// producer never closes ch; whoever owns the channel is responsible for
// closing it once done has been signalled. The send on done blocks until it
// is received (or buffered), so a receiver must exist.
func producer(totalevents int64, ch chan<- TcpEvent, done chan<- bool) {
	// Strict "<" bound: the previous "<=" bound emitted totalevents+1 events.
	for ii := int64(0); ii < totalevents; ii++ {
		// Keyed fields keep the literal correct even if TcpEvent's field
		// order ever changes.
		ch <- TcpEvent{
			Saddr:        12345,
			Daddr:        67890,
			Sport:        12,
			Dport:        34,
			Timestamp:    time.Now(),
			ProviderGuid: uuid.NewV1(),
			ProviderName: "Tater-Salad-City",
			EventId:      69,
			EventName:    "Bad Tater",
			Level:        0,
			Flags:        0,
			Version:      0,
		}
	}
	done <- true
}
// consumer receives events from ch until the channel is closed and drained.
// For each event it builds a fresh TcpEvent by copying every field explicitly
// and sends that copy on results.
//
// A progress line tagged with id is printed after every 100000 events, and a
// summary line with the total is printed once ch has been exhausted.
// consumer does not close results.
func consumer(id int, ch <-chan TcpEvent, results chan<- TcpEvent) {
	var processed int64
	for {
		ev, open := <-ch
		if !open {
			break
		}

		processed++
		if processed%100000 == 0 {
			fmt.Printf("[%d] Processed %d events\n", id, processed)
		}

		// Explicit per-field copy into a new value before forwarding.
		results <- TcpEvent{
			Saddr:        ev.Saddr,
			Daddr:        ev.Daddr,
			Sport:        ev.Sport,
			Dport:        ev.Dport,
			Timestamp:    ev.Timestamp,
			ProviderGuid: ev.ProviderGuid,
			ProviderName: ev.ProviderName,
			EventId:      ev.EventId,
			EventName:    ev.EventName,
			Level:        ev.Level,
			Flags:        ev.Flags,
			Version:      ev.Version,
		}
	}
	fmt.Printf("[%d] Done processing events. Total processed: %d\n", id, processed)
}
// resultHandler collects events received from ch into an in-memory slice and
// empties that slice every time it reaches clearCount elements, printing a
// line with the number of events handled in that batch.
//
// The slice is reset with [:0], so its backing array is reused by later
// batches. A clearCount of zero or less clears after every single event.
// The function returns once ch has been closed and drained; events received
// since the last clear are simply dropped at that point.
func resultHandler(ch <-chan TcpEvent, clearCount int64) {
	var held []TcpEvent
	for e := range ch {
		held = append(held, e)
		// ">=" so that at most clearCount events are ever retained; the
		// previous ">" comparison let the slice grow to clearCount+1.
		if n := int64(len(held)); n >= clearCount {
			fmt.Printf("[result handler] Handled %d events, clearing...\n", n)
			held = held[:0]
		}
	}
}
// Command-line flags. The pointers are populated when flag.Parse runs.
var (
	// cpuprofile is the path of the CPU profile output file (default "").
	cpuprofile = flag.String("cpuprofile", "", "write cpu profile `file`")

	// memprofile is the path of the memory profile output file (default "").
	memprofile = flag.String("memprofile", "", "write memory profile to `file`")

	// resultcount is the number of elements to hold before clearing.
	resultcount = flag.Int64("resultcount", 100000, "count of elements to hold before clearing")

	// totalevents is the total number of events to pump.
	totalevents = flag.Int64("totalevents", 1000000000, "total number of events to pump")
)
// main parses the command-line flags and runs the synthetic event pipeline,
// exiting through log.Fatal if any step reports an error.
func main() {
	flag.Parse()
	// All work happens in run so that its deferred cleanup (stopping the CPU
	// profile, closing the profile file) completes before log.Fatal calls
	// os.Exit, which would otherwise skip it.
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run executes the pipeline: one producer feeding ten consumers through the
// pool channel, and a single result handler draining the results channel.
// CPU profiling is enabled when -cpuprofile is non-empty, and a heap profile
// is written when -memprofile is non-empty.
//
// Shutdown is ordered so that no goroutine can send on a closed channel:
//  1. wait for the producer's done signal, then close pool;
//  2. wait for every consumer to drain pool and return;
//  3. write the heap profile (if requested);
//  4. close results and wait for the result handler to return.
func run() error {
	if *cpuprofile != "" {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			return fmt.Errorf("could not create CPU profile: %v", err)
		}
		// Runs after StopCPUProfile (defers are LIFO). The close error is
		// not reported; the profile has already been flushed by then.
		defer f.Close()
		if err := pprof.StartCPUProfile(f); err != nil {
			return fmt.Errorf("could not start CPU profile: %v", err)
		}
		defer pprof.StopCPUProfile()
	}

	pool := make(chan TcpEvent, 3000)
	results := make(chan TcpEvent, 10000)
	done := make(chan bool)

	// handlerDone is closed once the result handler has returned.
	handlerDone := make(chan struct{})
	go func() {
		defer close(handlerDone)
		resultHandler(results, *resultcount)
	}()

	var consumers sync.WaitGroup
	for ii := 1; ii <= 10; ii++ {
		consumers.Add(1)
		go func(id int) {
			defer consumers.Done()
			consumer(id, pool, results)
		}(ii)
	}

	go producer(*totalevents, pool, done)

	// The producer signals done only after its last send, so closing pool
	// here is safe and lets the consumers' receive loops terminate.
	<-done
	close(pool)

	fmt.Println("Waiting on consumers...")
	consumers.Wait()

	// The heap profile is taken before results is closed, i.e. while the
	// result handler goroutine has not yet returned.
	var profileErr error
	if *memprofile != "" {
		profileErr = writeHeapProfile(*memprofile)
	}

	// Every consumer has returned, so nothing can send on results any more.
	close(results)
	<-handlerDone

	return profileErr
}

// writeHeapProfile forces a garbage collection and writes a heap profile to
// the file at path, creating or truncating it. Create, write and close
// failures are all returned as errors.
func writeHeapProfile(path string) error {
	f, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("could not create memory profile: %v", err)
	}
	runtime.GC() // get up-to-date statistics
	if err := pprof.WriteHeapProfile(f); err != nil {
		f.Close() // best effort; the write error is the one worth reporting
		return fmt.Errorf("could not write memory profile: %v", err)
	}
	if err := f.Close(); err != nil {
		return fmt.Errorf("could not close memory profile: %v", err)
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment