Skip to content

Instantly share code, notes, and snippets.

@brianoflondon
Created August 17, 2022 07:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brianoflondon/c5744bf981425ca15e3b7359597131d8 to your computer and use it in GitHub Desktop.
Save brianoflondon/c5744bf981425ca15e3b7359597131d8 to your computer and use it in GitHub Desktop.
import asyncio
import logging
import os
from datetime import datetime
from itertools import zip_longest
from timeit import default_timer as timer
from typing import List, Optional
from fastapi import BackgroundTasks, FastAPI, HTTPException, Path, Query, Request
from single_source import get_version
from podping_api_ext.helpers.podping_monitor import (
PingStatsBase,
PodpingPrintableData,
PodpingRCs,
get_podping_following,
load_rc_history,
snapshot_podpings,
snapshot_rc,
update_all,
)
# Package version as reported by single_source.get_version; "0.0.1" is passed
# as its default_return (presumably the fallback when no version is found —
# confirm against single_source's documentation).
__version__ = get_version(__name__, "", default_return="0.0.1")

# The FastAPI application every route, middleware and event handler in this
# module is registered on.
app = FastAPI(
    title="Podping Stats API",
    # Plain literal: the original f-string had no placeholders.
    description="API for Podping Stats",
    version=__version__,
    # Optional OpenAPI metadata, currently disabled:
    # terms_of_service="http://example.com/terms/",
    # contact={
    #     "name": "Brian of London",
    #     "url": "http://x-force.example.com/contact/",
    #     "email": "dp@x-force.example.com",
    # },
    # license_info={
    #     "name": "Apache 2.0",
    #     "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
    # },
)
@app.on_event("startup")
async def startup_event() -> None:
    """Log the running API version when the application starts.

    The handler returns nothing, so it is annotated ``-> None``.
    """
    logging.info(f"Starting up podping-api: {__version__}")
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Report how long each request took in an ``X-Process-Time`` header.

    The clock (``timeit.default_timer``) is read before and after awaiting
    ``call_next(request)``; the difference is stored on the response as a
    string and the response is returned.
    """
    began_at = timer()
    response = await call_next(request)
    elapsed = timer() - began_at
    response.headers["X-Process-Time"] = str(elapsed)
    return response
# Validation pattern for Hive account names used by the path parameters below.
# The pieces are concatenated into a single pattern string.
HIVE_ACCNAME_REGEX = (
    # Exclude the value "string".
    r"^(?!string$)"
    # Whole name must be 3 to 16 characters long.
    r"(?=.{3,16}$)"
    # First segment: a letter, then at least two more characters; a hyphen is
    # only accepted when another letter or digit follows it.
    r"[a-z]([0-9a-z]|[0-9a-z\-](?=[0-9a-z])){2,}"
    # Any number of further segments, each introduced by a dot ...
    r"([\.](?=[a-z][0-9a-z\-][0-9a-z\-])"
    # ... and following the same letter-first / no-trailing-hyphen rule.
    r"[a-z]([0-9a-z]|[0-9a-z\-](?=[0-9a-z])){1,}){0,}$"
)
# Root logger set-up: INFO level; each record shows the timestamp, the level
# name padded to 9 characters, the source line number and the message.
_LOG_FORMAT = "%(asctime)s %(levelname)-9s %(lineno) 4d : %(message)s"
logging.basicConfig(
    format=_LOG_FORMAT,
    level=logging.INFO,
    # datefmt="%Y-%m-%dT%H:%M:%S%z",
    # force=True,
)

# Raise the threshold of the "uvicorn.error" logger to CRITICAL.
_uvicorn_error_logger = logging.getLogger("uvicorn.error")
_uvicorn_error_logger.setLevel(logging.CRITICAL)
# loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
# NOTE(review): a handler with this same name and log message is also
# registered earlier in this module. Both registrations run at startup, so the
# line below is logged twice; consider removing one of the two definitions.
@app.on_event("startup")
async def startup_event() -> None:
    """Log the running API version when the application starts.

    The handler returns nothing, so it is annotated ``-> None``.
    """
    logging.info(f"Starting up podping-api: {__version__}")
@app.get(
    "/v1/podping/pretty/", tags=["podping"], response_model=List[PodpingPrintableData]
)
async def podping_pretty(background_tasks: BackgroundTasks):
    """
    Returns one printable line per account combining its resource status with
    its podping history, followed by a line for the last history entry

    Raises an HTTP 500 error if anything fails while gathering the data.
    """
    start = timer()
    logging.info("podping_pretty: Checking all resources and podpings")
    try:
        # Fetch the resource snapshot and the podping history concurrently.
        rcs, pph = await asyncio.gather(
            podping_resources(background_tasks), podping_history(background_tasks)
        )

        # Index the history by account name once instead of scanning a list for
        # every resource row. setdefault keeps the FIRST entry per name, which
        # is what list.index() returned before.
        pp_by_name = {}
        for item in pph:
            pp_by_name.setdefault(item.hive_accname, item)

        answer: List[PodpingPrintableData] = []
        for rc in rcs:
            pp = pp_by_name.get(rc.hive_accname)
            if pp is None:
                # Account has resource data but no podping history.
                line = f"{rc.hive_accname:<16} {rc.string_data}"
            else:
                line = f"{pp.hive_accname:<16} {rc.string_data} {pp.string_data}"
            logging.debug(line)
            answer.append(
                PodpingPrintableData(
                    hive_accname=rc.hive_accname, pretty_text=line, pp=pp, rc=rc
                )
            )

        # The last history entry always gets its own line, padded where the
        # resource text would be. With an empty history there is nothing to
        # add; previously pph[-1] raised IndexError and produced a 500.
        if pph:
            pp = pph[-1]
            line = f"{pp.hive_accname:<16} {' '* 30} {pp.string_data}"
            logging.debug(line)
            answer.append(
                PodpingPrintableData(
                    hive_accname=pp.hive_accname, pretty_text=line, pp=pp
                )
            )

        logging.info(f"podping_pretty: Resource check complete after {timer() - start}")
        return answer
    except Exception as ex:
        # Boundary handler: log the full traceback, then hide details from the
        # client while keeping the original exception chained as the cause.
        logging.exception(ex)
        logging.error("Problem with fetching resources info")
        raise HTTPException(status_code=500, detail="Something bad happened.") from ex
@app.get(
    "/v1/podping/resources/",
    tags=["podping"],
    response_model=List[PodpingRCs],
)
async def podping_resources(background_tasks: BackgroundTasks):
    """
    Returns a list of all the accounts which are podping'ing and their
    resource status
    """
    # Thin wrapper: the snapshot helper receives this request's background
    # tasks and its result is returned unchanged.
    rc_snapshot = await snapshot_rc(background_tasks)
    return rc_snapshot
@app.get("/v1/podping/resources/history/{hive_accname}", tags=["podping"])
async def podping_resources_history(
    # A path parameter is always required, so it is declared with `...`;
    # a default of None is meaningless here and newer FastAPI releases reject it.
    hive_accname: str = Path(
        ...,
        regex=HIVE_ACCNAME_REGEX,
        description="Hive name to get resource history for",
    ),
    age: Optional[int] = Query(
        12, description="[Optional] Number of hours of history to return"
    ),
):
    """
    Return resource history of one account

    Responds with HTTP 422 when no history exists for the account.
    """
    # Hours are multiplied by 60 before being handed to load_rc_history
    # (presumably it expects minutes — confirm); a missing or zero age is
    # passed on as None.
    age = age * 60 if age else None
    df = await load_rc_history(hive_accname, age)

    if df.empty:
        raise HTTPException(
            status_code=422, detail=f"No podping history for {hive_accname} account"
        )

    # Remove bookkeeping columns in one call; errors="ignore" skips any that
    # are absent. drop() returns a new frame, so the frame handed back by
    # load_rc_history is no longer modified in place.
    drop_list = ["hive_accname", "batch_uuid", "rc_30"]
    df = df.drop(columns=drop_list, errors="ignore")

    # NOTE(review): utcnow() is timezone-naive, so this assumes df.index holds
    # naive UTC timestamps — confirm.
    df["age"] = datetime.utcnow() - df.index

    # Explicit assignment instead of a chained `df.rc_delta.fillna(inplace=True)`,
    # which is not guaranteed to update the frame under pandas copy-on-write.
    df["rc_delta"] = df["rc_delta"].fillna(0)
    return df.to_dict()
@app.get("/v1/podping/history/", tags=["podping"], response_model=List[PingStatsBase])
async def podping_history(background_tasks: BackgroundTasks):
    """
    Returns a list of all the accounts which are podping'ing and their
    last n podpings
    """
    # No account name is supplied to the helper; its result is returned as is.
    history_snapshot = await snapshot_podpings(background_tasks)
    return history_snapshot
@app.get(
    "/v1/podping/history/{hive_accname}",
    tags=["podping"],
    response_model=List[PingStatsBase],
)
async def podping_history_acc_name(
    background_tasks: BackgroundTasks,
    # A path parameter can never be omitted, so it is a plain required `str`
    # declared with `...` rather than Optional with a None default.
    hive_accname: str = Path(
        ...,
        regex=HIVE_ACCNAME_REGEX,
        description="Hive name to get podping history for",
    ),
):
    """
    Returns the podping history (last n podpings) for the given account
    """
    # The account name is forwarded to the same helper the all-accounts
    # endpoint uses (presumably it narrows the snapshot to that account —
    # confirm against snapshot_podpings).
    return await snapshot_podpings(background_tasks, hive_accname)
# NOTE(review): this function reuses the name of the per-account history
# handler defined earlier in this module and therefore shadows it at module
# level (both routes stay registered). The name is left unchanged for
# compatibility; renaming it to something like `podping_active_accounts`
# would be the cleaner fix.
@app.get(
    "/v1/podping/active_accounts",
    tags=["podping"],
    response_model=List[str],
    # Explicit summary: the default is derived from the function name, which
    # would wrongly label this endpoint as an account-history endpoint.
    summary="Podping Active Accounts",
)
async def podping_history_acc_name():
    """
    Returns a list of all the accounts which are active and recognised
    to podping
    """
    # get_podping_following yields a pair; only its second element is returned.
    _, following = await get_podping_following()
    return following
@app.get("/v1/podping/force_update/", tags=["podping"], include_in_schema=False)
async def force_update(background_tasks: BackgroundTasks):
    """
    Trigger a full database update, reloading old data (useful for schema changes)

    The endpoint is excluded from the OpenAPI schema. ``update_all`` is queued
    as a background task and a confirmation message is returned straight away.
    """
    background_tasks.add_task(update_all)
    acknowledgement = {"message": "Updates started"}
    return acknowledgement
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment