Created
March 25, 2020 15:11
-
-
Save kevinkk525/3313a8d2763031a705e37d1748d1d996 to your computer and use it in GitHub Desktop.
Testcases for wait_for uasyncio extmod
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Kevin Köck | |
# Copyright Kevin Köck 2019 Released under the MIT license | |
# Created on 2019-11-11 | |
### | |
# This test can be run on micropython and Cpython3. | |
# The results should be the same otherwise there's a bug in the micropython implementation. | |
### | |
import time | |
import sys | |
if sys.platform == "win32" or sys.version_info[1] > 4: | |
# unix port reports sys.platform=="linux" just like CPython linux but version differs | |
mp = False | |
import asyncio | |
else: | |
import uasyncio as asyncio | |
mp = True | |
class MyException(Exception): | |
pass | |
tests_success = [] | |
tests_failed = [] | |
def printSummary(): | |
print("\n") | |
print("################################") | |
print("# Test count:", len(tests_failed) + len(tests_success)) | |
print("#\n# Tests failed:", len(tests_failed)) | |
print("# Tests successful:", len(tests_success)) | |
if tests_failed: | |
print("#\n# Tests failed:") | |
for f in tests_failed: | |
print("#", f) | |
print("################################") | |
def dprint(*args): | |
print(time.time() if not mp else time.ticks_us(), *args) | |
async def awaiting(t, return_if_fail=False, res=None, exc=None): | |
try: | |
dprint("awaiting started") | |
await asyncio.sleep(float(t)) # float(t) will raise Exception if str is passed | |
if res: | |
res[0] = False | |
except asyncio.CancelledError: | |
# CPython wait_for raises CancelledError inside task but TimeoutError in wait_for | |
dprint("awaiting canceled") | |
if res: | |
res[0] = True | |
if exc: | |
exc[0] = asyncio.CancelledError | |
if return_if_fail: | |
return False # return has no effect if Cancelled | |
else: | |
raise | |
except Exception as e: | |
dprint("Caught exception", e) | |
if exc: | |
exc[0] = e | |
raise | |
dprint("awaiting exited cleanly") | |
return "return_value" | |
async def publish(t, return_if_fail=False, res=None): | |
dprint("started publish") | |
result = False | |
try: | |
result = await asyncio.wait_for(awaiting(t, return_if_fail, res), 0.2) | |
return result | |
except asyncio.TimeoutError: | |
dprint("Got timeout error") | |
return False | |
except asyncio.CancelledError: | |
dprint("Publish request got canceled") | |
return False | |
finally: | |
dprint("finally") | |
dprint("result", result) | |
def _test(name, coro, exp): | |
success = True | |
async def test(name, coro, exp): | |
nonlocal success | |
print("") | |
dprint("Test:", name) | |
try: | |
r = await coro | |
if type(r) == type(exp) == list: | |
success = True | |
if len(r) != len(exp): | |
success = False | |
else: | |
for i, res in enumerate(r): | |
if isinstance(res, Exception) and type(res) != exp[i] or \ | |
not isinstance(res, Exception) and exp[i] != res: | |
success = False | |
break | |
else: | |
success = r == exp | |
dprint("Test {!r} result: {!s}, Got: {!r}, expected: {!r}".format(name, success, r, | |
exp)) | |
except Exception as e: | |
dprint( | |
"Test {!r} result: {!s}, Got: {!r}, expected: {!r}".format(name, type(e) == exp, e, | |
exp)) | |
success = type(e) == exp | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(test(name, coro, exp)) | |
if success: | |
tests_success.append(name) | |
else: | |
tests_failed.append(name) | |
async def test_killing_caller(): | |
res = False | |
async def wait(): | |
nonlocal res | |
res = await asyncio.wait_for(awaiting(2), 0.5) | |
p_c = asyncio.create_task(wait()) | |
async def killer(): | |
dprint("killer started") | |
await asyncio.sleep(0.2) | |
dprint("killer waiting done") | |
p_c.cancel() | |
dprint("wait cancelled") | |
await asyncio.sleep(0.2) | |
await killer() | |
return res | |
async def test_waitfor_thrown_exception(inside=True): | |
exc = [None] | |
try: | |
await asyncio.wait_for(awaiting(2, exc=exc), 0.2) | |
except asyncio.TimeoutError: | |
dprint("Got timeout error") | |
if not inside: | |
return asyncio.TimeoutError # wait_for throws TimeoutError to caller | |
except asyncio.CancelledError: | |
dprint("Publish request got canceled") | |
return asyncio.CancelledError # not what is tested here. | |
finally: | |
if inside: | |
return exc[0] # asyncio.CancelledError expected | |
async def test_cancellation_forwarded(catch=False, catch_inside=False): | |
res = [False, False] | |
async def wait(): | |
nonlocal res | |
try: | |
await asyncio.wait_for(awaiting(2, catch_inside), 0.5) | |
except asyncio.TimeoutError: | |
dprint("Got timeout error") | |
res[0] = asyncio.TimeoutError | |
raise | |
except asyncio.CancelledError: | |
dprint("Got canceled") | |
if catch: | |
res[0] = True | |
else: | |
raise | |
async def killer(t): | |
dprint("killer started") | |
await asyncio.sleep(0.2) | |
dprint("killing wait()") | |
t.cancel() | |
t = asyncio.create_task(wait()) | |
k = asyncio.create_task(killer(t)) | |
try: | |
await t | |
except asyncio.CancelledError: | |
dprint("waiting got cancelled") | |
res[1] = asyncio.CancelledError | |
return res | |
async def test_wait_for_none(): | |
return await asyncio.wait_for(awaiting(0.2), None) | |
async def test_waitfor_event(timeout=True): | |
ev = asyncio.Event() | |
async def setEvent(): | |
await asyncio.sleep(0.1) | |
dprint("Event set") | |
ev.set() | |
t = asyncio.create_task(setEvent()) | |
try: | |
await asyncio.wait_for(ev.wait(), 0.05 if timeout else 0.2) | |
except asyncio.TimeoutError: | |
dprint("Got timeout error") | |
return asyncio.TimeoutError | |
except asyncio.CancelledError: | |
dprint("got canceled") | |
return asyncio.CancelledError # actually not happening here | |
finally: | |
await t | |
return True | |
def testAll(): | |
_test("wait_for cancelling [0.5 return]", publish(0.5, True), False) | |
_test("wait_for cancelling [0.5 raise]", publish(0.5, False), False) | |
_test("wait_for cancelling [0.1 return]", publish(0.1, True), "return_value") | |
_test("wait_for cancelling [0.1 raise]", publish(0.1, False), "return_value") | |
_test("Exception_type_thrown_in_coro_of_waitfor", test_waitfor_thrown_exception(True), | |
asyncio.CancelledError) | |
_test("Exception_type_thrown_to_caller_of_waitfor", test_waitfor_thrown_exception(False), | |
asyncio.TimeoutError) | |
_test("Wait_for_timeout_None", test_wait_for_none(), "return_value") | |
_test("test_cancellation_forwarded-catch", test_cancellation_forwarded(True), | |
[True, False]) | |
_test("test_cancellation_forwarded", test_cancellation_forwarded(), | |
[False, asyncio.CancelledError]) | |
_test("test_cancellation_forwarded-catch_catch-inside", | |
test_cancellation_forwarded(True, True), [True, False]) | |
_test("test_cancellation_forwarded_catch-inside", test_cancellation_forwarded(False, True), | |
[False, asyncio.CancelledError]) | |
_test("test_waitfor_event-timeout_true", test_waitfor_event(True), asyncio.TimeoutError) | |
_test("test_waitfor_event-timeout_false", test_waitfor_event(False), True) | |
testAll() | |
printSummary() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment