# Created May 19, 2023 13:08
# Nitter instances checker
from asyncio import FIRST_COMPLETED, create_task, wait
from typing import Optional
from httpx import AsyncClient
from rich.progress import Progress, TaskID
from yaml import safe_load as yaml_parse
# Path of a known tweet; check_instance() appends it to an instance's base URL
# and requests the result to decide whether that instance works.
# NOTE(review): INSTANCES_YAML_URL and MAX_CONCURRENCY are referenced by the
# functions below but are not defined anywhere in this copy of the file --
# their definitions presumably belong next to this constant; confirm.
TEST_STATUS = "/04119__snail/status/1550513579056824322"
async def get_instances() -> list[str]:
    """Download the instance list and return the URL of every listed site.

    Fetches the YAML document at ``INSTANCES_YAML_URL``, parses it, and
    collects the ``url`` field of each entry under its top-level ``sites``
    key.

    Returns:
        The ``url`` values of all entries in ``sites``, in document order.

    Raises:
        httpx.HTTPStatusError: If the server does not answer with a
            successful status code.
        httpx.HTTPError: If the request itself fails.
    """
    async with AsyncClient() as client:
        response = await client.get(INSTANCES_YAML_URL)
        # Fail here with a clear HTTP error instead of handing an error page
        # to the YAML parser and crashing later on a missing "sites" key.
        response.raise_for_status()
    data = yaml_parse(response.text)
    return [site["url"] for site in data["sites"]]
async def check_instance(instance: str) -> tuple[str, bool, Optional[Exception]]:
    """Probe one instance by requesting a known tweet through it.

    The instance counts as working when ``instance + TEST_STATUS`` answers
    with HTTP 200 within the one-second timeout and the returned page
    contains the handle ``@04119__snail``.

    Args:
        instance: Base URL of the instance; ``TEST_STATUS`` is appended to it.

    Returns:
        ``(instance, True, None)`` when the probe succeeds, otherwise
        ``(instance, False, error)`` where ``error`` describes why the probe
        failed. Failures are reported through the return value, not raised.
    """
    try:
        async with AsyncClient() as client:
            response = await client.get(instance + TEST_STATUS, timeout=1.0)
            # Explicit checks instead of ``assert``: assertions are stripped
            # under ``python -O``, which would silently disable validation.
            if response.status_code != 200:
                return instance, False, ValueError(
                    f"unexpected HTTP status {response.status_code}"
                )
            # ``in`` instead of ``str.index(...) != -1``: index() never
            # returns -1 (it raises), so that comparison could not fail.
            if "@04119__snail" not in response.text:
                return instance, False, ValueError(
                    "handle '@04119__snail' not found in response"
                )
            return instance, True, None
    except Exception as e:
        # Best-effort probe: timeouts, connection errors and the like are
        # handed back to the caller as a failed result rather than raised.
        return instance, False, e
# Async driver: fetches the instance list, then repeatedly waits on a set of
# pending tasks and prints one colored result line per finished task.
# NOTE(review): this copy of the function has lost its indentation and at
# least two statement bodies (flagged below); restore them from the original
# script before running.
async def amain():
instances: list[str] = await get_instances()
print(f"{len(instances)} instances found")
# Intended to collect instances that passed; nothing visible here appends to it.
working: list[str] = []
done, pending = set(), set()
# Keep going while there are unscheduled instances or tasks still in flight.
while instances or pending:
# Top up the in-flight set up to MAX_CONCURRENCY.
# NOTE(review): the body of this loop is missing. Presumably it removes an
# instance from `instances` and adds create_task(check_instance(...)) to
# `pending` (create_task is imported but unused in the visible code) -- confirm.
while len(pending) < MAX_CONCURRENCY and instances:
# Block until at least one pending task finishes; rebinds both sets.
done, pending = await wait(pending, return_when=FIRST_COMPLETED)
for task in done:
# Each task result is unpacked as an (instance, status, error) triple.
instance, status, error = task.result()
# ANSI colors: instance in bright green (92) when status is truthy, bright
# red (91) otherwise; repr of the error in yellow (33); then reset (0).
print(f"\x1b[{92 if status else 91}m{instance}\x1b[33m {error!r}\x1b[0m")
# NOTE(review): the body of this branch is missing -- presumably
# working.append(instance); confirm. Anything that followed the loop
# (e.g. reporting `working`) is not visible here either.
if status:
# Synchronous entry point.
# NOTE(review): only the function-local import of asyncio.run is visible; the
# statement that would use it (presumably run(amain())) is missing from this
# copy -- confirm against the original script.
def main():
from asyncio import run
# Script entry guard.
# NOTE(review): the guarded statement is missing from this copy -- presumably
# a call to main(); confirm against the original script.
if __name__ == "__main__":
