Last active
February 28, 2020 21:09
-
-
Save kevinkk525/5b89395604da3df3be7a015abcba04fa to your computer and use it in GitHub Desktop.
Test whether the new uasyncio heap implementation is ISR-proof
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import ticks_diff, ticks_ms as ticks

import uasyncio as asyncio
from uasyncio.core import _queue
def ph_meld(h1, h2):
    """Meld two heap nodes, pausing once in the middle of the pointer update.

    Whichever node has the earlier ``ph_key`` (compared with ``ticks_diff``)
    ends up on top and the other node is linked in beneath it.  Each branch
    contains a single ``yield`` placed between the link updates, so the meld
    can be suspended half-way and other heap operations can be run before it
    is resumed.

    This is a plain generator function: drive it with ``yield from`` or
    ``next()``.  The resulting root node is the generator's return value.  If
    either argument is ``None`` the other one is returned without yielding.

    NOTE(review): the block structure was reconstructed from flattened text;
    confirm the nesting of the final ``if`` against the original gist.
    """
    if h1 is None:
        return h2
    if h2 is None:
        return h1
    lt = ticks_diff(h1.ph_key, h2.ph_key) < 0
    if lt:
        # h1 is earlier: append h2 to the end of h1's child list.
        if h1.ph_child is None:
            h1.ph_child = h2
        else:
            h1.ph_child_last.ph_next = h2
        h1.ph_child_last = h2
        h2.ph_next = None
        # Suspension point: h2 is linked in but its parent pointer is not set.
        yield
        h2.ph_rightmost_parent = h1
        return h1
    else:
        # h2 is earlier (or equal): h1 becomes the first child of h2.
        h1.ph_next = h2.ph_child
        # Suspension point: h1 points at h2's children but h2 does not yet
        # point at h1, so children added to h2 meanwhile are not seen by h1.
        yield
        h2.ph_child = h1
        if h1.ph_next is None:
            h2.ph_child_last = h1
            h1.ph_rightmost_parent = h2
        return h2
# Store the yielding meld as the ``ph_meld`` attribute of the imported
# uasyncio module object.
asyncio.ph_meld = ph_meld
async def eliza(i):
    """Trivial coroutine used as a task body: print ``i`` and finish."""
    print(i)
def push_sorted(heap, v, data):
    """Generator that inserts node ``v`` into ``heap`` using ``data`` as key.

    Resets ``v``'s key and its child/sibling links, then delegates to
    ``ph_meld`` through ``yield from`` so this generator pauses wherever the
    meld pauses.  When the meld finishes, the root it returns is stored in
    ``heap.heap``.
    """
    v.ph_key = data
    v.ph_child = None
    v.ph_next = None
    # ``yield from`` already converts the sub-generator's StopIteration into
    # the value of the expression, so no exception handler is needed here; a
    # handler that swallowed it would only risk storing None as the heap root.
    new_root = yield from ph_meld(v, heap.heap)
    heap.heap = new_root
def printHeap(q):
    """Print every node reachable from ``q.heap`` as ``<index> <name>``.

    Nodes are visited in pre-order: a node, then its ``ph_child`` subtree,
    then its ``ph_next`` sibling.  A stack of pending siblings is kept so that
    descending into a child does not lose the siblings still to be printed.
    Prints nothing when ``q.heap`` is ``None``.
    """
    pending = [] if q.heap is None else [q.heap]
    index = 0
    while pending:
        node = pending.pop()
        print(index, node.name)
        index += 1
        # Push the sibling first so the child subtree is popped (printed) first.
        if node.ph_next is not None:
            pending.append(node.ph_next)
        if node.ph_child is not None:
            pending.append(node.ph_child)
def test_no_isr():
    """Reference run: push five tasks, then five event tasks, uninterrupted.

    Every ``push_sorted`` generator is driven to completion before the next
    push starts.  The resulting heap is printed and the event loop is run.
    """
    print("Testing usual behaviour without ISR (for reference)")
    tasks = []
    events = []
    for i in range(5):
        task = asyncio.Task(eliza("Task " + str(i)))
        task.name = "Task " + str(i)
        task.data = None
        tasks.append(task)
        event = asyncio.Task(eliza("Event " + str(i)))
        event.name = "Event " + str(i)
        event.data = None
        events.append(event)
    # All tasks first, then all events; each insertion runs to completion.
    for t in tasks + events:
        for _ in push_sorted(_queue, t, ticks()):
            pass
    printHeap(_queue)
    asyncio.get_event_loop().run_forever()
    print("-----------------------------------------")
def test_isr():
    """Simulated-ISR run: interrupt one insertion with five other pushes.

    Tasks are pushed one by one.  For the task at index 3 the ``push_sorted``
    generator is advanced only to its first pause; all event tasks are then
    pushed to completion, and only afterwards is the suspended insertion
    resumed.  The resulting heap is printed and the event loop is run.
    """
    print("Testing ISR while adding the 3rd Task")
    tasks = []
    events = []
    for i in range(5):
        task = asyncio.Task(eliza("Task " + str(i)))
        task.name = "Task " + str(i)
        task.data = None
        tasks.append(task)
        event = asyncio.Task(eliza("Event " + str(i)))
        event.name = "Event " + str(i)
        event.data = None
        events.append(event)
    for i, t in enumerate(tasks):
        gen = push_sorted(_queue, t, ticks())
        if i != 3:
            for _ in gen:
                pass
        else:
            # Start the insertion but stop at the first yield inside ph_meld,
            # leaving the heap links only partly updated.
            next(gen)
            # push all Tasks awaiting the Event onto the queue
            for tt in events:
                for _ in push_sorted(_queue, tt, ticks()):
                    pass
            # finish putting original task onto the queue
            for _ in gen:
                pass
    printHeap(_queue)
    asyncio.get_event_loop().run_forever()
    print("-----------------------------------------")
# Run the uninterrupted reference scenario first, then the interrupted one.
test_no_isr()
test_isr()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment