Skip to content

Instantly share code, notes, and snippets.

@alehander92
Created March 26, 2020 15:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alehander92/f456c84c745b59c6e8a0b1e267840376 to your computer and use it in GitHub Desktop.
Save alehander92/f456c84c745b59c6e8a0b1e267840376 to your computer and use it in GitHub Desktop.
import deques
type
AsyncQueue[Input, Output] = ref object
list: Deque[Input]
futures: Deque[Future[Output]]
running: bool
handler: proc(input: Input): Future[Output]
proc initAsyncQueue[Input, Output](handler: proc(input: Input): Future[Output]): AsyncQueue[Input, Output] =
AsyncQueue[Input, Output](list: initDeque[Input](), futures: initDeque[Future[Output]](), running: false, handler: handler)
proc run[Input, Output](queue: AsyncQueue[Input, Output]): Future[void] {.async.} =
queue.running = true
while queue.list.len > 0:
var next = queue.list.popLast
var future = queue.futures.popLast
# echo "next ", $next
future.complete(await queue.handler(next))
queue.running = false
proc pushOrSend[Input, Output](queue: AsyncQueue[Input, Output], input: Input): Future[Output] {.async.} =
var future = newFuture[Output]("pushOrSend " & $input)
# echo "pushOrSend " & $input
queue.list.addFirst(input)
queue.futures.addFirst(future)
if not queue.running:
await queue.run()
return await future
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment