Skip to content

Instantly share code, notes, and snippets.

@snorklerjoe
Last active May 11, 2023 12:15
Show Gist options
  • Save snorklerjoe/1b6ffe4b4c46bed7bf34dd03a431c1bc to your computer and use it in GitHub Desktop.
Save snorklerjoe/1b6ffe4b4c46bed7bf34dd03a431c1bc to your computer and use it in GitHub Desktop.
A basic library for timeout-managed tasks in CircuitPython
""" A brief library for timeout-managed tasks in CircuitPython
By Joseph R. Freeston
Copyright April 2023
"""
import time
from microcontroller import watchdog as w
from watchdog import WatchDogMode, WatchDogTimeout
class WatchdogState:
    """Snapshot of the watchdog's configuration (``w.timeout`` and ``w.mode``).

    Used to save the watchdog's settings temporarily, reconfigure the watchdog
    for a while, and then put the saved settings back so as not to interrupt
    other things that rely on the watchdog.

    Attributes:
        timeout: Value read from / written to ``w.timeout``.
        mode: Value read from / written to ``w.mode``.
        initialized: When False, ``push()`` de-initializes the watchdog again
            after writing the settings, i.e. the state represents a watchdog
            that is not running.
    """

    def __init__(self, timeout: int, mode: WatchDogMode):
        self.timeout = timeout
        self.mode = mode
        # States built directly by a caller describe a watchdog that should
        # be left running after push(); pop() overrides this as needed.
        self.initialized = True

    def restore(self):
        """Restores this state to the watchdog (no deinit beforehand)."""
        w.timeout = self.timeout
        w.mode = self.mode

    def push(self):
        """Pushes this state to the watchdog.

        The watchdog is de-initialized first (a ValueError from ``deinit()``
        is ignored), then the timeout and mode are written. If this state is
        marked as not initialized, the watchdog is de-initialized again
        afterwards, retrying every 10 ms for as long as ``deinit()`` raises
        ValueError.
        """
        try:
            w.deinit()
        except ValueError:
            # Best effort: carry on and write the settings anyway.
            pass

        # A falsy timeout (None or 0) falls back to 1.
        w.timeout = self.timeout or 1
        w.mode = self.mode

        while not self.initialized:
            try:
                w.deinit()
            except ValueError:
                time.sleep(0.01)
            else:
                break

    @classmethod
    def grab(cls) -> 'WatchdogState':
        """Grabs & returns current watchdog state without changing it."""
        return cls(w.timeout, w.mode)

    @classmethod
    def pop(cls) -> 'WatchdogState':
        """Pops & returns current watchdog state.

        The current settings are captured, then the watchdog is
        de-initialized. The returned state is marked initialized only if the
        watchdog had a mode set when it was captured.
        """
        state = cls.grab()
        # Decide from the mode captured *before* deinit(). The CircuitPython
        # watchdog documentation describes ``mode`` as None while the timer is
        # disabled, so checking ``w.mode`` after a successful deinit() marked
        # every popped state as not initialized, and push() then switched the
        # previously running watchdog off instead of restoring it.
        state.initialized = state.mode is not None
        try:
            w.deinit()
        except ValueError:
            # NOTE(review): kept from the original -- a failed deinit() marks
            # the state as not initialized, which makes push() retry deinit()
            # until it succeeds. Confirm this is intended on ports where the
            # watchdog cannot be stopped.
            state.initialized = False
        return state
class TimeoutBehavior:
    """Integer codes selecting what happens when a timeout occurs.

    NONE  -- return None on timeout
    RAISE -- raise WatchDogTimeout on timeout
    FALSE -- return False on timeout
    """

    NONE = 0
    RAISE = 1
    FALSE = 2
# Behavior used by Timeout when the caller does not pass one explicitly.
DEFAULT_BEHAVIOR = TimeoutBehavior.NONE
class Timeout:
    """Basic Timeout class.

    An instance is used as a decorator: the decorated callable runs with the
    watchdog configured to ``timeout`` in ``WatchDogMode.RAISE``, and the
    watchdog configuration that was active beforehand is put back afterwards.

    Args:
        timeout: Timeout passed to the watchdog while the callable runs.
        behavior: One of the ``TimeoutBehavior`` codes; selects what the
            wrapper does when ``WatchDogTimeout`` is raised.
    """

    def __init__(self, timeout: int, behavior: int = DEFAULT_BEHAVIOR):
        self.timeout = timeout
        # Most recently saved pre-call watchdog state (kept for inspection).
        self.wd = None
        self.wd_during = WatchdogState(timeout, WatchDogMode.RAISE)
        self.behavior = behavior
        # Number of decorated calls of this Timeout currently executing.
        self._active = 0

    def reset(self):
        """Resets the timeout watchdog.

        Only feeds the watchdog while a callable decorated with this Timeout
        is running; otherwise it does nothing, so a watchdog configured by
        other code is not fed by accident.
        """
        if self._active:
            w.feed()

    def __call__(self, callable_func):
        """Oversees a callable with this timeout
        (a decorator method)

        Returns a wrapper that forwards all arguments to ``callable_func``
        and returns its result. On ``WatchDogTimeout`` the wrapper returns
        None, returns False, or re-raises, according to ``self.behavior``;
        any other behavior value raises ValueError at that point.
        """
        def wrapper(*args, **kwargs):
            # Keep the saved state in a local so that a nested or recursive
            # call through this same Timeout cannot overwrite it before the
            # `finally` below restores it.
            saved = WatchdogState.pop()
            self.wd = saved
            self._active += 1
            try:
                # Inside the try so that the saved state is restored even if
                # configuring the watchdog itself fails.
                self.wd_during.push()
                return callable_func(*args, **kwargs)
            except WatchDogTimeout:
                if self.behavior == TimeoutBehavior.NONE:
                    return None
                elif self.behavior == TimeoutBehavior.FALSE:
                    return False
                elif self.behavior == TimeoutBehavior.RAISE:
                    raise
                else:
                    raise ValueError(f"Unsupported timeout behavior {self.behavior}")
            finally:
                self._active -= 1
                saved.push()
        return wrapper
@snorklerjoe
Copy link
Author

snorklerjoe commented Apr 30, 2023

Usage Example:

Simple, returning "None":

import time
import timeout

@timeout.Timeout(5)      # Timeout of 5 seconds
def test(num):
    """Count from 0 up to (but not including) ``num``, pausing one second
    before each integer, and return all the integers joined into one string.
    """
    pieces = []
    for value in range(num):
        time.sleep(1)
        pieces.append(str(value))
    return ''.join(pieces)

print("Will work:")
print(test(3))           # prints "012" (3 one-second steps fit in the 5 s timeout)
print()
print("Will not work:")
print(test(8))           # prints None (8 one-second steps exceed the 5 s timeout)
print()

More complex, with timeout handling:

import time
import timeout

@timeout.Timeout(5, timeout.TimeoutBehavior.RAISE)      # Timeout of 5 seconds
def test(num):
    """Count from 0 up to (but not including) ``num``, pausing one second
    before each integer, and return all the integers joined into one string.
    """
    pieces = []
    for value in range(num):
        time.sleep(1)
        pieces.append(str(value))
    return ''.join(pieces)

try:
    test(12)
    print("This will never be printed!")
    print("test(12) will time out before it ever gets the chance to finish!")
except timeout.WatchDogTimeout:
    # With TimeoutBehavior.RAISE the wrapper re-raises the watchdog's
    # exception, so the caller handles the timeout here.
    print("Oooh, it timed out!")
    # Presumably do something more interesting...
    # We bothered to catch the timeout, after all!

@snorklerjoe
Copy link
Author

(Created for use with CubeServer-api-python)

@snorklerjoe
Copy link
Author

snorklerjoe commented May 11, 2023

  • Needs to be updated so that Timeout.reset() only acts if the current timeout is in effect

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment