Skip to content

Instantly share code, notes, and snippets.

@arnetheduck
Last active July 4, 2023 11:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arnetheduck/b6a7ac8f4b85490d26d464674e09d57d to your computer and use it in GitHub Desktop.
Save arnetheduck/b6a7ac8f4b85490d26d464674e09d57d to your computer and use it in GitHub Desktop.
import chronos, chronos/threadsync
import stew/ptrops
import std/locks
import std/typetraits
const Numbers = 1000
const Threads = 100
type
Node = object
next: ptr Node
data: UncheckedArray[byte]
LockingList = object
head, tail: ptr Node
lock: Lock
LockingChannel[T] = object
list: LockingList
sig: ThreadSignalPtr
total: int
template withLock(t, x: untyped) =
acquire(t.lock)
x
release(t.lock)
proc addNode(c: var LockingList, node: ptr Node): bool =
withLock(c.lock):
if c.head == nil:
c.head = node
c.tail = node
result = true
else:
c.head.next = node
c.head = node
result = false
proc popNode(c: var LockingList): tuple[node: ptr Node, last: bool] =
withLock(c.lock):
assert c.tail != nil
result[0] = c.tail
c.tail = c.tail.next
if c.tail == nil:
c.head = nil
result[1] = c.tail == nil
proc addTrivial[T](c: var LockingList, v: T): bool =
static: doAssert supportsCopyMem(T)
let
node = cast[ptr Node](allocShared0(sizeof(Node) + sizeof(T)))
copyMem(addr node.data[0], unsafeAddr v, sizeof(T))
c.addNode(node)
proc popTrivial(c: var LockingList, T: type): tuple[data: T, last: bool] {.noinit.} =
let (node, last) = c.popNode()
copyMem(addr result[0], addr node[].data[0], sizeof(T))
freeShared(node)
result[1] = last
proc addBytes(c: var LockingList, v: openArray[byte]): bool =
let
len = v.len
node = cast[ptr Node](allocShared0(sizeof(Node) + sizeof(int) + len))
copyMem(addr node.data[0], unsafeAddr len, sizeof(len))
copyMem(addr node.data[sizeof(int)], baseAddr v, v.len)
c.addNode(node)
proc popBytes(c: var LockingList): tuple[data: seq[byte], last: bool] =
let (node, last) = c.popNode()
var len: int
copyMem(addr len, addr node[].data[0], sizeof(int))
result[0].setLen(len)
if len > 0:
copyMem(addr result[0][0], addr node[].data[0], len)
freeShared(node)
result[1] = last
proc add(c: var LockingChannel[seq[byte]], v: openArray[byte]) =
if c.list.addBytes(v):
discard c.sig.fireSync().expect("working send")
proc pop(c: ptr LockingChannel[seq[byte]]): Future[seq[byte]] {.async.} =
await threadsync.wait(c.sig)
let (data, last) = c.list.popBytes()
if not last:
discard c.sig.fire()
data
proc add[T](c: var LockingChannel[T], v: T) =
if c.list.addTrivial(v):
discard c.sig.fireSync().expect("working send")
proc pop[T](c: ptr LockingChannel[T]): Future[T] {.async.} =
await threadsync.wait(c.sig)
let (data, last) = c.list.popTrivial(T)
if not last:
await c.sig.fire()
data
proc consumer(c: ptr LockingChannel[int]) {.async.} =
for i in 0..<Numbers * Threads:
c[].total += await c.pop()
proc consumerThread(c: ptr LockingChannel[int]) {.thread.} =
waitFor consumer(c)
import os
proc producerThread(c: ptr LockingChannel[int]) {.thread.} =
for i in 0..<Numbers div 2:
c[].add(i)
sleep(1000)
for i in Numbers div 2..<Numbers:
c[].add(i)
var producers, consumers: array[Threads, Thread[ptr LockingChannel[int]]]
var channel: LockingChannel[int]
channel.list.lock.initLock()
channel.sig = ThreadSignalPtr.new()[]
for i in 0..<Threads:
createThread(producers[i], producerThread, addr channel)
createThread(consumers[0], consumerThread, addr channel)
joinThreads(producers)
joinThreads(consumers.toOpenArray(0, 0))
doAssert channel.total == ((Numbers * (Numbers-1)) div 2) * Threads
import chronos, taskpools, taskpools/channels_spsc_single, chronos/threadsync
import os
type
ThreadSignalTask[T] = object
chan: ChannelSPSCSingle[T]
sig: ThreadSignalPtr
proc someTask(task: ptr ThreadSignalTask[int], v: int) =
echo "starting sleep ", v
sleep(v)
discard task.chan.trySend(v)
discard task.sig.fireSync()
proc consumer(task: ptr ThreadSignalTask[int]): Future[int] {.async.} =
await task[].sig.wait()
discard task.chan.tryRecv(result)
task[].sig.close()
deallocShared(task)
proc main() =
let tp = Taskpool.new()
var v: seq[Future[int]]
for i in 0..10:
let task = ThreadSignalTask[int].createShared(1)
task[].sig = ThreadSignalPtr.new().expect("can create")
tp.spawn someTask(task, i * 10)
v.add consumer(task)
while v.len > 0:
let taskFut = v.pop()
echo waitFor(taskFut)
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment