Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save fabioafreitas/2587012b2cd506bed708e8c734a78bef to your computer and use it in GitHub Desktop.
Save fabioafreitas/2587012b2cd506bed708e8c734a78bef to your computer and use it in GitHub Desktop.
Thingsboard Send Telemetry
import requests, random
from datetime import datetime
import time
if __name__ == '__main__':
tempo_segundos = 10
device_access_token = 'nViVXjynNC4pvawunXFO'
while True:
dt_now = int(datetime.timestamp(datetime.now())*1000)
fake_telemetry = {"ts":dt_now, "values":{"temperature":random.uniform(25, 30),"ph":random.uniform(5, 8)}}
res = requests.post(
url = f'https://thingsboard.cloud/api/v1/{device_access_token}/telemetry',
headers={
"Content-Type":"application/json"
},
json=fake_telemetry
)
print(res.status_code)
time.sleep(tempo_segundos)
import requests, random
from datetime import datetime, timedelta
import math
import random
def get_random_arbitrary(min_value, max_value):
return random.uniform(min_value, max_value)
def generate_sin_function(date, amplitude, offset):
hours = date.hour + date.minute / 60
angle = ((hours - 12) / 12) * math.pi
sin_value = math.sin(angle) * amplitude + offset
return sin_value
def generate_oxigenio(date):
gen_value = generate_sin_function(date, 2.5, 6)
randomize_val = random.uniform(gen_value - 0.05, gen_value + 0.05)
return randomize_val
def generate_temperatura(date):
gen_value = generate_sin_function(date, 1.5, 27)
randomize_val = random.uniform(gen_value - 0.01, gen_value + 0.01)
return randomize_val
def generate_ph(date):
gen_value = generate_sin_function(date, 0.5, 7)
randomize_val = random.uniform(gen_value - 0.01, gen_value + 0.01)
return randomize_val
if __name__ == '__main__':
date = datetime.now()
print(generate_oxigenio(date))
print(generate_temperatura(date))
print(generate_ph(date))
# dt_now = int(datetime.timestamp(datetime.now())*1000)
lista = [] # {"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}
count = 1
time_between_telemetry_sending = 5 #minutes
iterations = int(12*30*24*60/time_between_telemetry_sending) # number of 5 minutes durations within 360 days
for i in range(iterations):
unixTsMillis = int(datetime.timestamp(date)*1000)
lista.append({"ts":unixTsMillis,"values":{
"temperatura":round(generate_temperatura(date),3),
"ph":round(generate_ph(date),3),
"oxigenio":round(generate_oxigenio(date),3)
}})
date -= timedelta(minutes=time_between_telemetry_sending)
chunk_size = 20
for i in range(0, len(lista), chunk_size):
chunk = lista[i:i+chunk_size]
print(f"Chunk {i//chunk_size + 1}, len {len(chunk)}")
accessToken = 'accesstoken'
res = requests.post(
url = f'https://thingsboard.smartrural.com.br/api/v1/{accessToken}/telemetry',
headers={
"Content-Type":"application/json"
},
json=chunk
)
print(res.status_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment