Skip to content

Instantly share code, notes, and snippets.

@daykin
Last active March 24, 2020 14:04
Show Gist options
  • Save daykin/8313007c642cb4959b8c59d4d6b73cf3 to your computer and use it in GitHub Desktop.
Save daykin/8313007c642cb4959b8c59d4d6b73cf3 to your computer and use it in GitHub Desktop.
camonitor callback
from pkg_resources import require
require('cothread')
import cothread
from cothread import Timedout
from cothread.catools import caput, camonitor
done = cothread.Event()
def callback(value):
if value == "whatever value I'm looking for":
done.Signal()
monitor = camonitor("foo", callback, datatype=DBR_STRING)
caput("foo", "whatever value I'm looking for")
try:
done.Wait(timeout=42)
print("success")
except Timedout:
print("failed")
monitor.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment