Last active
May 11, 2023 12:15
-
-
Save snorklerjoe/1b6ffe4b4c46bed7bf34dd03a431c1bc to your computer and use it in GitHub Desktop.
A basic library for timeout-managed tasks in CircuitPython
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" A brief library for timeout-managed tasks in CircuitPython | |
By Joseph R. Freeston | |
Copyright April 2023 | |
""" | |
import time | |
from microcontroller import watchdog as w | |
from watchdog import WatchDogMode, WatchDogTimeout | |
class WatchdogState:
    """Snapshot of the watchdog's ``timeout`` and ``mode``.

    Used to take over the watchdog (``w``) temporarily and put the previous
    configuration back afterwards, so as not to interrupt other things.

    Attributes:
        timeout: Timeout value to write to ``w.timeout``.
        mode: Mode value to write to ``w.mode``.
        initialized: False when the snapshot was taken by ``pop()`` from a
            watchdog that had no mode set (or whose ``deinit()`` raised
            ValueError); ``push()`` then leaves the watchdog stopped.
    """

    def __init__(self, timeout: int, mode: WatchDogMode):
        """Build a state from explicit values; it is marked as initialized.

        Args:
            timeout: Timeout value to apply to the watchdog.
            mode: Mode to apply to the watchdog.
        """
        self.timeout = timeout
        self.mode = mode
        self.initialized = True

    def restore(self):
        """Write this state's timeout and mode to the watchdog.

        Unlike ``push()``, this does not call ``deinit()`` first.
        """
        w.timeout = self.timeout
        w.mode = self.mode

    def push(self):
        """Apply this state to the watchdog, replacing the current setup."""
        # Stop whatever is configured first; a ValueError from deinit() is
        # ignored (presumably "nothing to stop" -- confirm for the target port).
        try:
            w.deinit()
        except ValueError:
            pass

        # A falsy timeout (0 / None) is replaced by 1.
        w.timeout = self.timeout or 1
        w.mode = self.mode

        if self.initialized:
            return

        # This state describes a stopped watchdog: stop it again, retrying
        # every 10 ms for as long as deinit() raises ValueError.
        while True:
            try:
                w.deinit()
            except ValueError:
                time.sleep(0.01)
            else:
                return

    @classmethod
    def grab(cls) -> 'WatchdogState':
        """Return a new state holding the watchdog's current timeout and mode.

        The watchdog itself is left untouched.
        """
        return cls(w.timeout, w.mode)

    @classmethod
    def pop(cls) -> 'WatchdogState':
        """Capture the current watchdog state, then stop the watchdog.

        Returns:
            The captured state. ``initialized`` is True only when a mode was
            set at capture time and ``deinit()`` did not raise ValueError.
        """
        state = cls.grab()
        try:
            w.deinit()
        except ValueError:
            state.initialized = False
        else:
            # Decide from the captured mode. Reading w.mode at this point
            # would describe the watchdog after deinit(), not the one that
            # was running when the snapshot was taken.
            state.initialized = state.mode is not None
        return state
class TimeoutBehavior:
    """Integer codes selecting what a timed-out call does."""

    NONE = 0   # Return None on timeout
    RAISE = 1  # Raise WatchDogTimeout on timeout
    FALSE = 2  # Return False on timeout
# Behavior used by Timeout when the caller does not specify one:
# a timed-out call returns None.
DEFAULT_BEHAVIOR = TimeoutBehavior.NONE
class Timeout:
    """Decorator that runs a callable under a watchdog-based timeout.

    While the decorated callable runs, the watchdog is configured with this
    object's timeout in ``WatchDogMode.RAISE``; the configuration that was
    in place before the call is put back afterwards.

    Attributes:
        timeout: Timeout value given to the constructor.
        wd: Watchdog state saved by the most recent decorated call
            (None until the first call).
        wd_during: Watchdog state applied while a decorated call runs.
        behavior: One of the ``TimeoutBehavior`` codes.
    """

    def __init__(self, timeout: int, behavior: int = DEFAULT_BEHAVIOR):
        """Set up the timeout.

        Args:
            timeout: Timeout value for the watchdog during decorated calls.
            behavior: ``TimeoutBehavior`` code applied when a call times out.
        """
        self.timeout = timeout
        self.wd = None
        self.wd_during = WatchdogState(timeout, WatchDogMode.RAISE)
        self.behavior = behavior

    def reset(self):
        """Feed the watchdog.

        NOTE: this feeds unconditionally, whether or not this timeout is
        the one currently in effect.
        """
        w.feed()

    def __call__(self, callable_func):
        """Wrap ``callable_func`` so that it runs under this timeout.

        Intended for use as a decorator.

        Returns:
            A wrapper that forwards its arguments to ``callable_func`` and
            returns its result. On ``WatchDogTimeout`` the wrapper returns
            None, returns False, or re-raises, according to ``behavior``.

        Raises:
            WatchDogTimeout: on timeout when ``behavior`` is RAISE.
            ValueError: on timeout when ``behavior`` is not a known code.
        """
        def wrapper(*args, **kwargs):
            # Keep the saved state in a local: a nested or recursive call
            # through this same Timeout overwrites self.wd, and restoring
            # from it would leave the timeout watchdog armed after return.
            saved = WatchdogState.pop()
            self.wd = saved
            try:
                # Inside the try so that a failure while arming the timeout
                # still restores the saved state below.
                self.wd_during.push()
                return callable_func(*args, **kwargs)
            except WatchDogTimeout:
                if self.behavior == TimeoutBehavior.NONE:
                    return None
                if self.behavior == TimeoutBehavior.FALSE:
                    return False
                if self.behavior == TimeoutBehavior.RAISE:
                    raise
                raise ValueError(f"Unsupported timeout behavior {self.behavior}")
            finally:
                saved.push()
        return wrapper
(Created for use with CubeServer-api-python)
- Needs to be updated so that Timeout.reset() only acts if the current timeout is in effect
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage Example:
Simple, returning "None":
More complex, with timeout handling: