Skip to content

Instantly share code, notes, and snippets.

@samdmarshall
Created March 7, 2021 22:46
Show Gist options
  • Save samdmarshall/d8fbe05e39ec385d70aa09a1683692ae to your computer and use it in GitHub Desktop.
Save samdmarshall/d8fbe05e39ec385d70aa09a1683692ae to your computer and use it in GitHub Desktop.
# =======
# Imports
# =======
import json
import strformat
import asyncdispatch
import threadproxy
# =====
# Types
# =====
type
  QueueTask = object
    ## Argument passed to a worker thread's entry proc.
    token: ThreadToken

  # Signature expected of a worker thread entry proc.
  QueueMain = proc (task: QueueTask) {.thread, nimcall.}

  Queue = object
    ## Groups a worker's name, its proxy token and its thread object.
    name: string
    token: ThreadToken
    thread: Thread[QueueTask]

  QueueManager = object
    ## Holds the main-thread proxy.
    main: MainThreadProxy
# =========
# Functions
# =========
# =======
# Threads
# =======
proc workerMain(task: QueueTask) {.thread.} =
  ## Worker thread entry point: builds a thread proxy from `task.token`,
  ## registers the "multiply" handler, then blocks on the proxy's poll loop.
  let worker = newThreadProxy(task.token)

  # Handler for the "multiply" action: replies with the incoming "value"
  # plus 20. `data` is not declared in this proc; presumably it is injected
  # by `onData` -- confirm against threadproxy.
  worker.onData "multiply":
    echo "processing task!"
    let operand = getInt(data["value"])
    return %*{"value": operand + 20}

  # Block this thread until the poll future completes.
  waitFor worker.poll()
proc response(f: Future[JsonNode]) =
  ## Reads the JSON result out of `f` and prints "<value - 20> -> <value>",
  ## where `value` is the integer stored under the "value" key.
  echo "future callback!"
  let value = getInt(f.read()["value"])
  echo &"{value - 20} -> {value}"
# ===========
# Entry Point
# ===========
proc main() =
  ## Sets up the main-thread proxy, starts one worker thread, sends it a
  ## single "multiply" request, prints the reply, then runs the event loop
  ## forever.
  const workerName = "thread_1"

  let hub = newMainThreadProxy("main")
  asyncCheck hub.poll()
  hub.onData "stop":
    hub.stop()

  # Start the worker; it receives the token created for `workerName`.
  let workerToken = hub.createToken(workerName)
  var workerThread: Thread[QueueTask]
  createThread(workerThread, workerMain, QueueTask(token: workerToken))

  let index = 8
  # for index in 0..10:
  let payload = %*{"value": index}
  echo "dispatching task: ", payload
  let answer = waitFor hub.ask(workerName, "multiply", payload)
  echo answer
  echo "done dispatching"
  runForever()

when isMainModule:
  main()
# =======
# Imports
# =======
import json
import strformat
import asyncdispatch
import threadproxy
# =====
# Types
# =====
type
  QueueTask = object
    ## Argument passed to a worker thread's entry proc.
    token: ThreadToken

  # Signature expected of a worker thread entry proc (see `createQueue`).
  QueueMain = proc (task: QueueTask) {.thread, nimcall.}

  Queue = object
    ## A worker as returned by `createQueue`: its name, its proxy token and
    ## its thread object.
    name: string
    token: ThreadToken
    thread: Thread[QueueTask]

  QueueManager = object
    ## Holds the main-thread proxy; built by `initQueueManager`.
    main: MainThreadProxy
# =========
# Functions
# =========
proc initQueueManager(name: string = "main"): QueueManager =
  ## Builds a `QueueManager` around a new main-thread proxy called `name`
  ## (default "main"), starts polling it, and registers two handlers:
  ## "new" (logs and replies with JSON null) and "stop" (stops the proxy).
  let mainProxy = newMainThreadProxy(name)
  asyncCheck mainProxy.poll()

  mainProxy.onData "new":
    echo "action: new"
    return newJNull()

  mainProxy.onData "stop":
    mainProxy.stop()

  QueueManager(main: mainProxy)
proc createQueue(ctx: QueueManager, name: string, main: QueueMain): Queue =
  ## Creates a token called `name` on the manager's main proxy and starts a
  ## thread running `main` with that token. Returns the resulting `Queue`.
  ##
  ## The thread is created directly in `result.thread` instead of in a
  ## proc-local `Thread` that is copied into the returned object afterwards:
  ## `createThread` takes the `Thread` by `var`, and a local would go out of
  ## scope as soon as this proc returns, leaving only a copy behind.
  ## NOTE(review): callers should keep the returned `Queue` alive, and avoid
  ## copying it, while the worker runs -- confirm against the `createThread`
  ## documentation for the Nim version in use.
  echo "creating queue!"
  let token = ctx.main.createToken(name)
  result = Queue(name: name, token: token)
  createThread(result.thread, main, QueueTask(token: token))
proc dispatchAsync(ctx: QueueManager, queue: Queue, action: string,
                   data: JsonNode): Future[JsonNode] =
  ## Logs the request, then asks the thread named `queue.name` to perform
  ## `action` with `data`. Returns the future produced by the proxy's `ask`.
  echo &"pushing ({action}) with '{data}' to thread: {queue.name}"
  ctx.main.ask(queue.name, action, data)
# =======
# Threads
# =======
proc workerMain(task: QueueTask) {.thread.} =
  ## Worker thread entry point: builds a thread proxy from `task.token`,
  ## registers the "multiply" handler, then blocks on the proxy's poll loop.
  let worker = newThreadProxy(task.token)

  # Handler for the "multiply" action: replies with the incoming "value"
  # plus 20. `data` is not declared in this proc; presumably it is injected
  # by `onData` -- confirm against threadproxy.
  worker.onData "multiply":
    echo "processing task!"
    let operand = getInt(data["value"])
    return %*{"value": operand + 20}

  # Block this thread until the poll future completes.
  waitFor worker.poll()
proc response(f: Future[JsonNode]) =
  ## Reads the JSON result out of `f` and prints "<value - 20> -> <value>",
  ## where `value` is the integer stored under the "value" key.
  echo "future callback!"
  let value = getInt(f.read()["value"])
  echo &"{value - 20} -> {value}"
# ===========
# Entry Point
# ===========
proc main() =
  ## Builds the queue manager, starts one worker queue, sends it a single
  ## "multiply" request, prints the reply, then runs the event loop forever.
  let manager = initQueueManager()
  let workerQueue = createQueue(manager, "thread_1", workerMain)

  let index = 8
  # for index in 0..10:
  let payload = %*{"value": index}
  echo "dispatching task: ", payload
  let answer = waitFor dispatchAsync(manager, workerQueue, "multiply", payload)
  echo answer
  echo "done dispatching"
  runForever()

when isMainModule:
  main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment