Skip to content

Instantly share code, notes, and snippets.

@samdmarshall
Created March 8, 2021 18:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samdmarshall/f41ce155423776a2dfb44204d50958f4 to your computer and use it in GitHub Desktop.
Save samdmarshall/f41ce155423776a2dfb44204d50958f4 to your computer and use it in GitHub Desktop.
# =======
# Imports
# =======
import json
import strformat
import asyncdispatch
import threadproxy
# =====
# Types
# =====
type
  QueueTask = object
    ## Argument handed to a worker thread when it is started.
    token: ThreadToken ## token the worker passes to `newThreadProxy`

  QueueMain = proc (task: QueueTask) {.thread, nimcall.}
    ## Signature required of a worker thread entry point.

  Queue = object
    ## Handle describing one worker thread.
    name: string              ## name the worker was registered under
    token: ThreadToken        ## token created for the worker
    thread: Thread[QueueTask] ## the underlying thread object

  QueueManager = object
    ## Wraps the main-thread proxy used to reach the workers.
    main: MainThreadProxy
# =========
# Functions
# =========
proc initQueueManager(name: string = "main"): QueueManager =
  ## Builds a `QueueManager` around a new main-thread proxy called `name`.
  ##
  ## The proxy's `poll` is started with `asyncCheck` (it is not awaited here),
  ## then handlers for the "new" and "stop" actions are registered on it.
  let mainProxy = newMainThreadProxy(name)
  asyncCheck mainProxy.poll()

  # "new" only logs the request and answers with a JSON null.
  mainProxy.onData "new":
    echo "action: new"
    return newJNull()

  # "stop" asks the proxy itself to stop.
  mainProxy.onData "stop":
    mainProxy.stop()

  QueueManager(main: mainProxy)
proc createQueue(ctx: QueueManager, name: string, main: QueueMain): Queue =
  ## Starts a worker thread registered as `name` that runs `main`, and
  ## returns the `Queue` handle describing it.
  ##
  ## A token is created for `name` on the manager's proxy and passed to the
  ## new thread inside a `QueueTask`. The call then blocks (`waitFor`) until
  ## the worker has answered an "init" request, so `main` must register a
  ## handler for that action. Whatever `ask` raises propagates to the caller.
  let proxy = ctx.main
  echo "creating queue!"
  let token = proxy.createToken(name)
  result = Queue(name: name, token: token)
  # Create the thread directly in the returned object instead of in a local
  # that is copied into the result afterwards.
  # NOTE(review): the running thread is believed to keep the address of the
  # `Thread` object given to `createThread`, so that object should not be
  # copied or go out of scope while the worker runs - confirm.
  createThread(result.thread, main, QueueTask(token: token))
  # The reply only signals that the worker is up; its payload is not needed.
  discard waitFor proxy.ask(name, "init", newJNull())
proc dispatchAsync(ctx: QueueManager, queue: Queue, action: string,
                   data: JsonNode): Future[JsonNode] =
  ## Sends `action` with payload `data` to the worker named `queue.name`.
  ##
  ## Nothing is awaited here: the future returned by the proxy's `ask` is
  ## handed back to the caller as-is.
  echo fmt"pushing ({action}) with '{data}' to thread: {queue.name}"
  ctx.main.ask(queue.name, action, data)
# =======
# Threads
# =======
proc workerMain(task: QueueTask) {.thread.} =
  ## Thread entry point of a worker: builds a proxy from `task.token`,
  ## registers the action handlers and then polls the proxy until it ends.
  let proxy = newThreadProxy(task.token)
  # `createQueue` sends an "init" request right after starting the thread
  # and waits for the answer, so acknowledge it with a JSON null.
  # NOTE(review): without this handler the request presumably fails or is
  # never answered - confirm against threadproxy's default-handler behaviour.
  proxy.onData "init":
    return newJNull()
  # Despite its name, "multiply" adds 20 to the integer stored under "value".
  proxy.onData "multiply":
    let x = data["value"].getInt()
    echo fmt"start processing task: {x}"
    return %*{"value": x + 20}
  # start processing channel
  waitFor proxy.poll()
proc response(f: Future[JsonNode]) =
  ## Completion callback for a dispatched task: prints the result held by `f`.
  ##
  ## `f` is expected to be finished. A failed future is reported and skipped,
  ## because `read` would re-raise its stored error from inside the callback.
  ## On success the payload must be a JSON object with an integer "value".
  echo "future callback!"
  if f.failed:
    let err = f.readError()
    echo fmt"task failed: {err.msg}"
    return
  let value = f.read()["value"].getInt()
  # The "multiply" handler in `workerMain` adds 20, so subtracting it again
  # recovers the number that was dispatched.
  echo fmt"{value - 20} -> {value}"
  echo fmt"finish processing task: {value - 20}"
# ===========
# Entry Point
# ===========
proc main() =
  ## Demo driver: creates the queue manager and one worker ("thread_1"),
  ## dispatches a single "multiply" task to it and then runs the async
  ## dispatcher forever so the `response` callback can fire.
  # Neither binding is reassigned, so both are immutable.
  let ctx = initQueueManager()
  let queue = ctx.createQueue("thread_1", workerMain)
  let index = 8
  # for index in 0..10:
  let data = %*{"value": index}
  echo "dispatching task: " & $data
  let answer = ctx.dispatchAsync(queue, "multiply", data)
  answer.addCallback response
  echo "done dispatching"
  # Never returns: nothing in this proc asks the dispatcher to stop.
  runForever()
# Run the demo only when this file is compiled as the main module.
when isMainModule:
  main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment