Self-feeding processing queue
/*
This demonstrates a task-scheduling loop where tasks can send new tasks back to the queue.

Program input: a nested list of items, where each item is either an integer or a nested slice:
- when it is an integer, wait for (25 - value) seconds
- when it is a slice, wait for len(slice) seconds, then put each of its items back in the queue to be processed next

The waiting time simulates some processing, to which a concurrency limit is applied (both kinds of waiting
share the same limit). Pushing back to the queue must not block.

The following tree is used to validate the solution:

[
    1,
    2,
    [3, [4, 5, [6, 7]], 8],
    [9, 10],
    11,
    12,
    13,
    [14, 15, [16, [17, 18, 19, [20, 21, 22]]]]
]

Expected timings below, which I consistently obtain with 3 different implementations (Go, Rust and Python, linked below):

|-----------------|-----------------------------------|
| max concurrency | approximate total time in seconds |
|-----------------|-----------------------------------|
| 10              | 38 *                              |
| 5               | 65                                |
| 1               | 319                               |
| unlimited       | 27                                |
|-----------------|-----------------------------------|

* not always stable, it can also give 42 (yes, 42). This is probably because new items are added to the queue
from two goroutines at the same moment; the items may interleave in different ways.

Rust version: https://gist.github.com/valsteen/103aac191afa881d88829bb9e3699784
Python version: https://gist.github.com/valsteen/6989796b49be4dc102fed2fb08c05cf3
*/
package main

import (
	"fmt"
	"sync"
	"time"
)
var taskTree = []any{
	1,
	2,
	[]any{3, []any{4, 5, []any{6, 7}}, 8},
	[]any{9, 10},
	11,
	12,
	13,
	[]any{14, 15, []any{16, []any{17, 18, 19, []any{20, 21, 22}}}},
}
// MaxConcurrency limits how many items may be processed at the same time; 0 means unlimited.
const MaxConcurrency = 10
// process simulates work on a single queue item. For an integer it just sleeps;
// for a slice it sleeps and then returns the slice so its items can be rescheduled.
func process(value any) any {
	switch actual := value.(type) {
	case int:
		fmt.Printf(">> processing final value %d ...\n", actual)
		time.Sleep(time.Duration(25-actual) * time.Second)
		fmt.Printf("<< finished processing final value %d ...\n", actual)
		return nil
	case []any:
		fmt.Printf("@@>> got list of %d, rescheduling them\n", len(actual))
		time.Sleep(time.Duration(len(actual)) * time.Second)
		fmt.Printf("@@<< finished processing list of %d\n", len(actual))
		return actual
	default:
		panic("invalid input")
	}
}
// work processes one value and, if processing produced new items, registers them
// with the wait group before pushing them back to the input channel.
func work(input chan any, value any, tasks *sync.WaitGroup) {
	result := process(value)
	if values, ok := result.([]any); ok {
		tasks.Add(len(values))
		for _, newValue := range values {
			input <- newValue
		}
	}
	tasks.Done()
}
// worker processes queue items one at a time; running MaxConcurrency of these bounds concurrency.
func worker(input chan any, tasks *sync.WaitGroup) {
	for value := range input {
		work(input, value, tasks)
	}
}

// unlimitedWorker spawns a new goroutine per queue item, so concurrency is unbounded.
func unlimitedWorker(input chan any, tasks *sync.WaitGroup) {
	for value := range input {
		go work(input, value, tasks)
	}
}
func main() {
	input := make(chan any, 100) // no unbounded channel in Go; a large buffer keeps push-back from blocking
	start := time.Now()
	tasks := sync.WaitGroup{}

	if MaxConcurrency > 0 {
		for i := 0; i < MaxConcurrency; i++ {
			go worker(input, &tasks)
		}
	} else {
		go unlimitedWorker(input, &tasks)
	}

	// Seed the queue with the top-level items; workers add nested items as they discover them.
	tasks.Add(len(taskTree))
	for _, value := range taskTree {
		input <- value
	}

	tasks.Wait()
	close(input)

	elapsed := time.Since(start)
	fmt.Printf("%0.2f\n", elapsed.Seconds())
}
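
// The header requires that pushing back to the queue never blocks, but Go channels are always
// bounded, so main() approximates this with a buffer of 100. Below is a minimal sketch (not part
// of the original solution, and not wired into main above) of a truly unbounded, non-blocking
// queue: a dispatcher goroutine buffers pending items in a slice, so senders on `in` never block.
// Workers would receive from `out` while work() would send rescheduled items to `in`.
// The name unboundedQueue is hypothetical.
func unboundedQueue(in <-chan any, out chan<- any) {
	var pending []any
	for {
		if len(pending) == 0 {
			// Nothing buffered: just wait for the next item.
			value, ok := <-in
			if !ok {
				close(out)
				return
			}
			pending = append(pending, value)
			continue
		}
		// Either accept a new item or hand the oldest buffered item to a worker,
		// whichever is ready first; a sender on `in` therefore never waits on workers.
		select {
		case value, ok := <-in:
			if !ok {
				for _, v := range pending {
					out <- v // drain what is left before closing
				}
				close(out)
				return
			}
			pending = append(pending, value)
		case out <- pending[0]:
			pending = pending[1:]
		}
	}
}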