Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
asyncio statsd client
import asyncio
import statsd
class AsyncStatsClient(statsd.StatsClient):
    """StatsD client that sends its packets through an asyncio UDP transport.

    The metric-building API is inherited from ``statsd.StatsClient``; only
    the delivery step (``_send``) is replaced.  ``connect()`` must be awaited
    before metrics are emitted; until then (and after ``close()``) packets
    are silently dropped.
    """

    class DummyProto(asyncio.DatagramProtocol):
        """Protocol stub: the endpoint is write-only, so no callback is overridden."""

    def __init__(self, host='localhost', port=8125, prefix=None, maxudpsize=512):
        """Store the connection settings without opening any socket.

        Args:
            host: StatsD server host name or address.
            port: StatsD server UDP port.
            prefix: Optional metric-name prefix, stored for the base class.
            maxudpsize: Maximum UDP payload size, stored for the base class.
        """
        # NOTE(review): the base-class initializer is deliberately not called
        # (as in the original); presumably it would create a blocking socket
        # -- confirm against the installed statsd version.
        self._addr = host, port
        self._prefix = prefix
        self._maxudpsize = maxudpsize
        # Populated by connect(); None means "not connected".
        self._transport = None
        self._protocol = None

    async def connect(self):
        """Open the datagram endpoint towards the configured address.

        Calling this again while already connected is a no-op, so a repeated
        call cannot leak the previous transport.
        """
        if self._transport is not None:
            return
        # UDP in asyncio needs an explicit transport; passing remote_addr
        # connects it, so sendto() can later be called without an address.
        loop = asyncio.get_event_loop()
        self._transport, self._protocol = await loop.create_datagram_endpoint(
            self.DummyProto, remote_addr=self._addr)

    def close(self):
        """Close the transport, if any. Safe to call more than once."""
        transport, self._transport = self._transport, None
        self._protocol = None
        if transport is not None:
            transport.close()

    def _send(self, data):
        """Send one already-formatted metric packet, best effort.

        Metrics must never break the application, so expected delivery
        failures are dropped.  Unlike a bare ``except``, this does not
        swallow KeyboardInterrupt, SystemExit or task cancellation.
        """
        transport = self._transport
        if transport is None:
            # Not connected (yet, or any more): drop the packet.
            return
        try:
            transport.sendto(data.encode('ascii'))
        except (OSError, RuntimeError, UnicodeError):
            # Socket failure, closed loop/transport, or non-ASCII payload.
            pass
async def go():
    """Demo coroutine: connect a client, emit two sample metrics, clean up."""
    c = AsyncStatsClient('grafana.remote', prefix='boo')
    await c.connect()
    try:
        c.incr('baz')  # Increment the 'baz' counter.
        c.timing('stats.timed', 320)  # Record a 320ms 'stats.timed'.
    finally:
        # connect() stored the datagram transport on the client; close it so
        # the UDP socket is not left open when the demo finishes.
        c._transport.close()
if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and fails on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(go())
    finally:
        # Release the loop's resources even if go() raised.
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.