-
-
Save cheatfate/fa97356b397df46730ed6e20458f903c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# | |
# Nim's Runtime Library | |
# (c) Copyright 2016 Nim Developers | |
# | |
# See the file "copying.txt", included in this | |
# distribution, for details about the copyright. | |
# | |
## Implementation of a `queue`:idx:. The underlying implementation uses a
## singly linked list of nodes, each holding an array of `elementsPerBucket`
## elements.
## In a multithreaded environment it uses the two-lock concurrent queue
## algorithm described in the
## `article <http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf>`_.
from math import isPowerOfTwo | |
# True when the module is compiled with thread support enabled.
const MultiThreaded = compileOption("threads")

when MultiThreaded:
  import locks

  # Declared bound of a node's slot array. Nodes are allocated by
  # `newListNode`, which sizes the real allocation from its `count`
  # argument, so this only limits the valid index range.
  const arrayLimit = 100_000_000

  type
    # One bucket of the queue: a link to the next bucket plus its slots.
    ListNodeObj[T] = object
      next: ListNode[T]
      d: array[arrayLimit, T]

    ListNode[T] = ptr ListNodeObj[T]

    QueueObj[T] = object
      head: ListNode[T]   # first node of the chain
      tail: ListNode[T]   # last node of the chain
      rnode: ListNode[T]  # node the next read comes from
      wnode: ListNode[T]  # node the next write goes to
      rindex: int         # read slot inside `rnode`
      windex: int         # write slot inside `wnode`
      count: int          # number of slots used per node
      length: int         # number of stored elements
      mask: int           # wraps `rindex`/`windex` back to 0
      rlock: Lock         # taken by the reading operations
      wlock: Lock         # taken by the writing operations

    Queue*[T] = ptr QueueObj[T]

  # Runs `body` while holding `lock`; the lock is released even when
  # `body` raises.
  template withQueueLock(lock: Lock, body: untyped): untyped =
    acquire lock
    try:
      body
    finally:
      release lock

else:
  # Number of slots in every node.
  const arrayLimit = 64

  type
    # One bucket of the queue: a link to the next bucket plus its slots.
    ListNodeObj[T] = object
      next: ListNode[T]
      d: array[arrayLimit, T]

    ListNode[T] = ref ListNodeObj[T]

    QueueObj[T] = object
      head: ListNode[T]   # first node of the chain
      tail: ListNode[T]   # last node of the chain
      rnode: ListNode[T]  # node the next read comes from
      wnode: ListNode[T]  # node the next write goes to
      rindex: int         # read slot inside `rnode`
      windex: int         # write slot inside `wnode`
      count: int          # number of slots used per node
      length: int         # number of stored elements
      mask: int           # wraps `rindex`/`windex` back to 0

    Queue*[T] = ref QueueObj[T]

  # Without threads there is nothing to lock: `lock` is ignored and
  # `body` is executed directly.
  template withQueueLock(lock: untyped, body: untyped): untyped =
    body
when defined(nimdoc):
  # Declaration of `Queue` that is active when `nimdoc` is defined.
  type
    Queue*[T] = ptr QueueObj[T]

# Keeps the old `TQueue` name usable as a deprecated alias of `Queue`.
{.deprecated: [TQueue: Queue].}
when MultiThreaded:
  # Points `node` at a zero-filled shared-heap block with room for the
  # link field and `count` elements of `T`. `T` is taken from the scope
  # in which the template is expanded.
  template newListNode(node: untyped, count: int): untyped =
    node = cast[ListNode[T]](
      allocShared0(sizeof(ListNode[T]) + sizeof(T) * count))

  # Returns the memory of `node` to the shared heap.
  template freeListNode[T](node: ListNode[T]) =
    deallocShared(cast[pointer](node))

else:
  # Allocates `node` with `new`; `count` is not used here because every
  # node has a fixed `arrayLimit` slots.
  template newListNode[T](node: ListNode[T], count: int) =
    new(node)
proc setupList[T](q: Queue[T], count: int) =
  ## Builds the initial chain of nodes for `q` and stores its ends in
  ## `q.head` and `q.tail`.
  ##
  ## Two nodes are created when `count` fits into a single node;
  ## otherwise one node is created per started `arrayLimit` slots.
  var remaining =
    if count <= arrayLimit: 2
    else: (count + arrayLimit - 1) div arrayLimit
  var last: ListNode[T] = nil
  while remaining > 0:
    var fresh: ListNode[T]
    new(fresh)
    if last == nil:
      # The first node created becomes the head of the chain.
      q.head = fresh
    else:
      last.next = fresh
    last = fresh
    dec(remaining)
  q.tail = last
proc len*[T](q: Queue[T]): int =
  ## Number of elements currently stored in `q`.
  q.length
proc initQueue*[T](initialCount: int = 64): Queue[T] =
  ## Creates a new, empty queue.
  ##
  ## `initialCount` must be a power of 2 (checked with `assert`).
  ## With threads enabled it is the number of slots per node; without
  ## threads it only decides how many nodes are allocated up front, and
  ## every node has `arrayLimit` slots.
  assert isPowerOfTwo(initialCount)
  when MultiThreaded:
    result = cast[Queue[T]](allocShared0(sizeof(QueueObj[T])))
    # Start with a two-node chain: head -> tail.
    newListNode(result.head, initialCount)
    newListNode(result.tail, initialCount)
    result.head.next = result.tail
    result.count = initialCount
    result.mask = initialCount - 1
    initLock result.rLock
    initLock result.wLock
  else:
    new(result)
    setupList(result, initialCount)
    result.count = arrayLimit
    result.mask = arrayLimit - 1
  # Reading and writing both start at the first node.
  result.rnode = result.head
  result.wnode = result.head
proc enqueue*[T](q: Queue[T], item: T) =
  ## Appends ``item`` to the end of the queue ``q``.
  withQueueLock(q.wLock):
    q.wnode.d[q.windex] = item
    q.windex = (q.windex + 1) and q.mask
    if q.windex == 0:
      # The write node is full: continue in its successor, allocating
      # one and making it the new tail when the chain ends here.
      if q.wnode.next == nil:
        var fresh: ListNode[T] = nil
        newListNode(fresh, q.count)
        q.wnode.next = fresh
        q.tail = fresh
      q.wnode = q.wnode.next
    inc(q.length)
proc dequeue*[T](q: Queue[T]): T =
  ## Removes and returns the first element of the queue `q`.
  ##
  ## Raises an assertion failure if the queue is empty. The check uses
  ## `doAssert`, so it stays active when assertions are disabled; without
  ## it an empty `dequeue` would advance the read position past the
  ## write position and drive `length` negative.
  withQueueLock(q.rLock):
    doAssert q.length > 0, "dequeue called on an empty queue"
    result = q.rnode.d[q.rindex]
    q.rindex = (q.rindex + 1) and q.mask
    if q.rindex == 0:
      # The head node has been read completely: recycle it by moving it
      # to the end of the chain.
      let node = q.head
      var nextHead, nextRead: ListNode[T]
      withQueueLock(q.wLock):
        # Remember the successors and cut the node loose *before* it
        # becomes reachable as the tail. Otherwise a writer that arrives
        # at the recycled node could follow its stale `next` link back
        # into nodes that have not been read yet.
        nextHead = node.next
        nextRead = q.rnode.next
        node.next = nil
        q.tail.next = node
        q.tail = node
      q.head = nextHead
      q.rnode = nextRead
    dec(q.length)
proc get*[T](q: Queue[T], item: var T): bool =
  ## Removes the first element of the queue `q` and stores it in `item`.
  ##
  ## Returns ``true`` if an element was stored, and ``false`` (leaving
  ## `item` untouched) if the read position has caught up with the write
  ## position, i.e. the queue is empty.
  withQueueLock(q.rLock):
    if q.rnode == q.wnode and q.rindex == q.windex:
      result = false
    else:
      item = q.rnode.d[q.rindex]
      result = true
      q.rindex = (q.rindex + 1) and q.mask
      if q.rindex == 0:
        # The head node has been read completely: recycle it by moving
        # it to the end of the chain.
        let node = q.head
        var nextHead, nextRead: ListNode[T]
        withQueueLock(q.wLock):
          # Remember the successors and cut the node loose *before* it
          # becomes reachable as the tail. Otherwise a writer that
          # arrives at the recycled node could follow its stale `next`
          # link back into nodes that have not been read yet.
          nextHead = node.next
          nextRead = q.rnode.next
          node.next = nil
          q.tail.next = node
          q.tail = node
        q.head = nextHead
        q.rnode = nextRead
      dec(q.length)
proc pop*[T](q: Queue[T]): T =
  ## Removes and returns the first element of the queue `q`.
  ## Raises ``ValueError`` if the queue is empty.
  let found = q.get(result)
  if not found:
    raise newException(ValueError, "Queue is empty")
proc add*[T](q: Queue[T], item: T) =
  ## Alias for the ``enqueue`` operation: appends ``item`` to ``q``.
  q.enqueue(item)
proc push*[T](q: Queue[T], item: T) =
  ## Alias for the ``enqueue`` operation: appends ``item`` to ``q``.
  q.enqueue(item)
iterator items*[T](q: Queue[T]): T =
  ## Yields every element of ``q`` in queue order, from the read
  ## position up to (not including) the write position.
  ## The read lock of ``q`` is held while iterating.
  withQueueLock(q.rLock):
    var s = q.rnode
    # Only the first node starts at the read index; every following node
    # is walked from slot 0 (see `c = 0` below).
    var c = q.rindex
    while s != nil:
      if s == q.wnode:
        # Node currently being written: stop at the write index.
        while c < q.windex:
          yield s.d[c]
          inc(c)
        break
      else:
        while c < q.count:
          yield s.d[c]
          inc(c)
      s = s.next
      c = 0
iterator mitems*[T](q: Queue[T]): var T =
  ## Yields every element of ``q`` in queue order as a mutable location,
  ## from the read position up to (not including) the write position.
  ## The read lock of ``q`` is held while iterating.
  withQueueLock(q.rLock):
    var s = q.rnode
    # Only the first node starts at the read index; every following node
    # is walked from slot 0 (see `c = 0` below).
    var c = q.rindex
    while s != nil:
      if s == q.wnode:
        # Node currently being written: stop at the write index.
        while c < q.windex:
          yield s.d[c]
          inc(c)
        break
      else:
        while c < q.count:
          yield s.d[c]
          inc(c)
      s = s.next
      c = 0
proc `$`*[T](q: Queue[T]): string =
  ## Turns a queue into its string representation, e.g. ``[1, 2, 3]``.
  result = "["
  # Track the first element explicitly: judging by the length of `result`
  # would drop the separator after elements whose `$` is the empty string.
  var first = true
  for x in items(q):
    if first:
      first = false
    else:
      result.add(", ")
    result.add($x)
  result.add("]")
when MultiThreaded:
  proc deinitQueue*[T](q: Queue[T]) =
    ## Frees the memory allocated by the queue: every node reachable
    ## from `q.head`, both locks, and the queue object itself.
    ## `q` must not be used afterwards.
    acquire q.rLock
    acquire q.wLock
    try:
      var cursor = q.head
      while cursor != nil:
        # Fetch the successor before the node's memory is released.
        let doomed = cursor
        cursor = doomed.next
        freeListNode(doomed)
    finally:
      release q.rLock
      release q.wLock
    deinitLock q.rLock
    deinitLock q.wLock
    deallocShared(cast[pointer](q))
when isMainModule:
  # Self-test. `doAssert` is used throughout so that the checks, and the
  # queue operations embedded in them, still run when assertions are
  # disabled.
  const stressCount = 50_000_000

  # FIFO order and string conversion.
  var nq = initQueue[int]()
  for v in countup(1, 5):
    nq.enqueue(v)
  doAssert($nq == "[1, 2, 3, 4, 5]")
  for expected in countup(1, 5):
    doAssert nq.pop() == expected

  # `pop` on an empty queue must raise ValueError.
  var r = false
  try:
    discard nq.pop()
  except ValueError:
    r = true
  doAssert r

  # Interleaved `add` and `dequeue`.
  var oq = initQueue[int]()
  oq.add(123)
  oq.add(9)
  oq.add(4)
  let first = oq.dequeue
  oq.add(56)
  oq.add(6)
  let second = oq.dequeue
  oq.add(789)
  doAssert first == 123
  doAssert second == 9
  doAssert($oq == "[4, 56, 6, 789]")

  # Stress: alternate one enqueue with one dequeue ...
  var cq = initQueue[int]()
  var i = 0
  while i < stressCount:
    cq.enqueue(i)
    doAssert i == cq.dequeue()
    inc(i)

  # ... then fill the queue completely and drain it again in order.
  i = 0
  while i < stressCount:
    cq.enqueue(i)
    inc(i)
  i = 0
  while i < stressCount:
    doAssert i == cq.dequeue()
    inc(i)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This code causes:
Unhandled exception at 0x000000013F67F456 in queue.exe: 0xC00000FD: Stack overflow (parameters: 0x0000000000000001, 0x0000000000043FC8).