Skip to content

Instantly share code, notes, and snippets.

@jr-k
Forked from fpletz/esphome-print-async.py
Created April 17, 2020 14:54
Show Gist options
  • Save jr-k/9f89ef15342c17ed8f6002e6f62d7eb5 to your computer and use it in GitHub Desktop.
Save jr-k/9f89ef15342c17ed8f6002e6f62d7eb5 to your computer and use it in GitHub Desktop.
aioesphomeapi minimal examples
#!/usr/bin/env python3
import aioesphomeapi
import asyncio
async def main():
    """Connect to the ESPHome device, print state updates and list its entities.

    The host name, port and password are the placeholder values from the
    example and must be replaced with real ones.
    """
    client = aioesphomeapi.APIClient(
        asyncio.get_running_loop(), "foobarhostname", 6053, "foobarpassword"
    )
    await client.connect(login=True)

    def on_state(state):
        # Every state object is printed as-is. To react to a single kind of
        # state only, test its type here (for example against
        # aioesphomeapi.BinarySensorState) before printing.
        print(state)

    await client.subscribe_states(on_state)

    # Print whatever the device reports for its entities and services.
    entities_and_services = await client.list_entities_services()
    print(entities_and_services)
# Run main() on a dedicated event loop until the user interrupts the script.
#
# The loop is created explicitly: calling asyncio.get_event_loop() when no loop
# is running is deprecated and fails on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # Keep a strong reference to the task; the event loop itself only holds
    # weak references, so an unreferenced task may be garbage-collected
    # before it finishes.
    main_task = loop.create_task(main())
    # run_forever() keeps the loop alive after main() has returned.
    # NOTE(review): presumably this is what lets the subscribed state
    # callbacks keep firing - confirm against aioesphomeapi.
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this example.
    pass
finally:
    loop.close()
#!/usr/bin/env python3
import aioesphomeapi
import asyncio
# Synchronous-style variant: drive each coroutine with run_until_complete()
# instead of wrapping everything in an async main().
loop = asyncio.new_event_loop()
cli = aioesphomeapi.APIClient(loop, "foobarhostname", 6053, "foobarpassword")


def cb(state):
    """Print a state update passed in by the subscription."""
    print(state)


try:
    loop.run_until_complete(cli.connect(login=True))
    loop.run_until_complete(cli.subscribe_states(cb))
    # Keep the loop running after the subscription call has completed.
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this example; exit without a traceback.
    pass
finally:
    # Always release the event loop's resources, even if connecting failed.
    loop.close()
#!/usr/bin/env python3
import aioesphomeapi
import asyncio
from influxdb import InfluxDBClient
from influxdb import SeriesHelper
# Module-wide InfluxDB client; host and database name are placeholders.
influxclient = InfluxDBClient("influxhost", 8086, database="dbname")


class EspSeries(SeriesHelper):
    """Series helper used to store one sensor reading per instance.

    Instances are created with a ``sensor`` tag and a ``value`` field; the
    series name is templated on the ``sensor`` tag.
    """

    class Meta:
        # Client the helper writes through.
        client = influxclient
        # Series name template; ``{sensor}`` refers to the tag declared below.
        series_name = "events.stats.{sensor}"
        tags = ["sensor"]
        fields = ["value"]
        # NOTE(review): with autocommit on and a bulk size of 1, each point is
        # presumably written as soon as it is created - confirm against the
        # influxdb SeriesHelper documentation.
        autocommit = True
        bulk_size = 1
async def main():
    """Forward ESPHome sensor readings to InfluxDB.

    Connects to the device, builds a key-to-name lookup from the listed
    entities, then subscribes to state updates. Each ``SensorState`` update is
    printed and recorded through ``EspSeries``; other state types are ignored.
    The host name, port and password are placeholders from the example.
    """
    loop = asyncio.get_running_loop()
    cli = aioesphomeapi.APIClient(loop, "foobarhostname", 6053, "foobarpassword")
    await cli.connect(login=True)

    # Only the entity list is needed here; the services are not used.
    sensors, _services = await cli.list_entities_services()
    # State updates carry the entity key only, so map keys to readable names.
    sensor_by_keys = {sensor.key: sensor.name for sensor in sensors}

    def cb(state):
        """Print one sensor state update and record it via EspSeries."""
        if not isinstance(state, aioesphomeapi.SensorState):
            return
        print(state)
        sensor_name = sensor_by_keys[state.key]
        print(sensor_name)
        print(state.state)
        # Creating the helper instance is what records the data point.
        # NOTE(review): this presumably performs a blocking write from inside
        # the event loop callback - confirm this is acceptable for the
        # expected update rate.
        EspSeries(sensor=sensor_name, value=state.state)

    await cli.subscribe_states(cb)
# Run main() on a dedicated event loop until the user interrupts the script.
#
# The loop is created explicitly: calling asyncio.get_event_loop() when no loop
# is running is deprecated and fails on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # Keep a strong reference to the task; the event loop itself only holds
    # weak references, so an unreferenced task may be garbage-collected
    # before it finishes.
    main_task = loop.create_task(main())
    # run_forever() keeps the loop alive after main() has returned.
    # NOTE(review): presumably this is what lets the subscribed state
    # callbacks keep firing - confirm against aioesphomeapi.
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this example.
    pass
finally:
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment