Skip to content

Instantly share code, notes, and snippets.

@elbruno
Last active January 18, 2021 16:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elbruno/7202dcf78cb3c5fe2e5bbc26b1ce5798 to your computer and use it in GitHub Desktop.
Save elbruno/7202dcf78cb3c5fe2e5bbc26b1ce5798 to your computer and use it in GitHub Desktop.
DroneDeviceAzureIoT.py
# Copyright (c) 2020
# Author: Bruno Capuano
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
import os
import datetime
import asyncio
import json
from azure.iot.device.aio import IoTHubDeviceClient
from azure.iot.device import MethodResponse
from azure.iot.device.aio import ProvisioningDeviceClient
class Drone_Device():
    """Azure IoT Hub device that reports drone telemetry and twin properties.

    The device connects with a symmetric key. When no IoT Hub host name is
    supplied, the hub is resolved by registering the device through the
    Device Provisioning Service using the given ID scope.
    """

    # Provisioning endpoint used when no IoT Hub host name is configured.
    PROVISIONING_HOST = 'global.azure-devices-provisioning.net'

    def __init__(self, scope, device_id, key, iothub: str = ""):
        """Store the credentials needed to connect later.

        Args:
            scope: ID scope passed to the provisioning client.
            device_id: Device / registration identifier.
            key: Symmetric key of the device.
            iothub: IoT Hub host name. When empty or None, the hub is
                resolved through the provisioning service on connect.
        """
        self.scope = scope
        self.device_id = device_id
        self.key = key
        self.iothub = iothub
        # Created by init_azureIoT(); None until the device has connected.
        self.device_client = None

    @staticmethod
    def _log(message):
        """Print *message* prefixed with the current local timestamp."""
        print(f'{datetime.datetime.now()}: {message}')

    def _build_connection_string(self, hostname, key):
        """Assemble a device connection string for *hostname* using *key*."""
        return ('HostName=' + hostname
                + ';DeviceId=' + self.device_id
                + ';SharedAccessKey=' + key)

    async def init_azureIoT(self):
        """Resolve the connection string, create the client and connect."""
        cnn_str = await self.get_connection_string()
        self.device_client = IoTHubDeviceClient.create_from_connection_string(cnn_str)
        await self.device_client.connect()

    async def __register_device(self):
        """Register the device with the provisioning service.

        Returns:
            The registration result returned by the provisioning client.
        """
        provisioning_device_client = ProvisioningDeviceClient.create_from_symmetric_key(
            provisioning_host=self.PROVISIONING_HOST,
            registration_id=self.device_id,
            id_scope=self.scope,
            symmetric_key=self.key,
        )
        return await provisioning_device_client.register()

    async def get_connection_string(self):
        """Return the connection string for this device.

        Uses the configured IoT Hub host name when present; otherwise
        registers the device to find out which hub it was assigned to.

        Returns:
            A string of the form
            ``HostName=<hub>;DeviceId=<id>;SharedAccessKey=<key>``.

        Raises:
            RuntimeError: If registration did not yield an assigned hub.
        """
        if self.iothub:
            hostname = self.iothub
        else:
            self._log('No IOTHUB specified. Attempting to resolve via '
                      + self.PROVISIONING_HOST)
            registration_result = await self.__register_device()
            print(registration_result)
            state = getattr(registration_result, 'registration_state', None)
            hostname = getattr(state, 'assigned_hub', None)
            if not hostname:
                # Fail with a clear message instead of a TypeError raised by
                # concatenating None into the connection string below.
                raise RuntimeError(
                    'Device provisioning did not return an assigned IoT Hub '
                    f'for device {self.device_id!r}'
                )

        cnn_str = self._build_connection_string(hostname, self.key)
        # Never write the shared access key to the log: show a redacted copy.
        redacted = self._build_connection_string(hostname, '***')
        self._log(f'Connection String = {redacted}')
        return cnn_str

    async def send_telemetry(self, agx, agy, agz):
        """Send the three ``ag*`` readings as one JSON telemetry message.

        Failures are logged and swallowed so that a single failed send does
        not stop the caller (best-effort delivery).
        """
        try:
            payload = json.dumps({
                "agx": agx,
                "agy": agy,
                "agz": agz,
            })
            self._log(f'telemetry: {payload}')
            await self.device_client.send_message(payload)
        except Exception as e:
            self._log(f'Exception during sending metrics: {e}')

    async def send_properties(self, bat, temph, templ):
        """Report ``bat``, ``templ`` and ``temph`` as twin reported properties.

        Failures are logged and swallowed (best-effort delivery).
        """
        try:
            properties = {
                'bat': bat,
                'templ': templ,
                'temph': temph,
            }
            self._log(f'properties: {properties}')
            await self.device_client.patch_twin_reported_properties(properties)
        except Exception as e:
            self._log(f'Exception during sending properties: {e}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment