Skip to content

Instantly share code, notes, and snippets.

@prasadtalasila
Last active June 18, 2024 11:43
Show Gist options
  • Save prasadtalasila/1d133a021d678d87fdffa8e58abe7291 to your computer and use it in GitHub Desktop.
Save prasadtalasila/1d133a021d678d87fdffa8e58abe7291 to your computer and use it in GitHub Desktop.
CPSENS streaming
import matplotlib.pyplot as plt
import numpy as np
from paho.mqtt.client import Client as MQTTClient
from paho.mqtt.client import CallbackAPIVersion
from paho.mqtt.client import MQTTv311
import queue
import struct
import time
# Broker connection settings. The string values are placeholders and must be
# replaced with real values before running.
HOST = "hostname"
PORT = 1883
USERNAME = "username"
PASSWORD = "password"
# Topic subscribed to by on_connect.
MQTT_TOPIC = "topic"
# Module-level list; the functions visible in this script bind their own local
# `data` and do not read this one.
data = []
# Hand-off queue: on_message puts decoded frames here and main() takes them
# out for plotting.
data_queue = queue.Queue()
def on_connect(mqttc, userdata, flags, rc, properties=None):
    """Report the connection result, then subscribe to ``MQTT_TOPIC``.

    Assigned as the client's ``on_connect`` callback in ``main``.

    Args:
        mqttc: Client that connected; used to issue the subscription.
        userdata: Unused.
        flags: Unused.
        rc: Connection result; printed as part of the status line.
        properties: Unused.
    """
    status_line = "connected with response code %s" % rc
    print(status_line)
    mqttc.subscribe(MQTT_TOPIC)
def on_subscribe(client, userdata, mid, reason_code_list, properties=None):
    """Print the message id and per-topic result of a completed subscription.

    The client is created with ``CallbackAPIVersion.VERSION2``, whose
    ``on_subscribe`` callback is invoked positionally as
    ``(client, userdata, mid, reason_code_list, properties)``. The parameter
    names follow that order so the printed "mid" really is the message id and
    the printed "response" really is the broker's answer.

    Args:
        client: Client that received the SUBACK (unused).
        userdata: User data attached to the client (unused).
        mid: Message id of the subscribe request being acknowledged.
        reason_code_list: One result entry per requested topic filter.
        properties: Properties attached to the SUBACK (unused).
    """
    response = ", ".join(str(code) for code in reason_code_list)
    print("mid/response = " + str(mid) + " / " + response)
def on_message(client, userdata, msg):
    """Decode one frame of samples and queue it for the plotting loop.

    The payload is read as 640 native floats starting at byte offset 20
    (presumably a 20-byte header precedes the samples - confirm against the
    publisher). The resulting tuple is put on ``data_queue``.

    Args:
        client: Client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; only ``msg.payload`` is read.
    """
    payload = msg.payload
    sample_format = '640f'
    data_offset = 20
    # A struct.error escaping this callback would propagate out of paho's
    # network loop, so one truncated message must not be allowed to raise.
    if len(payload) < data_offset + struct.calcsize(sample_format):
        print("ignoring short payload of %d bytes" % len(payload))
        return
    data_queue.put(struct.unpack_from(sample_format, payload, data_offset))
# Module-level MQTT client, configured and started by main(). It is created
# with the version-2 callback API and the MQTTv311 protocol constant.
mqttc = MQTTClient(
callback_api_version=CallbackAPIVersion.VERSION2,
protocol=MQTTv311)
def main():
    """Connect to the broker and live-plot every frame received.

    Configures credentials and callbacks on the module-level ``mqttc``,
    starts paho's background network thread, then redraws a 640-point line
    with each tuple that ``on_message`` places on ``data_queue``. Runs until
    interrupted; the network thread is stopped and the connection closed on
    the way out.
    """
    # Set username and password
    mqttc.username_pw_set(USERNAME, PASSWORD)
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.on_subscribe = on_subscribe
    mqttc.connect(HOST, PORT, 60)
    mqttc.loop_start()

    plt.ion()
    fig, ax = plt.subplots()
    # Initial placeholder curve; its y-data is replaced by each frame.
    line1, = ax.plot(np.linspace(1, 640, 640), np.linspace(-10, 10, 640))

    try:
        while True:
            try:
                # Block briefly instead of poll + sleep: idle CPU stays low
                # and a waiting frame is drawn without a fixed delay.
                samples = data_queue.get(timeout=0.1)
            except queue.Empty:
                pass
            else:
                line1.set_ydata(samples)
                fig.canvas.draw()
            # Process GUI events every pass so the window stays responsive
            # even while no data is arriving.
            fig.canvas.flush_events()
    finally:
        # Always release the connection and the background thread.
        mqttc.disconnect()
        mqttc.loop_stop()


if __name__ == "__main__":
    main()
import numpy as np
from paho.mqtt.client import Client as MQTTClient
from paho.mqtt.client import CallbackAPIVersion
from paho.mqtt.client import MQTTv311
import struct
# Broker connection settings. The string values are placeholders and must be
# replaced with real values before running.
HOST = "hostname"
PORT = 1883
USERNAME = "username"
PASSWORD = "password"
# Topic subscribed to by on_connect.
MQTT_TOPIC = "topic"
# Module-level list; on_message binds its own local `data` and does not
# read this one.
data = []
def on_connect(mqttc, userdata, flags, rc, properties=None):
    """Report the connection result, then subscribe to ``MQTT_TOPIC``.

    Assigned as the client's ``on_connect`` callback in ``main``.

    Args:
        mqttc: Client that connected; used to issue the subscription.
        userdata: Unused.
        flags: Unused.
        rc: Connection result; printed as part of the status line.
        properties: Unused.
    """
    status_line = "connected with response code %s" % rc
    print(status_line)
    mqttc.subscribe(MQTT_TOPIC)
def on_subscribe(client, userdata, mid, reason_code_list, properties=None):
    """Print the message id and per-topic result of a completed subscription.

    The client is created with ``CallbackAPIVersion.VERSION2``, whose
    ``on_subscribe`` callback is invoked positionally as
    ``(client, userdata, mid, reason_code_list, properties)``. The parameter
    names follow that order so the printed "mid" really is the message id and
    the printed "response" really is the broker's answer.

    Args:
        client: Client that received the SUBACK (unused).
        userdata: User data attached to the client (unused).
        mid: Message id of the subscribe request being acknowledged.
        reason_code_list: One result entry per requested topic filter.
        properties: Properties attached to the SUBACK (unused).
    """
    response = ", ".join(str(code) for code in reason_code_list)
    print("mid/response = " + str(mid) + " / " + response)
def on_message(client, userdata, msg):
    """Decode one frame of samples from the payload and print it.

    The payload is read as 640 native floats starting at byte offset 20
    (presumably a 20-byte header precedes the samples - confirm against the
    publisher).

    Args:
        client: Client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; only ``msg.payload`` is read.
    """
    payload = msg.payload
    sample_format = '640f'
    data_offset = 20
    # A struct.error escaping this callback would propagate out of paho's
    # network loop, so one truncated message must not be allowed to raise.
    if len(payload) < data_offset + struct.calcsize(sample_format):
        print("ignoring short payload of %d bytes" % len(payload))
        return
    data = struct.unpack_from(sample_format, payload, data_offset)
    print(data)
    # process data here
# Module-level MQTT client, configured and started by main(). It is created
# with the version-2 callback API and the MQTTv311 protocol constant.
mqttc = MQTTClient(
callback_api_version=CallbackAPIVersion.VERSION2,
protocol=MQTTv311)
def main():
    """Connect to the broker and process incoming messages until interrupted.

    Sets credentials and callbacks on the module-level ``mqttc``, connects to
    ``HOST``:``PORT`` with a 60-second keepalive, then blocks in the client's
    network loop.
    """
    mqttc.username_pw_set(USERNAME, PASSWORD)
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.on_subscribe = on_subscribe
    mqttc.connect(HOST, PORT, 60)
    # Block inside paho's own loop rather than spinning in a busy-wait,
    # which would pin a CPU core while doing no work.
    mqttc.loop_forever()


if __name__ == "__main__":
    main()
import json
import struct
from datetime import datetime
from paho.mqtt.client import Client as MQTTClient
from paho.mqtt.client import CallbackAPIVersion
from paho.mqtt.client import MQTTv311
# Path of the JSON configuration file read at import time via load_config().
config_file = 'sysid_config_private.json'
def load_config(config_file: str):
    """Read a JSON configuration file and return the parsed content.

    The file is opened explicitly as UTF-8 so the result does not depend on
    the platform's default text encoding.

    Args:
        config_file: Path of the JSON file to read.

    Returns:
        The object produced by ``json.load`` for the file's content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(config_file, 'r', encoding='utf-8') as f:
        return json.load(f)
# The configuration is loaded once, at import time; the broker settings below
# are all read from its "mqtt" section.
config = load_config(config_file)
HOST = config['mqtt']['host']
PORT = config['mqtt']['port']
USERNAME = config['mqtt']['username']
PASSWORD = config['mqtt']['password']
# Iterable of topic names; on_connect subscribes to each one.
MQTT_TOPICS = config['mqtt']['topics']
# Filled by on_message: timestamp -> {topic -> tuple of unpacked samples}.
acceleration_values = {}
def on_connect(mqttc, userdata, flags, rc, properties=None):
    """Report the connection result, then subscribe to every configured topic.

    Assigned as the client's ``on_connect`` callback in ``main``.

    Args:
        mqttc: Client that connected; used to issue the subscriptions.
        userdata: Unused.
        flags: Unused.
        rc: Connection result; printed as part of the status line.
        properties: Unused.
    """
    status_line = "connected with response code %s" % rc
    print(status_line)
    # One subscribe request per entry of MQTT_TOPICS, in order.
    for topic_name in MQTT_TOPICS:
        mqttc.subscribe(topic_name)
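# message payload packet structure:
# 2 bytes for descriptor
# 2 bytes for metadata id
# 8 bytes for timestamp in seconds (milli- and microseconds are always zero)
# 8 bytes for nanoseconds of a timestamp (always zero)
# 640 float values for data, starting at byte offset 20 (on_message unpacks
# them with the '640f' format; this note previously said "640 bytes" - TODO
# confirm against the publisher)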
def on_subscribe(client, userdata, mid, reason_code_list, properties=None):
    """Print the message id and per-topic result of a completed subscription.

    The client is created with ``CallbackAPIVersion.VERSION2``, whose
    ``on_subscribe`` callback is invoked positionally as
    ``(client, userdata, mid, reason_code_list, properties)``. The parameter
    names follow that order so the printed "mid" really is the message id and
    the printed "response" really is the broker's answer.

    Args:
        client: Client that received the SUBACK (unused).
        userdata: User data attached to the client (unused).
        mid: Message id of the subscribe request being acknowledged.
        reason_code_list: One result entry per requested topic filter.
        properties: Properties attached to the SUBACK (unused).
    """
    response = ", ".join(str(code) for code in reason_code_list)
    print("mid/response = " + str(mid) + " / " + response)
def on_message(client, userdata, msg):
    """Decode one frame and store it by timestamp and topic.

    Prints the topic, reads an unsigned 64-bit timestamp at byte offset 4 and
    640 native floats at byte offset 20, stores the samples in
    ``acceleration_values[timestamp][msg.topic]``, and prints the timestamp
    formatted as local time.

    Args:
        client: Client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    payload = msg.payload
    print(msg.topic)
    sample_format = '640f'
    data_offset = 20
    # A struct.error escaping this callback would propagate out of paho's
    # network loop, so one truncated message must not be allowed to raise.
    if len(payload) < data_offset + struct.calcsize(sample_format):
        print("ignoring short payload of %d bytes" % len(payload))
        return
    # unpack_from reads in place, avoiding a slice copy of the payload.
    timestamp = struct.unpack_from('Q', payload, 4)[0]
    data = struct.unpack_from(sample_format, payload, data_offset)
    # Group frames from different topics that share a timestamp.
    acceleration_values.setdefault(timestamp, {})[msg.topic] = data
    print(datetime.fromtimestamp(timestamp).strftime("%y-%m-%d %H-%M-%S %f"))
# Module-level MQTT client, configured and started by main(). It is created
# with the version-2 callback API and the MQTTv311 protocol constant.
mqttc = MQTTClient(
callback_api_version=CallbackAPIVersion.VERSION2,
protocol=MQTTv311)
def main():
    """Connect to the broker and process incoming messages until interrupted.

    Sets credentials and callbacks on the module-level ``mqttc``, connects to
    ``HOST``:``PORT`` with a 60-second keepalive, then blocks in the client's
    network loop.
    """
    mqttc.username_pw_set(USERNAME, PASSWORD)
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.on_subscribe = on_subscribe
    mqttc.connect(HOST, PORT, 60)
    # Block inside paho's own loop rather than spinning in a busy-wait,
    # which would pin a CPU core while doing no work.
    mqttc.loop_forever()


if __name__ == "__main__":
    main()
paho-mqtt==2.1.0
numpy==1.26.4
matplotlib==3.9.0
{
"mqtt": {
"host": "hostname",
"port": 1883,
"username": "username",
"password": "password",
"topics": ["topic1", "topic2"]
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment