Last active
June 11, 2022 13:45
-
-
Save DarkKirb/533e9e8b9b6dd1badf9b103b4285dc4e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!@python@/bin/python | |
import asyncio | |
from concurrent.futures import ThreadPoolExecutor | |
import functools | |
from typing import Any, AsyncIterable, Awaitable, Callable, Optional, TypeVar, cast | |
from os import path, listdir | |
import datetime | |
import json | |
import boto3 | |
from botocore.response import StreamingBody | |
ENDPOINT_URL: str = "https://s3.us-west-000.backblazeb2.com"  # Backblaze B2 S3-compatible endpoint
BUCKET_NAME: str = "cache-chir-rs"  # bucket holding the Nix binary cache
# Shared thread pool; `aio` below uses it to run blocking boto3/os calls off the event loop.
executor: ThreadPoolExecutor = ThreadPoolExecutor()
F = TypeVar('F', bound=Callable[..., Any])  # NOTE(review): F appears unused in this file
T = TypeVar('T')  # generic return type threaded through the async wrappers below
def with_backoff(
        f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
    """Retry an async callable with exponential backoff.

    Any exception is printed and the call retried after a sleep that doubles
    each attempt (2s, 4s, 8s, ...); once the delay reaches 120 seconds the
    last exception is re-raised to the caller.
    """
    @functools.wraps(f)  # preserve the wrapped function's name/docstring
    async def with_backoff_wrapper(*args: Any, **kwargs: Any) -> T:
        last_delay = 2
        while True:
            try:
                return await f(*args, **kwargs)
            except Exception as e:  # broad on purpose: boto3 raises many types
                print(f"{e}")
                if last_delay >= 120:
                    raise
                await asyncio.sleep(last_delay)
                # BUG FIX: the original did `last_delay *= last_delay`, which
                # squares the delay (2, 4, 16, 256) and skips most of the
                # intended exponential schedule; doubling is the convention.
                last_delay *= 2
    return with_backoff_wrapper
def aio(f: Callable[..., T]) -> Callable[..., Awaitable[T]]:
    """Turn a blocking callable into an awaitable.

    The call is dispatched to the module-level thread pool so it does not
    block the event loop.
    """
    @functools.wraps(f)  # FIX: keep the wrapped function's metadata
    async def aio_wrapper(*args: Any, **kwargs: Any) -> T:
        # Bind arguments now: run_in_executor only accepts a zero-arg callable.
        f_bound: Callable[[], T] = functools.partial(f, *args, **kwargs)
        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
        return await loop.run_in_executor(executor, f_bound)
    return aio_wrapper
@aio
def exists_locally(store_path: str) -> bool:
    """Blocking filesystem check for a store path; @aio pushes it onto the thread pool."""
    found: bool = path.exists(store_path)
    return found
class NarInfo(object):
    """Parsed representation of a Nix `.narinfo` cache entry.

    Each line of the input has the form ``Key: value``; recognized keys are
    stored as attributes, unrecognized ones are ignored. ``compression``
    defaults to ``"bzip2"`` when the field is absent.
    """

    def __init__(self, narinfo: str) -> None:
        self.compression = "bzip2"
        # Dispatch table: narinfo key -> (attribute name, value converter).
        fields = {
            "StorePath": ("store_path", str),
            "URL": ("url", str),
            "Compression": ("compression", str),
            "FileHash": ("file_hash", str),
            "FileSize": ("file_size", int),
            "NarHash": ("nar_hash", str),
            "NarSize": ("nar_size", int),
            "References": ("references", str.split),
            "Deriver": ("deriver", str),
            "System": ("system", str),
            "Sig": ("sig", str),
            "CA": ("ca", str),
        }
        for narinfo_line in narinfo.splitlines():
            key, value = narinfo_line.split(": ", 1)
            if key in fields:
                attr, convert = fields[key]
                setattr(self, attr, convert(value))

    async def exists_locally(self) -> bool:
        """True when this entry's store path is present in the local /nix/store."""
        return await exists_locally(self.store_path)
# Module-level S3 client pointed at the B2 endpoint, shared by the helpers
# below. NOTE(review): it is used from multiple executor threads — boto3
# clients are documented as thread-safe, but confirm for this SDK version.
s3 = boto3.client("s3", endpoint_url=ENDPOINT_URL)
@with_backoff
@aio
def get_object(Key: str) -> str:
    """Fetch one object from the cache bucket and return its body as UTF-8 text.

    Raises Exception("No Body") when the response carries no body, and
    Exception("Not StreamingBody") when the body has an unexpected type.
    """
    response = s3.get_object(Bucket=BUCKET_NAME, Key=Key)
    if "Body" not in response:
        raise Exception("No Body")
    body = response["Body"]
    if not isinstance(body, StreamingBody):
        raise Exception("Not StreamingBody")
    return body.read().decode("utf-8")
async def list_cache_objects() -> AsyncIterable[str]:
    """Yield the key of every object in the cache bucket.

    Follows list_objects_v2 continuation tokens across pages; each page is
    fetched on the thread pool with retry/backoff. Raises on an empty
    listing or a malformed page entry.
    """

    @with_backoff
    @aio
    def list_objects_v2(ContinuationToken: Optional[str]) -> dict[str, Any]:
        # boto3 requires ContinuationToken to be omitted entirely on the
        # first request, so branch instead of passing None.
        if ContinuationToken is not None:  # FIX: identity check, was `!= None`
            return s3.list_objects_v2(Bucket=BUCKET_NAME,
                                      ContinuationToken=ContinuationToken)
        return s3.list_objects_v2(Bucket=BUCKET_NAME)

    cont_token = None
    while True:
        objs = await list_objects_v2(cont_token)
        if "Contents" not in objs:
            raise Exception("No Contents")
        if isinstance(objs["Contents"], list):
            for obj in cast(list[Any], objs["Contents"]):
                if not isinstance(obj, dict):
                    raise Exception("Not dict")
                obj = cast(dict[str, Any], obj)
                yield obj["Key"]
        if "NextContinuationToken" not in objs:
            break
        cont_token = objs["NextContinuationToken"]
@with_backoff
@aio
def delete_object(key: str) -> None:
    """Remove a single object from the cache bucket (retried with backoff)."""
    s3.delete_object(Key=key, Bucket=BUCKET_NAME)
def get_store_hashes() -> set[str]:
    """Collect the leading hash component of every entry in /nix/store.

    Store entries are named ``<hash>-<name>``; only the hash prefix is kept.
    """
    return {entry.split("-")[0] for entry in listdir("/nix/store")}
async def main() -> None:
    """Garbage-collect the binary cache.

    Walks every object in the bucket and deletes narinfos, realisations and
    NARs whose store path no longer exists in the local /nix/store. NARs are
    only deleted at the end, after the full listing has shown which of them
    are still referenced by a surviving narinfo.
    """
    # NOTE(review): store_hashes is computed but never read below — confirm
    # whether it was meant to replace the per-path exists_locally() checks.
    store_hashes = get_store_hashes()
    nars_to_delete = set()
    nars_to_keep = set()
    async for obj_key in list_cache_objects():
        if obj_key.endswith(".narinfo"):
            # check if we have the hash locally
            narinfo = await get_object(obj_key)
            narinfo = NarInfo(narinfo)
            if not await narinfo.exists_locally():
                print(f"Found unused NAR for {narinfo.store_path}")
                await delete_object(obj_key)
                # The referenced NAR is only a candidate here; it is removed
                # at the end unless some other narinfo still points at it.
                nars_to_delete.add(narinfo.url)
            else:
                nars_to_keep.add(narinfo.url)
        if obj_key.startswith("realisations/"):
            realisation = await get_object(obj_key)
            realisation = json.loads(realisation)
            # Skip malformed realisation documents rather than failing the run.
            if not isinstance(realisation, dict):
                continue
            if "outPath" not in realisation:
                continue
            if not await exists_locally("/nix/store/" +
                                        realisation["outPath"]):
                print(f"Found unused realisation for {realisation['outPath']}")
                await delete_object(obj_key)
        if obj_key.startswith("nar/"):
            # Every NAR starts out marked for deletion; narinfos seen anywhere
            # in this same listing rescue the live ones via nars_to_keep.
            nars_to_delete.add(obj_key)
    # Second phase: delete only the NARs no surviving narinfo references.
    for nar in nars_to_delete:
        if nar in nars_to_keep:
            continue
        print(f"Deleting unused NAR {nar}")
        await delete_object(nar)
if __name__ == "__main__":
    # FIX: asyncio.run() creates, runs and closes its own event loop;
    # calling get_event_loop() outside a running loop is deprecated
    # since Python 3.10 and removed as an implicit loop factory in 3.12.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment