Skip to content

Instantly share code, notes, and snippets.

@butlerx
Last active July 12, 2019 10:01
Show Gist options
  • Save butlerx/a007ecdf41e5581aec9e7b14101566d8 to your computer and use it in GitHub Desktop.
Save butlerx/a007ecdf41e5581aec9e7b14101566d8 to your computer and use it in GitHub Desktop.
async graphite tree walker
#!/usr/bin/env python3
import argparse
import logging
from asyncio import AbstractEventLoop, ensure_future, gather, get_event_loop, sleep
from collections import deque
from typing import Deque, List
import structlog
from aiohttp import BasicAuth, ClientSession, ContentTypeError
# Route structlog events through the standard-library logging machinery so
# that a single handler on the root logger renders everything.
structlog.configure(
    logger_factory=structlog.stdlib.LoggerFactory(),
    processors=[
        structlog.stdlib.add_log_level,
        structlog.processors.format_exc_info,
        # Must stay last: hands the event dict over to ProcessorFormatter.
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
)

# Root logger: INFO and above, written to the default StreamHandler stream
# using structlog's console renderer.
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)

formatter = structlog.stdlib.ProcessorFormatter(
    processor=structlog.dev.ConsoleRenderer()
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger.addHandler(handler)
class Branch:
    """A node of the metrics tree that is waiting to be walked.

    Attributes:
        prefix: Metric path (or pattern) identifying the node.
        depth: Depth value supplied by whoever created the branch.
        attempts: Counter that starts at zero for a new branch.
    """

    def __init__(self, prefix: str, depth: int):
        # A fresh branch has never been attempted.
        self.prefix, self.depth, self.attempts = prefix, depth, 0
class Walker:
    """Graphite tree walker.

    Branches waiting to be expanded are kept in ``queue``. Every pass of
    :meth:`run` drains the queue and queries ``<url>/metrics/find`` for all
    drained branches concurrently. Leaf metrics are collected in
    ``metrics_paths``; non-leaf children are queued for the next pass.

    Args:
        url: Base URL of the Graphite server.
        file: Path of the file :meth:`show_tree` writes the metric list to.
        user: Basic-auth user name. Credentials are only sent when both
            ``user`` and ``password`` are given.
        password: Basic-auth password.
        series_from: Optional value sent as the ``from`` query parameter.
        loop: Event loop stored on the instance (defaults to the current
            loop). Kept for backward compatibility; the coroutines always
            run on whichever loop is executing them.
        max_depth: When set (and non-zero), non-leaf nodes reaching this
            depth are recorded as results instead of being expanded.
    """

    def __init__(
        self,
        url: str,
        file: str,
        user: str = None,
        password: str = None,
        series_from: str = None,
        loop: AbstractEventLoop = None,
        max_depth: int = None,
    ):
        self.queue: Deque["Branch"] = deque()
        self.url = url
        self.user = user
        self.password = password
        self.series_from = series_from
        self.enabled = False
        self.max_attempts = 3
        self.loop = loop or get_event_loop()
        self._dispatch_task = None
        self.logger = structlog.get_logger("walker")
        self.max_depth = max_depth
        self.metrics_paths: List[str] = []
        self.output_file = file

    async def run(self):
        """Process the queue until it is empty or ``enabled`` is cleared."""
        self.logger.debug("processing event queue")
        self.enabled = True
        while self.enabled:
            await self._dispatch()
            # No ``loop=`` argument: it was removed from asyncio.sleep() in
            # Python 3.10 and the running loop is used automatically.
            await sleep(1)
            if not self.queue:
                return

    async def _dispatch(self):
        """Walk every currently queued branch concurrently.

        Returns:
            The list of ``_walk`` results, or ``None`` when the queue was
            empty.
        """
        tasks = []
        while self.queue:
            tasks.append(self._walk(self.queue.popleft()))
        if not tasks:
            return None
        # No ``loop=`` argument: removed from asyncio.gather() in Python 3.10.
        return await gather(*tasks)

    async def _format_payload(self, task: "Branch"):
        """Build the query parameters for a ``/metrics/find`` request."""
        payload = {
            # The bare "*" pattern is sent unchanged; any other prefix is
            # expanded one level with a trailing ".*".
            "query": f"{task.prefix}.*" if task.prefix != "*" else task.prefix,
            "format": "treejson",
        }
        if self.series_from:
            payload["from"] = self.series_from
        return payload

    async def _walk(self, task: "Branch"):
        """Fetch the children of ``task`` and record or queue them.

        * HTTP 429: wait 60 seconds, then put the branch back at the front
          of the queue.
        * Any other status >= 400: log the failure and record the branch
          prefix itself as a result.
        * Otherwise: decode the JSON body and process the children.

        A branch that has already been attempted ``max_attempts`` times is
        dropped (and logged) without issuing a request.
        """
        task.attempts += 1
        if task.attempts > self.max_attempts:
            self.logger.error(
                "giving up on branch after too many attempts",
                branch=task.prefix,
                depth=task.depth,
                max_attempts=self.max_attempts,
            )
            return

        auth = None
        if self.user is not None and self.password is not None:
            auth = BasicAuth(login=self.user, password=self.password)

        metrics = []
        async with ClientSession() as session:
            self.logger.debug("Walking Branch", branch=task.prefix, depth=task.depth)
            # ``async with`` guarantees the response (and its connection) is
            # released even when decoding fails.
            async with session.get(
                f"{self.url}/metrics/find",
                params=await self._format_payload(task),
                auth=auth,
            ) as result:
                status = result.status
                if status < 400:
                    try:
                        # An empty body decodes to None; treat it as no children.
                        metrics = await result.json() or []
                    except (ContentTypeError, ValueError):
                        # ValueError also covers a body that is not valid JSON.
                        self.logger.error(
                            "failed to decode json",
                            branch=task.prefix,
                            depth=task.depth,
                        )
                        metrics = []

        # The session is closed at this point, so the long back-off below
        # does not keep a connection open.
        if status == 429:
            self.logger.error(
                "requesting too many metrics backing off",
                branch=task.prefix,
                depth=task.depth,
                status_code=status,
            )
            await sleep(60)
            self.queue.appendleft(task)
        elif status >= 400:
            self.logger.error(
                "Failed to walk branch",
                branch=task.prefix,
                depth=task.depth,
                status_code=status,
            )
            await self._store_metric(task.prefix)
        else:
            await self._queue_children(task, metrics)

    async def _queue_children(self, task: "Branch", metrics):
        """Record leaf children of ``task`` and queue the remaining ones."""
        next_depth = task.depth + 1
        # Non-leaf children at the depth limit are recorded, not expanded.
        at_depth_limit = bool(self.max_depth) and next_depth >= self.max_depth
        branches = []
        for metric in metrics:
            if metric["leaf"] or at_depth_limit:
                await self._store_metric(metric["id"])
            else:
                branches.append(Branch(metric["id"], next_depth))
        self.queue.extend(branches)

    async def _store_metric(self, metric: str):
        """Append ``metric`` to the collected results."""
        self.metrics_paths.append(metric)
        self.logger.debug("found metric", metric=metric)

    async def enqueue(self, branch: "Branch"):
        """Add ``branch`` to the end of the work queue."""
        self.logger.info("adding branch", prefix=branch.prefix, depth=branch.depth)
        self.queue.append(branch)

    def show_tree(self):
        """Sort the collected metrics in place and write them, one per line,
        to ``output_file``."""
        self.metrics_paths.sort()
        with open(self.output_file, "w") as file:
            file.writelines(f"{metric}\n" for metric in self.metrics_paths)
async def main():
    """Command-line entry point.

    Parses the options, seeds a ``Walker`` with the requested prefix at
    depth 0, runs it to completion and writes the resulting metric list.
    """
    # (flags, add_argument keyword arguments), in the order they are registered.
    option_specs = (
        (("--url", "-u"), {"help": "Graphite URL", "required": True}),
        (
            ("--prefix", "-p"),
            {"help": "Metrics prefix", "required": False, "default": "*"},
        ),
        (("--user",), {"help": "Basic Auth username", "required": False}),
        (("--password",), {"help": "Basic Auth password", "required": False}),
        (
            ("--from",),
            {
                "dest": "seriesFrom",
                "help": "only get series that have been active since this time",
                "required": False,
            },
        ),
        (
            ("--depth", "-d"),
            {
                "type": int,
                "help": "maximum depth to traverse. If set, the branches at the depth will be printed",
                "required": False,
            },
        ),
        (
            ("--file", "-f"),
            {
                "type": str,
                "help": "file to write metric tree too",
                "required": False,
                "default": "metrics_tree.txt",
            },
        ),
    )

    cli = argparse.ArgumentParser()
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    opts = cli.parse_args()

    tree_walker = Walker(
        opts.url,
        file=opts.file,
        user=opts.user,
        password=opts.password,
        series_from=opts.seriesFrom,
        max_depth=opts.depth,
    )
    root_branch = Branch(opts.prefix, 0)
    await tree_walker.enqueue(root_branch)
    await tree_walker.run()
    tree_walker.show_tree()
if __name__ == "__main__":
    try:
        # For production, use libuv as our eventloop as it is *FAR*
        # more performant than the default asyncio event loop.
        # https://magic.io/blog/uvloop-blazing-fast-python-networking/
        from uvloop import EventLoopPolicy
    except ImportError:
        # uvloop is optional: fall back to the stock asyncio loop, but say so
        # instead of silently swallowing every possible error.
        root_logger.info("uvloop not available, using default asyncio event loop")
    else:
        from asyncio import set_event_loop_policy

        set_event_loop_policy(EventLoopPolicy())
        root_logger.info("Using uvloop for asyncio")

    from asyncio import new_event_loop, set_event_loop

    # Create the loop explicitly: calling get_event_loop() when no loop is
    # running is deprecated and fails on recent Python versions.
    loop: AbstractEventLoop = new_event_loop()
    set_event_loop(loop)
    try:
        # run_until_complete() wraps the coroutine in a task on this loop.
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        root_logger.info("Shutting down application")
    finally:
        # The loop is no longer running here, so close it to release its
        # resources (stop() would have no useful effect).
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment