Skip to content

Instantly share code, notes, and snippets.

@ashvayka
Created May 8, 2023 09:49
Show Gist options
  • Save ashvayka/f3730fa3d2ad27c3782872dd174907ff to your computer and use it in GitHub Desktop.
Save ashvayka/f3730fa3d2ad27c3782872dd174907ff to your computer and use it in GitHub Desktop.
Sample application for single-board computers to connect to the ThingsBoard IoT platform
import logging.handlers
import time
import os
from tb_gateway_mqtt import TBDeviceMqttClient
# Configure the root logger to emit records at DEBUG level and above.
logging.basicConfig(level=logging.DEBUG)
# Module-level client handle; main() assigns the TBDeviceMqttClient instance to it
# and rpc_callback() uses it to publish data.
client = None
# Default blinking period; sync_state() replaces it with the 'blinkingPeriod'
# shared attribute reported by the server.
# NOTE(review): units are not stated anywhere in the visible code - confirm.
period = 1.0
# Callback invoked when a subscribed shared attribute changes on the server.
def attribute_callback(client, result):
    """Apply a shared-attribute update to the module-level ``period``.

    Args:
        client: First positional argument supplied by the caller. It shadows
            the module-level ``client`` inside this function and is only
            printed.
        result: Expected to be a mapping of attribute names to their new
            values.
    """
    # Without this declaration the assignment below would only bind a local
    # name and the module-level period would never change.
    global period
    print(client, result)
    # Make sure that you paste YOUR shared attribute name here. The same
    # callback is also registered for all attributes, so an update may not
    # contain this key; in that case the current period is kept.
    if 'blinkingPeriod' in result:
        period = result['blinkingPeriod']
# Server-side RPC handler: called for every RPC request received.
def rpc_callback(id, request_body):
    """Handle an RPC request by publishing fresh data when asked to.

    Args:
        id: First argument passed by the caller (presumably the request
            identifier); not used here.
        request_body: Mapping that holds the requested ``"method"`` together
            with any other parameters.

    Raises:
        KeyError: If ``request_body`` has no ``"method"`` entry.
    """
    print(request_body)
    requested = request_body["method"]
    # Guard clause: anything other than 'getTelemetry' is only reported.
    if requested != 'getTelemetry':
        print('Unknown method: ' + requested)
        return
    device_attributes, device_telemetry = get_data()
    client.send_attributes(device_attributes)
    client.send_telemetry(device_telemetry)
def _read_command(command, first_line_only=True):
    """Run *command* through the shell and return its output, stripped.

    The pipe is closed deterministically by the ``with`` block, and
    surrounding whitespace is removed with ``strip()`` rather than by cutting a
    fixed number of characters, so a missing trailing space or newline can
    never eat a real character.
    """
    with os.popen(command) as pipe:
        output = pipe.readline() if first_line_only else pipe.read()
    return output.strip()


def get_data():
    """Collect host information by querying the operating system.

    Returns:
        A ``(attributes, telemetry)`` tuple of dictionaries.
        ``attributes`` holds ``ip_address`` and ``macaddress`` (strings).
        ``telemetry`` holds ``cpu_usage``, ``RAM_usage`` and ``avg_load``
        (floats), ``disk_usage`` (int, bytes), and ``processes_count``,
        ``swap_memory_usage`` and ``boot_time`` (strings exactly as printed
        by the underlying commands).

    Raises:
        ValueError: If the CPU or RAM command prints nothing that parses as
            a float.
    """
    # Ratio of fields 2 and 4 to fields 2, 4 and 5 of the aggregate 'cpu '
    # line in /proc/stat, expressed as a percentage.
    cpu_usage = round(float(_read_command('''grep 'cpu ' /proc/stat | awk '{usage=($2+$4)*100/($2+$4+$5)} END {print usage }' ''')), 2)
    ip_address = _read_command('''hostname -I''')
    # Only the first line is kept, i.e. the first interface matched by the glob.
    mac_address = _read_command('''cat /sys/class/net/*/address''')
    # Counts the lines of the process listing that contain "bash".
    processes_count = _read_command('''ps -Al | grep -c bash''')
    swap_memory_usage = _read_command("free -m | grep Swap | awk '{print ($3/$2)*100}'")
    ram_usage = float(_read_command("free -m | grep Mem | awk '{print ($3/$2) * 100}'"))
    # Bytes in use on the root filesystem: (total blocks - free blocks) * block size.
    st = os.statvfs('/')
    used = (st.f_blocks - st.f_bfree) * st.f_frsize
    boot_time = _read_command('uptime -p', first_line_only=False)
    avg_load = (cpu_usage + ram_usage) / 2
    attributes = {
        'ip_address': ip_address,
        'macaddress': mac_address,
    }
    telemetry = {
        'cpu_usage': cpu_usage,
        'processes_count': processes_count,
        'disk_usage': used,
        'RAM_usage': ram_usage,
        'swap_memory_usage': swap_memory_usage,
        'boot_time': boot_time,
        'avg_load': avg_load,
    }
    return attributes, telemetry
# Callback for the reply to the initial shared-attribute request.
def sync_state(result, exception=None):
    """Initialise the module-level ``period`` from a requested attribute reply.

    Args:
        result: Reply mapping; the value is read from
            ``result['shared']['blinkingPeriod']`` when present.
        exception: Error reported instead of a result, or ``None`` on success.

    If the request failed, or the reply has no ``shared`` section or no
    ``blinkingPeriod`` entry, the current period is left untouched instead of
    raising ``KeyError``.
    """
    global period
    if exception is not None:
        print("Exception: " + str(exception))
        return
    shared = (result or {}).get('shared') or {}
    if 'blinkingPeriod' in shared:
        period = shared['blinkingPeriod']
def main():
    """Connect to the server, register callbacks and publish data in a loop.

    Creates the module-level ``client``, requests the current
    ``blinkingPeriod`` shared attribute, subscribes to attribute updates,
    installs the RPC handler, and then sends attributes and telemetry,
    sleeping 60 between rounds, until the client reports it has stopped.
    """
    global client
    client = TBDeviceMqttClient("thingsboard.cloud", 1883, "TEST_TOKEN")
    client.connect()
    # Fetch the current shared attribute value; sync_state receives the reply.
    client.request_attributes(shared_keys=['blinkingPeriod'], callback=sync_state)
    # From here on attribute_callback handles shared attribute updates sent
    # by the server, both for 'blinkingPeriod' and for every attribute.
    period_subscription = client.subscribe_to_attribute("blinkingPeriod", attribute_callback)
    all_subscription = client.subscribe_to_all_attributes(attribute_callback)
    # From here on rpc_callback handles RPC requests sent by the server.
    client.set_server_side_rpc_request_handler(rpc_callback)
    while True:
        if client.stopped:
            break
        device_attributes, device_telemetry = get_data()
        client.send_attributes(device_attributes)
        client.send_telemetry(device_telemetry)
        time.sleep(60)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment