Skip to content

Instantly share code, notes, and snippets.

@mckelvin
Created March 25, 2018 09:49
Show Gist options
  • Save mckelvin/a87829e1931bd5b5d081d8103bda3d43 to your computer and use it in GitHub Desktop.
Save mckelvin/a87829e1931bd5b5d081d8103bda3d43 to your computer and use it in GitHub Desktop.
"""
352 Air PM2.5 采集仪指标采集
"""
import time
import struct
import socket
UDP_IP = "0.0.0.0"
UDP_PORT = 11530
def yield_pm250_index():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))
while True:
data, addr = sock.recvfrom(1024)
if len(data) != 33:
continue
flag, = struct.unpack("B", data[:1])
if flag != 0xA1:
continue
ipaddr, port = addr
mac_addr = ":".join(["%02X"] * 6) % struct.unpack("BBBBBB", data[2:8])
pm250, = struct.unpack(">H", data[19:21])
pm250_alt, = struct.unpack(">H", data[21:23])
assert pm250 == pm250_alt
yield {
"ip": ipaddr,
"port": port,
"mac": mac_addr,
"pm250": pm250,
}
def main():
for metric_dict in yield_pm250_index():
timestamp = time.time()
print("@{0:.0f}, {ip}:{port}, {pm250}".format(timestamp,
**metric_dict))
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment