Retrieve reflist for a repository on GitHub using httpx and asyncio
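Usage sketch (inferred from the argument handling at the bottom of the script; the script and shard file names here are placeholders): the first argument is a file containing one owner/repo name per line, the second a numeric job slot used to tag log lines, stamp files, and the failed-repos list.

    python get_reflists.py shards/repos-00.txt 3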
#!/usr/bin/env python
import os
import itertools
import asyncio
import sys
import httpx
import time
import random
from binascii import crc32
from collections import defaultdict
def iter_chunks(iterable, n):
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk
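# Git "pkt-line" framing: each packet starts with its total length (including
# the 4-byte length field itself) as four lowercase hex digits. '0000' is a
# flush-pkt and '0001' a delim-pkt in protocol v2.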
def packline(b):
    return f'{len(b)+4:04x}'.encode() + b
def unpackline(b):
    l = int(b[:4], 16)
    return b[4:l], b[l:]
def unpacklines(b):
    parts = []
    while b:
        part, b = unpackline(b)
        parts.append(part)
        if not part: break
    return parts
PACKET_END = b'0000'
PACKET_SEP = b'0001'
GIT_UA = b'git/2.35.1'
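# Body of a protocol-v2 ls-refs request: a command section (command name,
# agent, object format), a delim-pkt, then the peel/symrefs/unborn arguments,
# terminated by a flush-pkt. The fields of each section are packed
# NUL-separated into a single pkt-line here.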
lsrefs_payload = b''.join([
    packline(b'\0'.join([
        b'command=ls-refs',
        b'agent=' + GIT_UA,
        b'object-format=sha1']) + b'\0'
    ),
    PACKET_SEP,
    packline(b'\0'.join([
        b'peel', b'symrefs', b'unborn']) + b'\0'
    ),
    PACKET_END
])
headers = {
    'User-Agent': GIT_UA,
    'Git-Protocol': 'version=2',
    'Content-Type': 'application/x-git-upload-pack-request'
}
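# Asking for protocol v2 via the Git-Protocol header lets the script POST the
# ls-refs command straight to git-upload-pack, skipping the usual info/refs
# capability-advertisement round trip.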
MAX_RETRIES = 4
permfail = {}
# name => number of tries so far
tempfail = defaultdict(int)
async def get_remote_refs(client, name):
    url = f'https://github.com/{name}/git-upload-pack'
    try:
        r = await client.post(url, data=lsrefs_payload, headers=headers)
        if r.status_code != 200:
            permfail[name] = ('HTTPError', r.status_code)
            return name, b''
        if name in tempfail:
            # Yay, we got it back!
            del tempfail[name]
        return name, r.content
    except Exception as e:
        tempfail[name] += 1
        if tempfail[name] >= MAX_RETRIES:
            permfail[name] = ('Exception', e)
            del tempfail[name]
        return name, b''
CHUNK_SIZE = 700
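# Spread output files over 256*256 directories keyed by the low two bytes of
# crc32(name), so no single directory ends up with too many files.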
def getpath(name):
    h = crc32(name.encode())
    b1 = h & 0xff
    b2 = (h >> 8) & 0xff
    return f'output/{b1:02x}/{b2:02x}/{name.replace("/",",")}.pack'
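# Names are fetched in chunks of CHUNK_SIZE concurrent requests. Names that
# hit transient errors are retried alongside later chunks (up to MAX_RETRIES
# attempts), then once more, one at a time, after the main loop finishes.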
async def main():
    i = 0
    allstart = time.time()
    permfail_count = 0
    #limits = httpx.Limits(max_connections=128)
    async with httpx.AsyncClient(http2=True) as client:
        for chunk in iter_chunks(open(sys.argv[1]), CHUNK_SIZE):
            start = time.time()
            tasks = []
            for line in chunk:
                name = line.strip()
                tasks.append(asyncio.create_task(get_remote_refs(client, name)))
            # Migrate tempfails to permfails
            for name, tries in list(tempfail.items()):
                if tries >= MAX_RETRIES:
                    permfail[name] = ('MaxRetries', tries)
                    del tempfail[name]
                else:
                    # Add it back in to the queue for another attempt
                    tasks.append(asyncio.create_task(get_remote_refs(client, name)))
            result = await asyncio.gather(*tasks)
            for name, data in result:
                if not data: continue
                i += 1
                with open(getpath(name), 'wb') as f:
                    f.write(b''.join(unpacklines(data)))
            end = time.time()
            print(f'[{jobslot}] [{shard}] Finished {i} in {end-allstart:.2f}s, '
                  f'avg speed {i/(end-allstart):.2f} repos/s, '
                  f'current chunk {CHUNK_SIZE/(end-start):.2f} repos/s'
                  f' {len(tempfail)} pending retries, {len(permfail)-permfail_count} newly failed',
                  file=sys.stderr)
            rtk = list(tempfail.keys())
            if rtk:
                rtfail = random.choice(rtk)
                rtfail_count = tempfail[rtfail]
                print(f'[{jobslot}] [{shard}] Random tempfail: {rtfail} ({rtfail_count} tries)')
            permfail_count = len(permfail)
            open(f'stamps/refstamp.{jobslot}.txt', 'w').write(str(i+len(permfail))+'\n')
        # At the end if there are any tempfails left, try to get them one by one.
        # Iterate over a snapshot, since get_remote_refs mutates tempfail.
        for name in list(tempfail):
            await asyncio.sleep(0.01)
            name, data = await get_remote_refs(client, name)
            if data:
                with open(getpath(name), 'wb') as f:
                    f.write(b''.join(unpacklines(data)))
            else:
                permfail[name] = ('MaxRetries', tempfail[name])
# Set up output dirs. Will use crc32(name) to limit the number
# of files in each dir.
for i in range(256):
    for j in range(256):
        os.makedirs(f'output/{i:02x}/{j:02x}', exist_ok=True)
os.makedirs('stamps', exist_ok=True)
shard = os.path.basename(sys.argv[1])
jobslot = f'{int(sys.argv[2]):02d}'
asyncio.run(main())
print(file=sys.stderr)
print(f'[{jobslot}] [{shard}] Total failed: {len(permfail)}', file=sys.stderr)
with open(f'failed.{jobslot}.txt', 'a') as f:
    for name, reason in permfail.items():
        print(f'[{jobslot}] {name}: {reason}', file=f)