@JnBrymn
Created July 29, 2019 22:34

package main

import (
	"fmt"
	"math/rand"
	"net/http"
	"os"
	"strconv"
	"time"
)

// Library code ////////////////////////////////////////////////////////////////////////////////////////////////////////

type Report interface{}

type WorkUnit interface {
	Run() Report
}

type ReportAggregator interface {
	AddReport(Report)
	PeriodicReport()
	FinalReport()
}

const WorkGrantToken = true // just a placeholder

type WorkRunner struct {
	workUnitChan           chan WorkUnit
	workGrantsChan         chan interface{}
	reportAggregator       ReportAggregator
	maxConcurrentWorkUnits uint
	reportingPeriod        time.Duration
	runTime                time.Duration
}

// NewWorkRunner creates a WorkRunner which runs WorkUnits from workUnitChan as fast as it can without exceeding
// maxConcurrentWorkUnits. Once each WorkUnit is done, it submits a Report to the provided reportAggregator.
// Every reportingPeriod the aggregator is asked for a periodic report; after the specified runTime, new work units
// are no longer issued and Run returns immediately.
func NewWorkRunner(
	workUnitChan chan WorkUnit,
	reportAggregator ReportAggregator,
	maxConcurrentWorkUnits uint,
	reportingPeriod time.Duration,
	runTime time.Duration,
) *WorkRunner {
	wr := new(WorkRunner)
	wr.workUnitChan = workUnitChan
	wr.reportAggregator = reportAggregator
	wr.maxConcurrentWorkUnits = maxConcurrentWorkUnits
	wr.reportingPeriod = reportingPeriod
	wr.runTime = runTime
	return wr
}

func (wr *WorkRunner) Run() {
	// Create the channel used to limit concurrent work. A WorkUnit can only be started if a work grant is
	// available, and there are only maxConcurrentWorkUnits work grants. Once a unit of work is finished, its work
	// grant is returned to the channel to be used again.
	wr.workGrantsChan = make(chan interface{}, wr.maxConcurrentWorkUnits)
	for i := uint(0); i < wr.maxConcurrentWorkUnits; i++ {
		wr.workGrantsChan <- WorkGrantToken
	}

	// Set up the reporting ticks and the end-of-run alert.
	reportingPeriodTick := time.Tick(wr.reportingPeriod)
	runFinalizationAlert := time.After(wr.runTime)

	for {
		select {
		case <-wr.workGrantsChan:
			go wr.doSingleUnitOfWork()
		case <-reportingPeriodTick:
			fmt.Println("Concurrent work units:", cap(wr.workGrantsChan)-len(wr.workGrantsChan))
			go wr.reportAggregator.PeriodicReport()
		case <-runFinalizationAlert:
			wr.reportAggregator.FinalReport()
			// TODO make it so that optionally we can wait till all outstanding work units are done
			// (a sketch of one approach, drainWorkGrants, appears after doSingleUnitOfWork below)
			// OR use a channel to `close` to indicate that the job is done
			return
		}
	}
}

// doSingleUnitOfWork pulls the next WorkUnit off the channel, runs it, reports the result, and then returns its
// work grant so another unit can be started.
func (wr *WorkRunner) doSingleUnitOfWork() {
	workUnit := <-wr.workUnitChan
	report := workUnit.Run()
	wr.reportAggregator.AddReport(report)
	wr.workGrantsChan <- WorkGrantToken
}
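
// One possible way to address the "wait till all outstanding work units are done" TODO in Run above: after the
// run-time alert fires, reclaim every work grant before returning. Because doSingleUnitOfWork only returns its
// grant after the work unit has finished and its report has been added, holding all maxConcurrentWorkUnits grants
// means no work is still in flight. This is a sketch added for illustration (the name drainWorkGrants is an
// assumption, and nothing in this gist calls it); it also assumes workUnitChan never runs dry, as is the case with
// getWorkChan below.
func (wr *WorkRunner) drainWorkGrants() {
	for i := uint(0); i < wr.maxConcurrentWorkUnits; i++ {
		<-wr.workGrantsChan
	}
}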

// Application code /////////////////////////////////////////////////////////////////////////////////////////////////

type myReport struct {
	success bool
	latency time.Duration
}

type myWorkUnit struct {
	mode string
}

func (wu *myWorkUnit) Run() Report {
	startTs := time.Now()
	switch wu.mode {
	case SLEEP:
		sleepRandTime()
	case HTTP_GET:
		getNonExistentWebsite()
	case READ_FILE:
		readRandBytes()
	}
	return &myReport{
		success: rand.Float64() > 0.10,
		latency: time.Since(startTs),
	}
}

func sleepRandTime() {
	sleepTime := time.Duration(rand.Float64() * float64(time.Second))
	time.Sleep(sleepTime)
}

func readRandBytes() {
	f, err := os.Open("/dev/random")
	if err != nil {
		return // nothing to read if the device cannot be opened
	}
	defer f.Close()
	b1 := make([]byte, 10)
	for i := 0; i < 10; i++ {
		f.Read(b1)
	}
}

func getNonExistentWebsite() {
	url := "http://" + strconv.Itoa(int(rand.Int31())) + ".com/"
	resp, err := http.Get(url)
	fmt.Println(resp, err)
	if err == nil {
		resp.Body.Close() // avoid leaking the connection on the rare chance the random host resolves
	}
}

type myReportAggregator struct {
	numCompletedWorkUnits int
	numSuccesses          int
	totalWait             time.Duration
	startTs               time.Time
}

func NewMyReportAggregator() ReportAggregator {
	ra := &myReportAggregator{}
	ra.startTs = time.Now()
	return ra
}

func (ra *myReportAggregator) AddReport(report Report) {
	r := report.(*myReport)
	// TODO add sync.Lock
	// OR ACTUALLY, there's a way to use channels in WorkRunner such that ReportAggregator keeps the current simple
	// interface, but you can make sure that the info isn't updated concurrently and we don't have to make it
	// blocking (e.g. no sync.Lock). A sketch of that approach appears just below this method.
	ra.numCompletedWorkUnits++
	if r.success {
		ra.numSuccesses++
	}
	ra.totalWait += r.latency
}
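
// Sketch of the channel-based alternative mentioned in the TODO above (the names reportChan and consumeReports are
// assumptions made for this illustration; nothing in the gist calls this). Reports are sent on a channel and
// consumed by a single goroutine, so the aggregator's counters are only ever touched from that one goroutine and
// no lock is needed. WorkRunner would send each report to reportChan instead of calling AddReport directly.
func consumeReports(ra ReportAggregator, reportChan <-chan Report) {
	for report := range reportChan {
		ra.AddReport(report)
	}
}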

func (ra *myReportAggregator) PeriodicReport() {
	// TODO add sync.Lock (a sketch of the mutex approach appears after FinalReport below)
	if ra.numCompletedWorkUnits == 0 {
		return // nothing to report yet; avoids dividing by zero on the first tick
	}
	fmt.Printf(
		"average latency: %v\nfail rate: %v\n\n",
		time.Duration(float64(ra.totalWait)/float64(ra.numCompletedWorkUnits)),
		1 - float64(ra.numSuccesses)/float64(ra.numCompletedWorkUnits),
	)
}

func (ra *myReportAggregator) FinalReport() {
	duration := time.Since(ra.startTs)
	fmt.Println("total completed: ", ra.numCompletedWorkUnits)
	fmt.Println("total time: ", duration)
	fmt.Println("units per second: ", float64(ra.numCompletedWorkUnits)/duration.Seconds())
}
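
// Sketch of the sync.Mutex approach mentioned in the two "TODO add sync.Lock" comments above, kept in comment form
// so the gist's imports stay unchanged ("sync" would need to be imported; the field name mu is an assumption):
//
//	type myReportAggregator struct {
//		mu                    sync.Mutex
//		numCompletedWorkUnits int
//		// ... remaining fields unchanged
//	}
//
//	func (ra *myReportAggregator) AddReport(report Report) {
//		ra.mu.Lock()
//		defer ra.mu.Unlock()
//		// ... update the counters as above
//	}
//
// PeriodicReport and FinalReport would take the same lock before reading the counters.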

// getWorkChan returns a channel that produces an endless stream of myWorkUnits in the given mode.
func getWorkChan(mode string) chan WorkUnit {
	workChan := make(chan WorkUnit)
	go func() {
		for {
			workChan <- &myWorkUnit{mode}
		}
	}()
	return workChan
}

const SLEEP = "sleep"
const HTTP_GET = "httpGet"
const READ_FILE = "readFile"

func main() {
	rand.Seed(time.Now().Unix())

	workChan := getWorkChan(HTTP_GET)
	maxConcurrentWorkUnits := uint(250000)
	//maxConcurrentWorkUnits := uint(10)
	reportingPeriod := time.Second
	runTime := time.Second * 15

	workRunner := NewWorkRunner(
		workChan,
		NewMyReportAggregator(),
		maxConcurrentWorkUnits,
		reportingPeriod,
		runTime,
	)
	workRunner.Run()
}