Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
readDigitalTwinInformationInAzIotEdgeModule.py
def twin_patch_receive_messages():
global twin_callbacks
global module_client
global read_twin_sleep_interval
# Define behavior for receiving twin desired property patches
def twin_patch_handler(twin_patch):
try:
# sample received data
# {'doorState': 1, 'doorStateDesc': '3', 'doorStateSource': 'digital twin new', '$version': 88}
doorState = twin_patch["doorState"]
source = twin_patch["doorStateSource"]
print (f" New Door state : {doorState} - source : {source}")
except Exception as ex:
print ( "Unexpected error in twin_patch_handler: %s" % ex )
while True:
try:
# Set handlers on the client
module_client.on_twin_desired_properties_patch_received = twin_patch_handler
twin_callbacks += 1
time.sleep(read_twin_sleep_interval) # settings to define how often refresh dt information
except Exception as ex:
print ( "Unexpected error in twin_patch_listener: %s" % ex )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment