Skip to content

Instantly share code, notes, and snippets.

@wwj718
Created May 5, 2018 01:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wwj718/6d15b7e6f600dbea613ee6ea625a3df0 to your computer and use it in GitHub Desktop.
Save wwj718/6d15b7e6f600dbea613ee6ea625a3df0 to your computer and use it in GitHub Desktop.
垃圾onenet
import asyncio
from hbmqtt.client import MQTTClient, ConnectException
from hbmqtt.mqtt.constants import QOS_1, QOS_2
# Product ID; placed in the username slot of the broker URI below.
username = '135585'
# Product API key; placed in the password slot of the broker URI below.
password = "to do"
# Device ID; passed to MQTTClient as the client_id.
CLIENT_ID = "29975808"
# URI layout: [mqtt|ws][s]://[username][:password]@host.domain[:port]
SERVER = "mqtt://{user}:{secret}@mqtt.heclouds.com:6002".format(
    user=username,
    secret=password,
)
print(SERVER)
# https://stackoverflow.com/questions/31899679/handling-async-in-python
async def test_coro():
    """Connect to SERVER, publish one test message to '/hello', then disconnect.

    Raises:
        Any exception raised by ``connect`` or ``publish`` propagates to the
        caller; the client is disconnected first when the failure happens
        after a successful connect.
    """
    client = MQTTClient(client_id=CLIENT_ID)
    await client.connect(SERVER)
    try:
        # Await the publish directly: wrapping it in a task and passing it to
        # asyncio.wait() would leave a failure as an unretrieved task
        # exception instead of reporting it.
        await client.publish('/hello', b'TEST MESSAGE WITH QOS_0')
    finally:
        # Always release the connection, even when publishing fails.
        await client.disconnect()
async def uptime_coro():
    """Subscribe to '$SYS' and print the messages delivered by the broker.

    Receives up to 99 messages; after printing each one, publishes a test
    message to '/hello'. A ``ClientException`` raised while receiving or
    publishing is printed instead of propagated. The client is disconnected
    before the coroutine returns.
    """
    # Imported locally because the module-level import only brings in
    # ConnectException; without this the handler below raises NameError.
    from hbmqtt.client import ClientException

    client = MQTTClient(client_id=CLIENT_ID)
    await client.connect(SERVER)
    try:
        # Subscribe to '$SYS' with QOS=1
        await client.subscribe([
            ('$SYS', QOS_1),
        ])
        try:
            for i in range(1, 100):
                message = await client.deliver_message()
                packet = message.publish_packet
                print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
                # publish() is a coroutine: it must be awaited or nothing is sent.
                # NOTE(review): assumed to belong inside the loop (one publish
                # per received message) -- confirm against the intended flow.
                await client.publish('/hello', b'TEST MESSAGE WITH QOS_0')  # https://open.iot.10086.cn/doc/art463.html#107
        except ClientException as ce:
            print(str(ce))
    finally:
        # Close the connection on every exit path.
        await client.disconnect()
if __name__ == '__main__':
    print("run")
    # Run the one-shot publish first, then the subscribe/receive coroutine,
    # each to completion on the same event loop.
    loop = asyncio.get_event_loop()
    for coro_func in (test_coro, uptime_coro):
        loop.run_until_complete(coro_func())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment