Last active
July 20, 2022 17:42
-
-
Save moyix/9ec3567cf342bf00be1606282b2742ea to your computer and use it in GitHub Desktop.
Retrieve reflist for a repository on Github using httpx and asyncio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import itertools | |
import asyncio | |
import sys | |
import httpx | |
import time | |
import random | |
from binascii import crc32 | |
from collections import defaultdict | |
def iter_chunks(iterable, n):
    """Yield successive tuples of at most *n* items from *iterable*.

    The last chunk may be shorter than *n*; an exhausted iterable
    yields nothing at all.
    """
    src = iter(iterable)
    for first in src:
        # One item is already consumed, so take n-1 more to fill the chunk.
        yield (first,) + tuple(itertools.islice(src, n - 1))
def packline(b):
    """Frame *b* as a git pkt-line: 4 lowercase hex digits of the total
    length (which counts the 4-byte length field itself), then the payload.
    """
    total = len(b) + 4  # length field includes its own four bytes
    return ('%04x' % total).encode() + b
def unpackline(b):
    """Split one pkt-line off the front of *b*.

    Returns (payload, remainder).  A flush/delim packet (length field
    < 4, e.g. b'0000') yields an empty payload and leaves *b* untouched
    as the remainder.
    """
    total = int(b[:4], 16)
    payload = b[4:total]
    rest = b[total:]
    return payload, rest
def unpacklines(b):
    """Decode consecutive pkt-line payloads from *b*.

    Stops at the end of the buffer, or after the first empty payload
    (a flush/delim packet) — which is still appended before stopping.
    """
    out = []
    while b:
        # Inline pkt-line split: 4 hex digits of total length, then payload.
        total = int(b[:4], 16)
        payload, b = b[4:total], b[total:]
        out.append(payload)
        if not payload:
            break
    return out
# git pkt-line special packets (protocol v2 wire format):
PACKET_END = b'0000'  # flush-pkt: terminates the request
PACKET_SEP = b'0001'  # delim-pkt: separates command/capabilities from arguments
# Impersonate a real git client version in the agent/User-Agent strings.
GIT_UA = b'git/2.35.1'
# Pre-built body for a protocol-v2 `ls-refs` request: the command section
# (command, agent, object-format), a delimiter, the argument section
# (peel/symrefs/unborn), and a terminating flush packet.
lsrefs_payload = b''.join([
    packline(b'\0'.join([
        b'command=ls-refs',
        b'agent=' + GIT_UA,
        b'object-format=sha1']) + b'\0'
    ),
    PACKET_SEP,
    packline(b'\0'.join([
        b'peel', b'symrefs', b'unborn']) + b'\0'
    ),
    PACKET_END
])
# NOTE(review): GIT_UA is bytes while the other values are str; httpx
# accepts bytes header values, so this mixed dict works — confirm on upgrade.
headers = {
    'User-Agent': GIT_UA,
    'Git-Protocol': 'version=2',
    'Content-Type': 'application/x-git-upload-pack-request'
}
# Give up on a repo after this many transient failures.
MAX_RETRIES = 4
# name => (failure kind, detail) for repos we have permanently given up on
permfail = {}
# name => number of tries so far
tempfail = defaultdict(int)
async def get_remote_refs(client, name):
    """Fetch the raw `ls-refs` response for one GitHub repo.

    Returns (name, response_bytes) on success, or (name, b'') on any
    failure.  Failure bookkeeping goes through the module-level dicts:
    `tempfail` (transient, will be retried by the caller) and `permfail`
    (permanent, given up).
    """
    url = f'https://github.com/{name}/git-upload-pack'
    try:
        # `content=` is the correct httpx parameter for a raw byte body;
        # `data=` is meant for form-encoded payloads and is deprecated
        # for raw bytes.
        r = await client.post(url, content=lsrefs_payload, headers=headers)
        if r.status_code != 200:
            # Permanent failure.  Also drop any pending retry record so
            # the main loop doesn't keep re-queueing a repo that e.g. 404s
            # and later overwrite this record with a bogus MaxRetries one.
            permfail[name] = ('HTTPError', r.status_code)
            tempfail.pop(name, None)
            return name, b''
        # Yay, we got it back (possibly after transient failures) —
        # clear the retry counter.
        tempfail.pop(name, None)
        return name, r.content
    except Exception as e:
        # Transient (network/protocol) failure: count it, and give up
        # permanently once MAX_RETRIES is reached.
        tempfail[name] += 1
        if tempfail[name] >= MAX_RETRIES:
            permfail[name] = ('Exception', e)
            del tempfail[name]
    return name, b''
CHUNK_SIZE = 700
def getpath(name):
    """Map a repo name to its output .pack path.

    The path is sharded two directory levels deep using the low two
    bytes of the CRC32 of the name, to limit files per directory;
    '/' in the repo name is replaced by ',' to keep it a single file.
    """
    digest = crc32(name.encode())
    lo = digest & 0xff
    mid = (digest >> 8) & 0xff
    safe = name.replace('/', ',')
    return 'output/%02x/%02x/%s.pack' % (lo, mid, safe)
async def main():
    """Fetch reflists for every repo listed (one name per line) in
    sys.argv[1], CHUNK_SIZE repos concurrently per batch, writing each
    repo's unpacked pkt-line payloads to its sharded output path.

    Relies on module-level `jobslot`/`shard` (set before asyncio.run)
    for log prefixes, and on `tempfail`/`permfail` for retry state.
    """
    i = 0
    allstart = time.time()
    permfail_count = 0
    #limits = httpx.Limits(max_connections=128)
    async with httpx.AsyncClient(http2=True) as client:
        # `with` ensures the repo list file is closed when we finish.
        with open(sys.argv[1]) as repo_list:
            for chunk in iter_chunks(repo_list, CHUNK_SIZE):
                start = time.time()
                tasks = []
                for line in chunk:
                    name = line.strip()
                    tasks.append(asyncio.create_task(get_remote_refs(client, name)))
                # Migrate tempfails to permfails; snapshot items() because
                # we delete from tempfail while iterating.
                for name, tries in list(tempfail.items()):
                    if tries >= MAX_RETRIES:
                        permfail[name] = ('MaxRetries', tries)
                        del tempfail[name]
                    else:
                        # Add it back in to the queue for another attempt
                        tasks.append(asyncio.create_task(get_remote_refs(client, name)))
                result = await asyncio.gather(*tasks)
                for name, data in result:
                    if not data: continue
                    i += 1
                    with open(getpath(name), 'wb') as f:
                        f.write(b''.join(unpacklines(data)))
                end = time.time()
                print(f'[{jobslot}] [{shard}] Finished {i} in {end-allstart:.2f}s, '
                      f'avg speed {i/(end-allstart):.2f} repos/s, '
                      f'current chunk {CHUNK_SIZE/(end-start):.2f} repos/s'
                      f' {len(tempfail)} pending retries, {len(permfail)-permfail_count} newly failed',
                      file=sys.stderr)
                rtk = list(tempfail.keys())
                if rtk:
                    rtfail = random.choice(rtk)
                    rtfail_count = tempfail[rtfail]
                    print(f'[{jobslot}] [{shard}] Random tempfail: {rtfail} ({rtfail_count} tries)')
                permfail_count = len(permfail)
                # Progress stamp: count of completed + permanently failed repos.
                with open(f'stamps/refstamp.{jobslot}.txt', 'w') as stamp:
                    stamp.write(str(i + len(permfail)) + '\n')
        # At the end if there are any tempfails left, try to get them one
        # by one.  Iterate over a snapshot: get_remote_refs deletes from
        # tempfail on success and on final failure, which would raise
        # "dictionary changed size during iteration" on the live dict.
        for name in list(tempfail):
            # Gentle rate limit; asyncio.sleep (not time.sleep) so we
            # don't block the event loop.
            await asyncio.sleep(.01)
            _, data = await get_remote_refs(client, name)
            if data:
                with open(getpath(name), 'wb') as f:
                    f.write(b''.join(unpacklines(data)))
            elif name not in permfail:
                # Only record MaxRetries if get_remote_refs didn't already
                # record a more specific permanent failure (and tempfail
                # still holds a real count — defaultdict would invent 0).
                permfail[name] = ('MaxRetries', tempfail[name])
# ---- script entry point (module level) ----
# Set up output dirs. Will use crc32(name) to limit the number
# of files in each dir.
for i in range(256):
    for j in range(256):
        os.makedirs(f'output/{i:02x}/{j:02x}', exist_ok=True)
os.makedirs('stamps', exist_ok=True)
# argv[1]: file listing one repo name per line; its basename labels log output.
shard = os.path.basename(sys.argv[1])
# argv[2]: numeric job-slot id, zero-padded for log alignment and filenames.
jobslot = f'{int(sys.argv[2]):02d}'
asyncio.run(main())
print(file=sys.stderr)
print(f'[{jobslot}] [{shard}] Total failed: {len(permfail)}', file=sys.stderr)
# Append mode: repeated runs of the same slot accumulate their failures.
with open(f'failed.{jobslot}.txt', 'a') as f:
    for name, reason in permfail.items():
        print(f'[{jobslot}] {name}: {reason}', file=f)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment