Skip to content

Instantly share code, notes, and snippets.

@kevinkk525
Last active February 28, 2020 21:09
Show Gist options
  • Save kevinkk525/5b89395604da3df3be7a015abcba04fa to your computer and use it in GitHub Desktop.
Save kevinkk525/5b89395604da3df3be7a015abcba04fa to your computer and use it in GitHub Desktop.
Test whether the new uasyncio heap implementation is ISR-proof
from time import ticks_diff, ticks_ms as ticks
import uasyncio as asyncio
from uasyncio.core import _queue
def ph_meld(h1, h2):
    """Meld two pairing-heap nodes, pausing once part-way through relinking.

    This is a generator: drive it with ``next()`` or ``yield from``. The
    melded root is delivered as the generator's return value (the ``value``
    of the final ``StopIteration``). When both arguments are non-empty it
    yields exactly once, in the middle of updating the links, so a caller
    stepping it manually can run other queue operations at that point.

    It is a plain ``def`` rather than ``async def``: in this file it is only
    iterated, never awaited, and combining ``yield`` with ``return <value>``
    inside ``async def`` is a SyntaxError on CPython.

    Args:
        h1: Root node of the first heap, or None for an empty heap.
        h2: Root node of the second heap, or None for an empty heap.

    Returns:
        ``h2`` if ``h1`` is None, ``h1`` if ``h2`` is None; otherwise ``h1``
        when its ``ph_key`` is strictly earlier than ``h2``'s (per
        ``ticks_diff``), else ``h2``. The other node becomes a child of the
        returned root.
    """
    if h1 is None:
        return h2
    if h2 is None:
        return h1
    if ticks_diff(h1.ph_key, h2.ph_key) < 0:
        # h1 is earlier: append h2 as the last child of h1.
        if h1.ph_child is None:
            h1.ph_child = h2
        else:
            h1.ph_child_last.ph_next = h2
        h1.ph_child_last = h2
        h2.ph_next = None
        # Suspension point: h2 is linked in but its parent pointer is not set.
        yield
        h2.ph_rightmost_parent = h1
        return h1
    # h2 is earlier (or equal): prepend h1 as the first child of h2.
    h1.ph_next = h2.ph_child
    # Suspension point: h1 has captured h2's current first child, but h2 does
    # not point at h1 yet. Anything added to h2 while suspended is not
    # reflected in h1.ph_next when the generator resumes.
    yield
    h2.ph_child = h1
    if h1.ph_next is None:
        h2.ph_child_last = h1
        h1.ph_rightmost_parent = h2
    return h2
# Attach the generator-based ph_meld defined above to the imported uasyncio
# module object under the attribute name `ph_meld`.
# NOTE(review): presumably meant to substitute the stock meld function;
# whether uasyncio's own queue code looks the function up through this
# attribute is not visible from this file -- confirm.
asyncio.ph_meld = ph_meld
async def eliza(i):
    """Print ``i`` and finish; the coroutine wrapped by each test Task."""
    print(i)
def push_sorted(heap, v, data):
    """Generator that inserts node ``v`` into ``heap`` with sort key ``data``.

    Resets ``v``'s heap links, then delegates to ``ph_meld`` and stores the
    resulting root in ``heap.heap``. Because ``ph_meld`` is a generator that
    may pause, this function is a generator too: the insertion only takes
    effect once it has been iterated to exhaustion.

    Args:
        heap: Queue object whose ``heap`` attribute holds the current root
            (or None when empty).
        v: Node to insert; its ``ph_key``, ``ph_child`` and ``ph_next``
            attributes are overwritten.
        data: Key stored in ``v.ph_key`` and used for ordering.
    """
    v.ph_key = data
    v.ph_child = None
    v.ph_next = None
    # ``yield from`` itself consumes the sub-generator's StopIteration and
    # evaluates to its return value, so no except clause is needed. Catching
    # StopIteration here could only ever replace the root with None and drop
    # every queued node.
    heap.heap = yield from ph_meld(v, heap.heap)
def printHeap(q):
    """Print every node reachable from ``q.heap`` as ``<counter> <name>``.

    Nodes are visited in pre-order: a node, then its children (via
    ``ph_child``), then its following siblings (via ``ph_next``). Siblings
    that are passed over while descending into a child are remembered and
    visited afterwards, so no reachable node is left out of the dump.

    Args:
        q: Queue object whose ``heap`` attribute is the root node or None.
            Each node needs ``name``, ``ph_child`` and ``ph_next``.
    """
    deferred = []  # next-siblings skipped while descending into a child
    node = q.heap
    count = 0
    while node is not None:
        print(count, node.name)
        count += 1
        if node.ph_child is not None:
            if node.ph_next is not None:
                deferred.append(node.ph_next)
            node = node.ph_child
        elif node.ph_next is not None:
            node = node.ph_next
        else:
            # Subtree finished: resume with the most recently skipped sibling.
            node = deferred.pop() if deferred else None
def test_no_isr():
    """Reference run: every insertion completes before the next one starts.

    Creates five "Task n" and five "Event n" entries, pushes all tasks and
    then all events onto the shared ``_queue`` (each push driven to
    completion), prints the resulting heap and runs the event loop.
    """
    print("Testing usual behaviour without ISR (for reference)")

    def make(label):
        # Wrap an eliza coroutine in a Task tagged with the given label.
        entry = asyncio.Task(eliza(label))
        entry.name = label
        entry.data = None
        return entry

    tasks = []
    events = []
    for n in range(5):
        tasks.append(make("Task " + str(n)))
        events.append(make("Event " + str(n)))

    # Tasks first, then events; exhausting the generator finishes the push.
    for entry in tasks + events:
        for _ in push_sorted(_queue, entry, ticks()):
            pass

    printHeap(_queue)
    asyncio.get_event_loop().run_forever()
    print("-----------------------------------------")
def test_isr():
    """Interrupted run: other pushes happen while one insertion is suspended.

    Builds the same five "Task n" / five "Event n" entries as the reference
    run. Tasks are pushed one after another, except that the insertion at
    index 3 is advanced only to its first suspension point; all events are
    then pushed completely before that insertion is resumed and finished.
    Finally the heap is printed and the event loop is run.
    """
    print("Testing ISR while adding the 3rd Task")

    def make(label):
        # Wrap an eliza coroutine in a Task tagged with the given label.
        entry = asyncio.Task(eliza(label))
        entry.name = label
        entry.data = None
        return entry

    tasks = []
    events = []
    for n in range(5):
        tasks.append(make("Task " + str(n)))
        events.append(make("Event " + str(n)))

    for idx, entry in enumerate(tasks):
        insertion = push_sorted(_queue, entry, ticks())
        if idx == 3:
            # Index 3 is the entry named "Task 3". Step its insertion only as
            # far as the first yield, leaving it half done.
            next(insertion)
            # While it is suspended, push every event onto the queue in full.
            for pending_event in events:
                for _ in push_sorted(_queue, pending_event, ticks()):
                    pass
        # Drive the insertion (fresh, or the suspended one) to completion.
        for _ in insertion:
            pass

    printHeap(_queue)
    asyncio.get_event_loop().run_forever()
    print("-----------------------------------------")
# Run both scenarios at import/execution time: the uninterrupted reference
# run first, then the run with an insertion suspended part-way through.
test_no_isr()
test_isr()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment