Skip to content

Instantly share code, notes, and snippets.

@xgenvn
Forked from turbinenreiter/non_blocking_hw_api.py
Created November 1, 2016 02:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xgenvn/0e5a27cc69c130ca2619b4c8804f7624 to your computer and use it in GitHub Desktop.
Save xgenvn/0e5a27cc69c130ca2619b4c8804f7624 to your computer and use it in GitHub Desktop.
import time
class RangeSensor:
    """Simulated range sensor offering a blocking and a non-blocking read.

    One measurement is modelled as a generator that yields ``None`` while
    the measurement is still in progress and then yields the result once.
    """

    # Class-level defaults keep instances usable even when __init__ was not
    # run (e.g. a subclass that overrides it without calling super()).
    measure_time = 0.1
    range_generator = None

    def __init__(self, measure_time=0.1):
        """Create a sensor.

        Args:
            measure_time: Seconds one simulated measurement takes. Defaults
                to 0.1, the value that used to be hard-coded.
        """
        self.measure_time = measure_time
        # Generator of the measurement currently in flight, created lazily
        # by range_non_blocking().
        self.range_generator = None

    def make_generator(self):
        """Return a generator that performs one measurement.

        The generator yields ``None`` for as long as the measurement is
        pending, then yields the result exactly once and finishes.
        """
        t_end = time.time() + self.measure_time
        # tell sensor to start measurement
        while time.time() < t_end:  # while it isn't done yet
            yield None
        # read the result from the sensor
        yield 'result'

    def range(self):
        """Blocking read: run a fresh measurement to completion.

        Returns:
            The first non-``None`` value produced by the measurement, or
            ``None`` if the generator finishes without producing one.
        """
        for val in self.make_generator():
            if val is not None:
                return val
        return None

    def range_non_blocking(self):
        """Non-blocking read: advance the current measurement by one step.

        A measurement is started if none is in flight, and restarted once
        the previous one has finished.

        Returns:
            ``None`` while no new value is available, otherwise the result.
        """
        # Explicit check instead of catching AttributeError around next():
        # an AttributeError raised inside the generator must propagate
        # rather than silently restart the measurement.
        if self.range_generator is None:
            self.range_generator = self.make_generator()
        try:
            return next(self.range_generator)
        except StopIteration:
            # Previous measurement is finished; begin the next one.
            self.range_generator = self.make_generator()
            return next(self.range_generator)
class Accel:
    """Stand-in accelerometer whose reading is available immediately."""

    def x(self):
        """Return the x-axis reading (always 1 in this demo)."""
        return 1
rs = RangeSensor()
accel = Accel()

# example 1
# Poll the range sensor as fast as possible for 0.3 s. While no value is
# ready, print a placeholder that the next output overwrites ('\r').
tw_end = time.time() + 0.3
while time.time() < tw_end:
    val = rs.range_non_blocking()
    if val is None:
        print('...', end='\r')
    else:
        print(val)

# example 2
# Read the accelerometer every 25 ms alongside the slower range sensor; the
# range column shows None until its measurement has finished.
tw_end = time.time() + 0.3
while time.time() < tw_end:
    print(accel.x(), rs.range_non_blocking())
    time.sleep(0.025)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment