Skip to content

Instantly share code, notes, and snippets.

@adzimzf
Last active December 19, 2023 06:35
Show Gist options
  • Save adzimzf/ce1067e1ee1871cd2fff437ae49825ee to your computer and use it in GitHub Desktop.
Save adzimzf/ce1067e1ee1871cd2fff437ae49825ee to your computer and use it in GitHub Desktop.
A simple load-testing tool, best suited for load testing a single function or a chain of requests. It automatically adjusts the request rate based on the error rate.
// LoadTester repeatedly runs job for a fixed duration, launching a batch of
// concurrent calls roughly once per second and periodically re-tuning the
// batch size from the observed error rate and the number of in-flight calls.
type LoadTester struct {
// duration is how long Run keeps launching new requests.
duration time.Duration
// initialRPS is the starting value of currentRPS and also the step by which
// adjustRPS raises the rate.
initialRPS int
// maxErrorRate is the failed/total ratio (computed over all requests
// completed so far) above which adjustRPS lowers the rate instead of raising it.
maxErrorRate float64
// maxOnGoingReq caps in-flight requests: Run pauses launching while
// ongoingRequests exceeds it, and adjustRPS only raises the rate while
// ongoingRequests is below it.
maxOnGoingReq int64
// rpsAdjustPeriod is how long adjustRPS sleeps between evaluations.
rpsAdjustPeriod time.Duration
// job is invoked once per request; a non-nil error counts as a failure.
job func() error
// Internal state below; callers do not need to set these fields.
// currentRPS is the number of requests Run launches per one-second iteration.
// NOTE(review): written by adjustRPS and read by Run from different
// goroutines without synchronization.
currentRPS int
// totalRequests counts completed requests (updated atomically).
totalRequests int64
// successful counts completed requests whose job returned nil (updated atomically).
successful int64
// failed counts completed requests whose job returned an error (updated atomically).
failed int64
// errors collects the error string of every failed request.
errors AtomicArray
// ongoingRequests counts requests launched but not yet finished (updated atomically).
ongoingRequests int64
// totalLatency is the sum of job durations in milliseconds (updated atomically).
totalLatency int64
// highestRPS is the largest currentRPS value adjustRPS has observed.
highestRPS int
}
// AtomicArray is a mutex-guarded, append-only list of strings that is safe
// for concurrent use. The zero value is an empty list ready to use.
type AtomicArray struct {
	m    sync.Mutex
	list []string
}

// Append adds s to the end of the list.
func (a *AtomicArray) Append(s string) {
	a.m.Lock()
	defer a.m.Unlock()
	a.list = append(a.list, s)
}

// Get returns a snapshot of the list in insertion order.
//
// The result is a copy: handing out the internal slice would let callers
// read or write the backing array outside the lock while Append is still
// modifying it. An empty list yields a nil slice.
func (a *AtomicArray) Get() []string {
	a.m.Lock()
	defer a.m.Unlock()
	return append([]string(nil), a.list...)
}

// GetGroup returns how many times each distinct string occurs in the list.
// The returned map is freshly allocated and never nil.
func (a *AtomicArray) GetGroup() map[string]int {
	a.m.Lock()
	defer a.m.Unlock()

	counts := make(map[string]int)
	for _, s := range a.list {
		// A missing key reads as zero, so no existence check is needed.
		counts[s]++
	}
	return counts
}
// sendRequest runs the configured job once, times it, and folds the outcome
// into the shared counters: every call bumps totalRequests and adds its
// latency (in whole milliseconds) to totalLatency; a nil result bumps
// successful, while an error bumps failed and records the error text.
func (lt *LoadTester) sendRequest() {
	began := time.Now()
	jobErr := lt.job()
	elapsedMs := time.Since(began).Milliseconds()

	atomic.AddInt64(&lt.totalRequests, 1)
	atomic.AddInt64(&lt.totalLatency, elapsedMs)

	if jobErr == nil {
		atomic.AddInt64(&lt.successful, 1)
		return
	}

	atomic.AddInt64(&lt.failed, 1)
	lt.errors.Append(jobErr.Error())
}
// adjustRPS periodically re-tunes lt.currentRPS from the cumulative request
// statistics. It sleeps lt.rpsAdjustPeriod between evaluations and never
// returns, so it is meant to run in its own goroutine.
//
// On every evaluation (skipped while no request has completed yet):
//   - if the cumulative error rate is at or below lt.maxErrorRate and the
//     number of in-flight requests is below lt.maxOnGoingReq, the rate is
//     raised by lt.initialRPS;
//   - if the error rate exceeds lt.maxErrorRate and the rate is above
//     lt.initialRPS, the rate backs off to two steps below the highest rate
//     reached so far, but never below lt.initialRPS.
//
// NOTE(review): lt.currentRPS and lt.highestRPS are plain ints that are also
// read by Run from another goroutine; making that race-free requires changing
// the field types on LoadTester.
func (lt *LoadTester) adjustRPS() {
	for {
		time.Sleep(lt.rpsAdjustPeriod)

		totalRequests := atomic.LoadInt64(&lt.totalRequests)
		failed := atomic.LoadInt64(&lt.failed)
		totalLatency := atomic.LoadInt64(&lt.totalLatency)
		ongoingRequests := atomic.LoadInt64(&lt.ongoingRequests)
		if totalRequests == 0 {
			// Nothing has completed yet; avoid dividing by zero below.
			continue
		}

		avgLatency := time.Duration(totalLatency/totalRequests) * time.Millisecond
		errorRate := float64(failed) / float64(totalRequests)

		if errorRate <= lt.maxErrorRate {
			// Healthy: step up by the initial RPS unless too many requests
			// are still in flight.
			if ongoingRequests < lt.maxOnGoingReq {
				lt.currentRPS += lt.initialRPS
			}
		} else if lt.currentRPS > lt.initialRPS {
			// Too many errors: back off to two steps below the peak. Without
			// the floor this can land on 0, after which no requests are sent,
			// the cumulative error rate never improves, and the test stalls.
			reduced := lt.highestRPS - 2*lt.initialRPS
			if reduced < lt.initialRPS {
				reduced = lt.initialRPS
			}
			lt.currentRPS = reduced
		}

		if lt.currentRPS > lt.highestRPS {
			lt.highestRPS = lt.currentRPS
		}

		// Log the snapshot loaded above rather than re-reading the counter
		// non-atomically while workers are still updating it.
		log.Printf("total req: %d, ongoing req: %d, current RPS: %d, avg latency: %s, error rate: %f\n", totalRequests, ongoingRequests, lt.currentRPS, avgLatency, errorRate)
	}
}
// Run executes the load test. It starts at lt.initialRPS, launches
// lt.currentRPS concurrent calls of the job roughly once per second until
// lt.duration has elapsed, waits for every launched call to finish, and then
// prints a summary (totals, peak RPS, and failure counts grouped by error
// text) to standard output.
//
// The rate is re-tuned in the background by adjustRPS.
// NOTE(review): that goroutine has no stop signal and keeps running after
// Run returns.
func (lt *LoadTester) Run() {
	lt.currentRPS = lt.initialRPS
	endTime := time.Now().Add(lt.duration)

	var wg sync.WaitGroup
	go lt.adjustRPS()

	for time.Now().Before(endTime) {
		// To avoid exhausting the system with goroutines, pause launching
		// while too many requests are still in flight. The counter is updated
		// atomically by the workers, so it must be read atomically as well.
		if atomic.LoadInt64(&lt.ongoingRequests) > lt.maxOnGoingReq {
			time.Sleep(time.Second)
			continue
		}

		for i := 0; i < lt.currentRPS; i++ {
			wg.Add(1)
			atomic.AddInt64(&lt.ongoingRequests, 1)
			go func() {
				// Deferred calls run last-in-first-out: the in-flight counter
				// is decremented before the WaitGroup is released.
				defer wg.Done()
				defer atomic.AddInt64(&lt.ongoingRequests, -1)
				lt.sendRequest()
			}()
		}
		time.Sleep(time.Second)
	}

	wg.Wait()
	fmt.Printf("Total Requests: %d,\nSuccessful: %d,\nFailed: %d,\nMax RPS: %d,\n Errors: %v\n",
		atomic.LoadInt64(&lt.totalRequests),
		atomic.LoadInt64(&lt.successful),
		atomic.LoadInt64(&lt.failed),
		lt.highestRPS,
		lt.errors.GetGroup())
}
// TestLoadTestTool drives LoadTester against a local HTTP endpoint for three
// minutes. It needs a server listening on localhost:8080 and only reports
// results on standard output; it makes no assertions.
func TestLoadTestTool(t *testing.T) {
	loadTester := LoadTester{
		maxOnGoingReq:   4000,
		duration:        3 * time.Minute,
		maxErrorRate:    0.05, // 5%
		initialRPS:      1000,
		rpsAdjustPeriod: 5 * time.Second,
		job: func() error {
			resp, err := http.Get("http://localhost:8080/status")
			if err != nil {
				return err
			}
			// The body must be closed on every successful call; otherwise
			// each request leaks its connection and file descriptor.
			defer resp.Body.Close()
			return nil
		},
	}
	loadTester.Run()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment