Skip to content

Instantly share code, notes, and snippets.

@pnck
Created February 21, 2023 21:22
Show Gist options
  • Save pnck/bf88ef3a67b6a23858a6f59e740f7ae1 to your computer and use it in GitHub Desktop.
Save pnck/bf88ef3a67b6a23858a6f59e740f7ae1 to your computer and use it in GitHub Desktop.
This gist shows how complicated it is to write correct concurrency in Go
// Upper bound on the number of workers that may be in flight at once.
const maxAlive = 7;

// Counter backing idGenerator.
let idAcc = 0;
// Hands out sequential worker ids (0, 1, 2, ...). Any unique key (e.g. a hash)
// would do; a plain counter is used for convenience.
const idGenerator = () => idAcc++;

// Workers that have been started and whose function has not finished yet.
let alives = [];
// Holder for the resolve callback of the current nonemptyWaiter; starts as a
// no-op so it is safe to call before any waiter has been installed.
let nonemptyResolver = { resolve: () => {} };
// Promise a consumer awaits until a result has been published.
let nonemptyWaiter;
// Published results, waiting to be taken by a consumer.
const results = [];
// Installs a fresh pending nonemptyWaiter (together with its resolver) when the
// result queue is empty. If results are still queued, the current waiter is
// left untouched so a consumer can keep draining without blocking.
const resetNonemptyWaiter = () => {
  if (results.length === 0) {
    nonemptyWaiter = new Promise((wake) => {
      nonemptyResolver = { resolve: wake };
    });
  }
};
// Returns a promise that fulfills (with undefined) after roughly `ms` milliseconds.
const sleep = (ms) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });
/**
 * Starts `fn(params)` as a worker, first waiting for a free slot when
 * `maxAlive` workers are already in flight.
 *
 * The returned worker's `task` promise fulfills with
 * `{ workerId, result }` as soon as `fn` has finished, where `result` is a
 * promise that settles once the followed task's result has been published and
 * this worker's own result has been pushed onto `results`. `task` rejects with
 * whatever `fn` throws.
 *
 * @param {(params: any) => any} fn - Work to run; may be async.
 * @param {any} params - Single argument handed to `fn`.
 * @param {Promise<any>|null|undefined} followedTask - Task (or any promise)
 *   whose resolved value's `result` must settle before this worker's result is
 *   published; pass a nullish value to publish immediately.
 * @returns {Promise<{id: number, task: Promise<{workerId: number, result: Promise<void>}>}>}
 */
async function spawnWorker(fn, params, followedTask) {
  // REQUIREMENT: should spawn worker as long as there is a free slot (alives.length < maxAlive)
  // REQUIREMENT: spawner should be blockable if all slots are occupied
  const worker = { id: idGenerator() };
  const ignore = () => {};
  while (alives.length >= maxAlive) {
    // Wake up when ANY in-flight task settles. Failures free a slot too, so
    // they must not be skipped (Promise.any would ignore them and throw an
    // AggregateError if every in-flight task failed).
    await Promise.race(alives.map((v) => v.task.then(ignore, ignore)));
  }
  alives.push(worker);
  worker.task = (async () => {
    let holdingResult;
    try {
      holdingResult = await fn(params);
    } finally {
      // Release the slot whether fn succeeded or threw; otherwise a failed
      // worker would occupy its slot forever.
      alives = alives.filter((p) => p.id !== worker.id);
    }
    // Publishing is deliberately not awaited here: the slot is free as soon as
    // fn is done, while the result is held back until the followed task's
    // result has been published.
    const getResult = async () => {
      await (await followedTask)?.result;
      results.push({ workerId: worker.id, result: holdingResult });
      nonemptyResolver.resolve();
    };
    return { workerId: worker.id, result: getResult() };
  })();
  // REQUIREMENT: alive queue size should never grow beyond maxAlive
  console.log(
    `task id=${worker.id} queued, alives = ${alives.length}, results = ${results.length}`
  );
  return worker;
}
/**
 * Waits for and removes the next published result, skipping placeholder
 * entries whose `result` is the string "stub".
 *
 * After every removal the waiter is re-armed if the queue became empty, so the
 * call blocks (instead of dereferencing `undefined`) whenever nothing is
 * available, including right after a placeholder was skipped.
 *
 * @returns {Promise<{workerId: number, result: any}>} The oldest queued result.
 */
async function getNext() {
  for (;;) {
    await nonemptyWaiter;
    const next = results.shift();
    resetNonemptyWaiter();
    if (next !== undefined && next.result !== "stub") {
      return next;
    }
  }
}
//// ---- QUEUE TASKS ---- ////
// Install the first pending waiter before any worker can publish a result.
resetNonemptyWaiter();
console.log("- queue first"); // REQUIREMENT: FINFDFO should be the first in result queue
// The first task sleeps 6000 ms. It follows an already-resolved placeholder
// promise, so its own result is not held back by any earlier worker.
let lastWorker = await spawnWorker(
async (v) => {
await sleep(v[0]);
console.log(`FINFDFO done, T= ${v[0]} v = ${v[1]}`);
return "FirstInNotFirstDoneFirstOut";
},
[6000, 1],
Promise.resolve({ workerId: -1, result: "stub" })
);
console.log("+ queued first");
console.log("- queue 5 longs"); // REQUIREMENT: long term tasks should finish before the first
// Each long task sleeps 5000 + i * 100 ms (5000..5400 ms) and follows the task
// of the worker spawned just before it, chaining result publication in
// enqueue order.
for (let i = 0; i < 5; i++) {
lastWorker = await spawnWorker(
async (v) => {
await sleep(v[0]);
console.log(`long done, T = ${v[0]}, v = ${v[1]}`);
return `long_${v[1]}`;
},
[5000 + i * 100, i],
lastWorker.task
);
}
console.log("+ queued 5 longs");
console.log("- queue 20 shorts");
// REQUIREMENT: short term tasks done quickly and fully fill the alive queue
// REQUIREMENT: all short term tasks should be queued and done successfully while first task is still unfinished
// Each short task sleeps 100 + i * 2 ms and follows the previously spawned
// worker's task. spawnWorker is awaited, so this loop pauses whenever all
// slots are occupied.
for (let i = 0; i < 20; i++) {
lastWorker = await spawnWorker(
async (v) => {
await sleep(v[0]);
console.log(`short done, T = ${v[0]}, v = ${v[1]}`);
return `short_${v[1]}`;
},
[100 + i * 2, i],
lastWorker.task
);
}
console.log("+ queued 20 shorts");
// Concurrent producer #1: from a timer callback, spawns 50 tasks with a random
// pause (up to 200 ms) before each spawn. The chain starts from `lastWorker`
// as it stands when the callback runs. The outer promise is resolved with the
// task promise of the last worker spawned here, so it settles to that task's
// `{ workerId, result }` value.
const concurrentSet1 = new Promise((resolve) => {
// REQUIREMENT: concurrent sets should be able to be queued while getting results
setTimeout(async () => {
console.log("- queue 50 shorts (concurrent set 1):");
let _lastWorker = lastWorker;
for (let i = 0; i < 50; i++) {
await sleep(Math.random() * 200);
_lastWorker = await spawnWorker(
async (v) => {
await sleep(v[0]);
console.log(`c_short done, T = ${v[0]}, v = ${v[1]}`);
return `c_short_v${v[1]}`;
},
// Task duration grows from 400 ms in steps of 10 ms.
[400 + i * 10, i],
_lastWorker.task
);
}
console.log("+ queued 50 shorts (concurrent set 1):");
resolve(_lastWorker.task);
}, 1);
});
// Concurrent producer #2: same shape as set 1, but every task sleeps for a
// random duration of up to 5000 ms. It keeps its own chain (its own
// `_lastWorker`), also starting from `lastWorker`, and resolves with the task
// promise of the last worker it spawned.
const concurrentSet2 = new Promise((resolve) => {
// REQUIREMENT: concurrent set2 mixes up the finish order with set1
setTimeout(async () => {
console.log("- queue 50 randoms (concurrent set 2):");
let _lastWorker = lastWorker;
for (let i = 0; i < 50; i++) {
await sleep(Math.random() * 200);
_lastWorker = await spawnWorker(
async (v) => {
await sleep(v[0]);
console.log(`c_random done, T = ${v[0]}, v = ${v[1]}`);
return `c_random_v${v[1]}`;
},
[Math.round(Math.random() * 5000), i], // large random range, and it's concurrent with set1!
_lastWorker.task
);
}
console.log("+ queued 50 randoms (concurrent set 2):");
resolve(_lastWorker.task);
}, 1);
});
// Final task: spawned from a timer callback once both concurrent sets have
// finished queueing. Its followedTask is a Promise.all over the `result`
// promises of each set's last task. That resolves to an array, which has no
// `result` property, so spawnWorker's only wait is for both results to be
// published.
setTimeout(async () => {
const eachLastTasks = Promise.all([concurrentSet1, concurrentSet2]);
// REQUIREMENT: LINLDLO should be the last result
// REQUIREMENT: LINLDLO should be able to get done before some c_random tasks
console.log("- queue LILDLO (concurrent)");
await spawnWorker(
async () => {
await sleep(100);
console.log(`LINLDLO done`);
return "LastInNotLastDoneLastOut";
},
null,
Promise.all((await eachLastTasks).map((v) => v.result))
);
console.log("+ queued LINLDLO");
}, 1);
// Consumer loop: takes results one at a time, pausing 10 ms between reads,
// and stops once the final task's sentinel value has been received.
try {
while (1) {
// REQUIREMENT: each group of tasks should be strictly in the order of their enqueue time
// REQUIREMENT: getNext() should be able to get result while concurrently adding new tasks
const r = await getNext();
console.log("got:", r);
if (r.result === "LastInNotLastDoneLastOut") {
break;
}
await sleep(10);
}
} catch (err) {
// Errors thrown while reading results are logged, not rethrown.
console.log(err);
} finally {
console.log("finished");
}
// node mess_queue.js | grep ...
// cat mess_queue.js| grep REQUIREMENT
// REQUIREMENT: should spawn worker as long as there is a free slot (alives.length < maxAlive)
// REQUIREMENT: spawner should be blockable if all slots are occupied
// REQUIREMENT: alive queue size should never grow beyond maxAlive
// REQUIREMENT: FINFDFO should be the first in result queue
// REQUIREMENT: long term tasks should finish before the first
// REQUIREMENT: short term tasks done quickly and fully fill the alive queue
// REQUIREMENT: all short term tasks should be queued and done successfully while first task is still unfinished
// REQUIREMENT: concurrent sets should be able to be queued while getting results
// REQUIREMENT: concurrent set2 mixes up the finish order with set1
// REQUIREMENT: LINLDLO should be the last result
// REQUIREMENT: LINLDLO should be able to get done before some c_random tasks
// REQUIREMENT: each group of tasks should be strictly in the order of their enqueue time
// REQUIREMENT: getNext() should be able to get result while concurrently adding new tasks
package main
import (
"context"
"fmt"
"log"
"math/rand"
"reflect"
"sync"
"sync/atomic"
"time"
)
// maxAlive is the slot count handed to NewSpawner.
const maxAlive = 7
// idAccumulator is the shared counter behind genId.
var idAccumulator atomic.Int32
// resultType is the value type produced by the tasks in this program.
type resultType = string
// asKey is the key type for values stored on the context.
type asKey string
// ResultQueue passes tasks from producers to a consumer over an unbuffered
// channel, so every Push blocks until a matching Pop receives the task.
type ResultQueue[T any] struct {
	ch chan *Task[T]
}

// NewResultQueue returns an empty queue backed by an unbuffered channel.
func NewResultQueue[T any]() *ResultQueue[T] {
	queue := new(ResultQueue[T])
	queue.ch = make(chan *Task[T])
	return queue
}

// Push hands v to the queue, blocking until a receiver takes it.
func (q *ResultQueue[T]) Push(v *Task[T]) {
	q.ch <- v
}

// Pop blocks until a task has been pushed and returns it.
func (q *ResultQueue[T]) Pop() *Task[T] {
	task := <-q.ch
	return task
}
// SizedBucket describes a bounded container of T values.
//
// NOTE(review): no implementation or caller is visible in this file; the
// per-method descriptions below are inferred from the method names and
// signatures only — confirm against the intended design.
type SizedBucket[T any] interface {
	// Size presumably reports the number of stored values.
	Size() int
	// Cap presumably reports the maximum number of values.
	Cap() int
	// Put presumably stores v.
	Put(v T)
	// TakeAny presumably removes and returns some stored value.
	TakeAny() *T
	// TakeBy presumably removes and returns a value for which cmp reports true.
	TakeBy(cmp func(T) bool) *T
}
// Task wraps a function whose result is computed on its own goroutine and can
// be awaited, optionally only after another task's result has been read.
type Task[T any] struct {
	// Id is the sequential identifier assigned by NewTask.
	Id int

	// running flips to true on the first effective Execute call, so fn is
	// started at most once.
	running atomic.Bool
	// lock must be held while setting result.
	lock sync.Mutex
	// fn is the work to run; Execute does nothing when it is nil.
	fn func() T
	// following, when set, is the task whose result is awaited before this
	// task publishes its own.
	following *Task[T]
	// finishedCh receives one value and is then closed once fn has returned.
	finishedCh chan struct{}
	// result caches the value read from resultCh; nil until it has been read.
	result *T
	// resultCh is the receive side of the channel Execute publishes to.
	resultCh <-chan T
	// resultQueue, when set, receives this task after its result is published.
	resultQueue *ResultQueue[T]
}
// genId returns the next sequential id, starting at 0, by atomically
// incrementing idAccumulator.
func genId() int {
	next := idAccumulator.Add(1)
	return int(next) - 1
}
// NewTask creates a not-yet-started task for fn with a fresh id. fn may be
// nil, in which case Execute leaves the task untouched.
func NewTask[T any](fn func() T) *Task[T] {
	task := new(Task[T])
	task.Id = genId()
	task.fn = fn
	// Capacity 1 lets the worker signal completion without a waiting receiver.
	task.finishedCh = make(chan struct{}, 1)
	task.running.Store(false)
	return task
}
// awaitResult blocks until the task's result has been cached in t.result.
// The lock is held for the whole wait, so concurrent callers queue up behind
// the first one and then find the cached value.
func (t *Task[T]) awaitResult() {
	t.lock.Lock()
	defer t.lock.Unlock()

	if t.result == nil {
		value := <-t.resultCh
		t.result = &value
	}
}
// AwaitResult blocks until the task's result is available and returns it.
func (t *Task[T]) AwaitResult() T {
	t.awaitResult()
	value := *t.result
	return value
}
// Follow records task as the one whose result must be read before t publishes
// its own result. It returns t so calls can be chained.
func (t *Task[T]) Follow(task *Task[T]) *Task[T] {
	t.following = task

	return t
}
// PushTo sets the queue that t is pushed onto after its result has been
// published. It returns t so calls can be chained.
func (t *Task[T]) PushTo(q *ResultQueue[T]) *Task[T] {
	t.resultQueue = q

	return t
}
// Execute starts t.fn on a new goroutine the first time it is called and
// returns t so calls can be chained. A task with a nil fn is returned
// untouched, and later calls are no-ops once running has been set.
func (t *Task[T]) Execute() *Task[T] {
if t.fn == nil {
return t
}
// The compare-and-swap lets only one caller start the goroutine.
if t.running.CompareAndSwap(false, true) {
// Capacity 1 lets the goroutine publish the result without a waiting reader.
resultCh := make(chan T, 1)
t.resultCh = resultCh
go func() {
r := t.fn()
// Signal that fn has returned; Spawner receives on finishedCh to find a
// reusable slot.
t.finishedCh <- struct{}{}
close(t.finishedCh)
// Hold back this task's result until the followed task's result has been read.
if t.following != nil {
t.following.awaitResult()
}
resultCh <- r
close(resultCh)
// NOTE(review): the followed task publishes its result before pushing itself
// onto the queue, so this Push looks like it could race with the followed
// task's Push — confirm whether queue order is actually guaranteed.
if t.resultQueue != nil {
t.resultQueue.Push(t)
}
}()
}
return t
}
// Spawner creates tasks while keeping at most maxAlive of them occupying a
// slot at any time.
type Spawner[T any] struct {
	// maxAlive is the number of slots.
	maxAlive int
	// slots holds the task most recently placed in each slot.
	slots []*Task[T]
	// fd_set holds one receive case per slot (ready once that slot's task has
	// finished) followed by a trailing default case.
	fd_set []reflect.SelectCase
	// finishing is closed when the owning context is cancelled; Spawn then
	// refuses to create runnable tasks.
	finishing <-chan struct{}
}
// NewSpawner builds a Spawner with maxAlive free slots that stops spawning
// once ctx is done.
func NewSpawner[T any](ctx context.Context, maxAlive int) *Spawner[T] {
	cases := make([]reflect.SelectCase, 0, maxAlive+1)
	for slot := 0; slot < maxAlive; slot++ {
		// A channel that is already closed is always ready to receive, which
		// marks the slot as free from the start.
		ready := make(chan struct{}, 1)
		ready <- struct{}{}
		close(ready)
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ready),
		})
	}
	// The trailing default case lets Spawn detect "no slot free right now".
	cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})

	return &Spawner[T]{
		maxAlive:  maxAlive,
		slots:     make([]*Task[T], maxAlive),
		fd_set:    cases,
		finishing: ctx.Done(),
	}
}
// Spawn creates a task for fn and assigns it to a free slot, blocking while
// every slot is occupied by a task that has not finished yet. The task is
// returned unstarted; the caller is expected to call Execute on it.
//
// Once the spawner's context is done, Spawn logs a notice and returns a task
// with a nil function, for which Execute is a no-op.
func (sp *Spawner[T]) Spawn(fn func() T) *Task[T] {
	select {
	case <-sp.finishing:
		log.Println("* finishing, will not spawn new task")
		// A typed nil is all that is needed here; no reflection required.
		return NewTask[T](nil)
	default:
	}

	// First try without blocking: the trailing default case is chosen when no
	// slot channel is ready.
	defaultIdx := len(sp.fd_set) - 1
	chosen, _, _ := reflect.Select(sp.fd_set)
	if chosen == defaultIdx {
		// log.Println("==== FULL, wait... ====")
		// Every slot is busy: select again without the default case so the
		// call blocks until some task signals completion.
		chosen, _, _ = reflect.Select(sp.fd_set[:defaultIdx])
	}

	newTask := NewTask(fn)
	sp.slots[chosen] = newTask
	// The slot becomes selectable again once the new task has finished.
	sp.fd_set[chosen] = reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(newTask.finishedCh),
	}
	// log.Printf("taskId = %d created, alives = %d \n", newTask.Id, len(sp.slots))
	return newTask
}
// routineGetResult consumes tasks from the result queue stored on ctx and
// logs each result. It returns when ctx is cancelled, or after receiving the
// "FINISH" result, in which case it first calls the EndEverything cancel
// function stored on ctx.
//
// ctx must carry a *ResultQueue[resultType] under asKey("resultQueue") and a
// context.CancelFunc under asKey("EndEverything"); the type assertions panic
// otherwise.
func routineGetResult(ctx context.Context) {
	results := ctx.Value(asKey("resultQueue")).(*ResultQueue[resultType])
	EndEverything := ctx.Value(asKey("EndEverything")).(context.CancelFunc)
	for {
		// Never start processing another task once cancellation was observed.
		if ctx.Err() != nil {
			return
		}
		// Wait for the next task OR cancellation. A plain Pop() would block
		// forever on an idle queue and keep this goroutine alive after ctx is
		// done.
		var task *Task[resultType]
		select {
		case <-ctx.Done():
			return
		case task = <-results.ch:
		}
		r := task.AwaitResult()
		if r == "FINISH" {
			log.Println("* receiving routine got FINISH, gracefully exit with calling EndEverything()")
			EndEverything()
			return
		}
		log.Printf("got: %v\n", r)
	}
}
// routineDoSpawnWorkers runs the test scenario against a fresh Spawner,
// signals on the "queued" channel once the scenario has returned, waits for
// ctx to be cancelled, and then checks that further Spawn calls are refused.
//
// ctx must carry a chan struct{} under asKey("queued") and a
// *ResultQueue[resultType] under asKey("resultQueue").
func routineDoSpawnWorkers(ctx context.Context) {
	queued := ctx.Value(asKey("queued")).(chan struct{})
	resultQueue := ctx.Value(asKey("resultQueue")).(*ResultQueue[resultType])

	spawner := NewSpawner[resultType](ctx, maxAlive)
	____OKLetsRunTests____(spawner, resultQueue)
	queued <- struct{}{}

	<-ctx.Done() // ensure EndEverything() called

	// following attempts should fail
	mustNotRun := func() resultType {
		// should see "* finishing, will not spawn new task"
		log.Println("*! Should never see this since EndEverything() has been called")
		return "! Should never see this since EndEverything() has been called"
	}
	for attempt := 0; attempt < 5; attempt++ {
		spawner.Spawn(mustNotRun).Execute()
	}
}
// main wires the shared context (cancel function, result queue, "queued"
// signal channel), starts the consumer and producer goroutines, and blocks
// until the context is cancelled.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ctx = context.WithValue(ctx, asKey("EndEverything"), cancel)
	ctx = context.WithValue(ctx, asKey("resultQueue"), NewResultQueue[resultType]())

	// The consumer is started before the "queued" channel is attached; it
	// only reads the two values stored above.
	go routineGetResult(ctx)

	signalQueued := make(chan struct{})
	ctx = context.WithValue(ctx, asKey("queued"), signalQueued)
	go routineDoSpawnWorkers(ctx)

	go func() {
		<-signalQueued
		log.Println("* all tests queued, waiting for results")
		// EndEverything()
	}()

	<-ctx.Done()
	log.Println("* main routine end waiting")
	// Brief pause before the process exits.
	time.Sleep(time.Millisecond)
}
// ------------------------------------------- //
// ____OKLetsRunTests____ queues the test scenario on sp: one 6-second task,
// five roughly-5-second tasks, 200 short tasks and a final "FINISH" task.
// Every task after the first follows the one queued before it, and all of
// them are pushed to resultQueue. The function returns once the FINISH task's
// result is available.
func ____OKLetsRunTests____(sp *Spawner[resultType], resultQueue *ResultQueue[resultType]) {
log.Println("okLetsRunTests")
log.Println("- queue FINFDFO (FirstInNotFirstDoneFirstOut)")
// The first task sleeps longest (6 s) and follows nothing.
last := sp.Spawn(func() resultType {
log.Println("RUN FINFDFO")
time.Sleep(time.Second * 6)
log.Printf("FINFDFO done, values = %+v\n", []int{1, 2, 3})
return "FINFDFO"
})
log.Println("+ queued FINFDFO")
last.PushTo(resultQueue).Execute()
log.Println("- queue 5 long terms")
for i := 0; i < 5; i++ {
// _last keeps the previous task so the new one can follow it.
_last := last
// Durations are 5 s plus 10 ms per iteration.
t := time.Second*5 + time.Millisecond*time.Duration(10*i)
// v gives the closure its own copy of the loop counter.
v := i
last = sp.Spawn(func() resultType {
time.Sleep(t)
log.Printf("long term task done, T = %v, value = %v\n", t, v)
return fmt.Sprintf("long_v%d", v)
})
last.Follow(_last).PushTo(resultQueue).Execute()
}
log.Println("+ queued 5 long terms")
log.Println("- queue 200 short terms")
for i := 0; i < 200; i++ {
_last := last
// Durations are 100 ms plus a random offset in [-60, 59] ms.
t := time.Millisecond*100 + time.Millisecond*time.Duration(rand.Intn(120)-60)
v := i
last = sp.Spawn(func() resultType {
time.Sleep(t)
log.Printf("short term task done, T = %v, value = %v\n", t, v)
return fmt.Sprintf("short_v%d", v)
})
last.Follow(_last).PushTo(resultQueue).Execute()
}
log.Println("+ queued 200 short terms")
log.Println("- queue FINISH task, routines should end gracefully")
// "FINISH" is the value routineGetResult compares against before calling
// EndEverything.
final := sp.Spawn(func() resultType {
return "FINISH"
}).Follow(last).PushTo(resultQueue).Execute()
log.Println("+ queued FINISH task")
// Block until the FINISH result has been published.
_ = final.AwaitResult()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment