Skip to content

Instantly share code, notes, and snippets.

@sergi
Last active August 16, 2018 12:08
Show Gist options
  • Save sergi/9805c9aa1ceb9d06adad3e55dc041374 to your computer and use it in GitHub Desktop.
Save sergi/9805c9aa1ceb9d06adad3e55dc041374 to your computer and use it in GitHub Desktop.
Goroutine concurrency blog post
// AverageLatency resolves host 100 times through a pool of 4 dnsTest workers
// and returns the mean latency of the collected results (milliseconds, as
// reported by dnsTest).
//
// The first error message received from a worker aborts the run and is
// returned with a zero latency. If a single wait for the next outcome lasts
// longer than DURATION_SECONDS seconds, the average of the results gathered
// so far is returned instead (0 when nothing has been gathered yet).
func AverageLatency(host string) (latency int64, err error) {
	const (
		concurrency   = 4
		requestsLimit = 100
	)

	// Every channel can hold one entry per request, so neither the producer
	// loop below nor the workers block when this function returns early.
	dnsRequests := make(chan int, requestsLimit)
	results := make(chan int64, requestsLimit)
	errorsResults := make(chan string, requestsLimit)

	for w := 0; w < concurrency; w++ {
		go dnsTest(dnsRequests, results, errorsResults, host)
	}
	for j := 1; j <= requestsLimit; j++ {
		dnsRequests <- j
	}
	close(dnsRequests)

	// requestsDone starts at zero: it counts results actually received, so
	// the average is not skewed by a phantom extra request.
	var total, requestsDone int64
	average := func() int64 {
		if requestsDone == 0 {
			return 0
		}
		return total / requestsDone
	}

	for a := 0; a < requestsLimit; a++ {
		select {
		case latencyLocal := <-results:
			total += latencyLocal
			requestsDone++
		case errorMsg := <-errorsResults:
			return 0, errors.New(errorMsg)
		case <-time.After(time.Second * DURATION_SECONDS):
			return average(), nil
		}
	}
	return average(), nil
}
// dnsTest is a worker loop: for every value received on jobs it performs one
// net.LookupHost call for host and reports exactly one outcome. A successful
// lookup sends its elapsed time in milliseconds on results; a failed lookup
// sends the error text on errResults. The function returns once jobs has been
// closed and drained.
func dnsTest(jobs <-chan int, results chan<- int64, errResults chan<- string, host string) {
	for range jobs {
		start := time.Now()
		if _, err := net.LookupHost(host); err != nil {
			errResults <- err.Error()
			// A failed lookup must not also be reported as a latency
			// sample, otherwise failures are counted as successes.
			continue
		}
		results <- time.Since(start).Nanoseconds() / int64(time.Millisecond)
	}
}
func AverageLatency(host string) (latency int64, err error) {
REQUESTS_LIMIT := 100
results := make(chan int64, REQUESTS_LIMIT)
errorsResults := make(chan string, REQUESTS_LIMIT)
for w := 1; w <= REQUESTS_LIMIT; w++ {
go func() {
start := time.Now()
if _, err := net.LookupHost(host); err != nil {
errorResults <- err.Error()
return
}
results <- time.Since(start).Nanoseconds() / int64(time.Millisecond)
}
}
requestsDone := 1
for a := 1; a <= REQUESTS_LIMIT; a++ {
select {
case latencyLocal := <-results:
latency = latency + latencyLocal
requestsDone = requestsDone + 1
case errorMsg := <-errorsResults:
return 0, errors.New(errorMsg)
case <-time.After(time.Second * DURATION_SECONDS):
return latency / int64(requestsDone), nil
}
}
return latency / int64(requestsDone), nil
}
func AverageLatency(host string) (latency int64, err error) {
REQUESTS_LIMIT := 100
results := make(chan int64, REQUESTS_LIMIT)
errorsResults := make(chan string, REQUESTS_LIMIT)
var wg sync.WaitGroup
wg.Add(REQUESTS_LIMIT)
for j := 0; j < REQUESTS_LIMIT; j++ {
go func() {
defer wg.Done()
start := time.Now()
if _, err := net.LookupHost(host); err != nil {
errorResults <- err.Error()
return
}
results <- time.Since(start).Nanoseconds() / int64(time.Millisecond)
}
}
wg.Wait()
...
}
// Metrics is the summary produced for one batch of DNS lookups.
type Metrics struct {
	// AverageLatency is the mean latency of the successful requests;
	// CalculateStats stores -1 here when there were none.
	AverageLatency float64
	// RequestCount is the number of successful requests and ErrorCount the
	// number of failed ones, as filled in by CalculateStats.
	RequestCount, ErrorCount int64
}
func AverageLatency(host string) Metrics {
REQUESTS_LIMIT := 100
var errors int64
results := make([]int64, 0, DEFAULT_REQUESTS_LIMIT)
var wg sync.WaitGroup
wg.Add(REQUESTS_LIMIT)
for j := 0; j < REQUESTS_LIMIT; j++ {
go func() {
defer wg.Done()
start := time.Now()
if _, err := net.LookupHost(host); err != nil {
fmt.Printf("%s", err.Error())
atomic.AddInt64(&errors, 1)
return
}
append(results, time.Since(start).Nanoseconds() / int64(time.Millisecond))
}
}
wg.Wait()
return CalculateStats(&results, &errors)
}
// Takes amount of requests and errors and returns some stats on a
// `Metrics` struct
func CalculateStats(results *[]int64, errors *int64) Metrics {
successfulRequests := len(*results)
errorCount := atomic.LoadInt64(errors)
// Sum up all the latencies
var totalLatency int64 = 0
for _, value := range *results {
totalLatency += value
}
avgLatency := float64(-1)
if successfulRequests > 0 {
avgLatency = float64(totalLatency) / float64(successfulRequests)
}
return Metrics{
avgLatency,
int64(successfulRequests),
errorCount
}
}
// waitWithTimeout blocks until wg's counter reaches zero or until timeout
// has elapsed, whichever happens first. It reports true when the timeout won
// and false when the wait group finished in time.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		// wg.Wait cannot be used in a select directly, so its completion
		// is turned into a channel close.
		wg.Wait()
		close(done)
	}()

	expiry := time.NewTimer(timeout)
	defer expiry.Stop()

	select {
	case <-expiry.C:
		return true
	case <-done:
		return false
	}
}
func AverageLatency(host string) Metrics {
REQUESTS_LIMIT := 100
var errors int64
results := make([]int64, 0, REQUESTS_LIMIT)
var wg sync.WaitGroup
wg.Add(REQUESTS_LIMIT)
for j := 0; j < REQUESTS_LIMIT; j++ {
go func() {
defer wg.Done()
start := time.Now()
if _, err := net.LookupHost(host); err != nil {
fmt.Printf("%s", err.Error())
atomic.AddInt64(&errors, 1)
return
}
append(results, time.Since(start).Nanoseconds() / int64(time.Millisecond))
}
}
if waitWithTimeout(&wg, time.Duration(time.Second*DURATION_SECONDS)) {
fmt.Println("There was a timeout waiting for DNS requests to finish")
}
return CalculateStats(&results, &errors)
}
// AverageLatency resolves host REQUESTS_LIMIT times concurrently and returns
// the report built by CalculateDNSReport. Successful lookups hand their
// latency in milliseconds to a single collector goroutine through a channel;
// failed lookups only increment the error counter. The function waits at
// most DURATION_SECONDS seconds; on timeout a message is printed and the
// report covers only the requests accounted for by then.
//
// NOTE(review): REQUESTS_LIMIT, DURATION_SECONDS, waitTimeout and
// CalculateDNSReport are not defined in the visible part of this file;
// confirm they exist with the signatures used here.
func AverageLatency(host string) Metrics {
	var (
		errors  int64          // number of failed lookups, updated atomically
		mu      sync.Mutex     // guards results
		wg      sync.WaitGroup // one count per request that has been accounted for
		senders sync.WaitGroup // one count per lookup goroutine still running
	)
	results := make([]int64, 0, REQUESTS_LIMIT)
	successfulRequestsQueue := make(chan int64, 1)

	// The counter must match the number of goroutines started below;
	// otherwise Wait either never returns or Done drives it negative.
	wg.Add(REQUESTS_LIMIT)
	senders.Add(REQUESTS_LIMIT)
	for j := 0; j < REQUESTS_LIMIT; j++ {
		go func() {
			defer senders.Done()
			start := time.Now()
			if _, err := net.LookupHost(host); err != nil {
				atomic.AddInt64(&errors, 1)
				wg.Done()
				return
			}
			// wg.Done for a success is called by the collector, after the
			// latency has actually been stored.
			successfulRequestsQueue <- time.Since(start).Nanoseconds() / 1e6
		}()
	}

	// Close the queue once no sender is left so the collector terminates
	// instead of blocking on the channel forever.
	go func() {
		senders.Wait()
		close(successfulRequestsQueue)
	}()

	go func() {
		for t := range successfulRequestsQueue {
			mu.Lock()
			results = append(results, t)
			mu.Unlock()
			wg.Done()
		}
	}()

	if waitTimeout(&wg, time.Second*DURATION_SECONDS) {
		fmt.Println("There was a timeout waiting for DNS requests to finish")
	}

	// After a timeout the collector may still be appending, so the report
	// is computed on a private copy taken under the lock.
	mu.Lock()
	snapshot := append([]int64(nil), results...)
	mu.Unlock()
	return CalculateDNSReport(&snapshot, &errors)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment