@javrasya
Last active March 4, 2023 14:33
This script downloads the Wikipedia page views data set and uploads it to S3 concurrently as it downloads. The level of concurrency is configurable via semaphores.
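The concurrency control comes down to a shared asyncio.Semaphore: every task is started with asyncio.gather, but only a fixed number of them may hold the semaphore at any moment. A minimal sketch of that pattern (bounded_worker, the 0.1 s sleep and the limit of 5 are illustrative placeholders, not part of the script below):

import asyncio

async def bounded_worker(semaphore: asyncio.Semaphore, item: int) -> int:
    # Only as many workers as the semaphore permits run this block at the same time.
    async with semaphore:
        await asyncio.sleep(0.1)  # stand-in for one download/upload step
        return item * 2

async def run_all() -> None:
    semaphore = asyncio.Semaphore(5)  # at most 5 workers at once
    results = await asyncio.gather(*[bounded_worker(semaphore, i) for i in range(20)])
    print(results)

asyncio.run(run_all())

The script below uses the same idea twice: SCAN_FILE_NAMES_CONCURRENCY bounds how many md5sums.txt listings are fetched in parallel, and PROCESSOR_CONCURRENCY bounds how many files are decompressed and uploaded at once (with a looser 3x limit on downloads in flight).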
import asyncio
import zlib
from typing import List, Tuple
from aiobotocore.session import AioSession
from aiohttp_retry import ExponentialRetry, RetryClient
from tqdm import tqdm
# ##### PARAMETERIZED PART #######
YEAR = 2015
MONTHS = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}
S3_BUCKET = "<my-s3-bucket>"
S3_FOLDER = "benchmark/wikipedia_pageviews/"
AWS_PROFILE = "<my-aws>"
SCAN_FILE_NAMES_CONCURRENCY = 3
PROCESSOR_CONCURRENCY = 5
###################################

TEN_MB = 10485760  # accumulate roughly this many compressed bytes before uploading one multipart part

async def upload_chunk(client, chunk, upload_id, chunk_number, key):
    # Upload a single part of an S3 multipart upload; S3 part numbers are 1-based.
    part_number = chunk_number + 1
    resp = await client.upload_part(
        Bucket=S3_BUCKET,
        Body=chunk,
        UploadId=upload_id,
        PartNumber=part_number,
        Key=key
    )
    return {
        'PartNumber': part_number,
        'ETag': resp['ETag']
    }

# Currently unused helper: checks whether a single key already exists under the S3 prefix.
async def load_existing_files(s3_client, key):
    try:
        response = await s3_client.list_objects_v2(Bucket=S3_BUCKET, Prefix=S3_FOLDER)
        for obj in response['Contents']:
            if key == obj['Key']:
                return True
        return False  # no keys match
    except KeyError:
        return False  # no keys found
    except Exception as e:
        # Handle or log other exceptions such as the bucket not existing
        return e

class FileProcessor(object):
    def __init__(self, concurrency: int) -> None:
        # Bounds how many files are decompressed and uploaded at the same time.
        self.semaphore = asyncio.Semaphore(concurrency)
        # Allows a few more downloads to be in flight than uploads.
        self.download_semaphore = asyncio.Semaphore(concurrency * 3)
        self.concurrency = concurrency
        # Free/busy map of tqdm progress-bar rows, one per concurrent upload.
        self.positions = {i + 1: True for i in range(concurrency)}
        self.existing_files = set()

    async def load_existing_files(self, s3_client):
        # Cache the keys already in the bucket so finished files are not re-uploaded.
        # Note: list_objects_v2 returns at most 1,000 keys; only the first page is read here.
        response = await s3_client.list_objects_v2(Bucket=S3_BUCKET, Prefix=S3_FOLDER)
        for obj in response.get('Contents', []):
            self.existing_files.add(obj['Key'])

    def _get_available_position(self):
        # Pick the first free progress-bar row and mark it as busy.
        position = next(iter([position for position, available in self.positions.items() if available]), None)
        self.positions[position] = False
        return position

    def _release_position(self, position: int):
        self.positions[position] = True

    async def download_and_upload_file(self, session: RetryClient, s3_client, month: int, file: str) -> Tuple[int, str, bool, str]:
        key = f"{S3_FOLDER}{YEAR}/{month:02}/{file.split('.')[0]}"
        if key in self.existing_files:
            # Already uploaded in a previous run; report it as successful and skip it.
            return month, file, True, "Skipped; already in S3"
        async with self.download_semaphore:
            position = self._get_available_position()
            create_multipart_upload_resp = await s3_client.create_multipart_upload(
                Bucket=S3_BUCKET,
                Key=key,
            )
            upload_id = create_multipart_upload_resp['UploadId']
            part_info = {
                'Parts': []
            }
            try:
                async with session.get(f"https://dumps.wikimedia.org/other/pagecounts-raw/{YEAR}/{YEAR}-{month:02}/{file}") as response:
                    if response.status != 200:
                        return month, file, False, f"Failed to download: Status Code: {response.status}"
                    async with self.semaphore:
                        size = int(response.headers.get('content-length', 0)) or None
                        progressbar = tqdm(
                            desc=f"Download, extract and then upload {file} to S3", total=size, position=position, leave=False,
                        )
                        remaining = size
                        index = 0
                        accumulator: bytes = bytes()
                        # 16 + MAX_WBITS tells zlib to expect a gzip stream.
                        d = zlib.decompressobj(16 + zlib.MAX_WBITS)
                        async for chunk in response.content.iter_chunked(3000):
                            remaining -= len(chunk)
                            accumulator += chunk
                            # Upload a part once enough compressed bytes have accumulated,
                            # unless the remaining tail is small enough to fold into this part.
                            if len(accumulator) > TEN_MB and (remaining == 0 or remaining > TEN_MB):
                                decompressed = d.decompress(accumulator)
                                info = await upload_chunk(s3_client, decompressed, upload_id, index, key)
                                part_info['Parts'].append(info)
                                accumulator = bytes()
                                index += 1
                            progressbar.update(len(chunk))
                        d.flush()
                        if index == 0:
                            return month, file, False, f"Nothing to download. Content: {await response.text()}"
                        # Verify S3 has registered every part before completing the upload.
                        list_parts_resp = await s3_client.list_parts(
                            Bucket=S3_BUCKET,
                            Key=key,
                            UploadId=upload_id
                        )
                        if len(list_parts_resp['Parts']) == index:
                            await s3_client.complete_multipart_upload(
                                Bucket=S3_BUCKET,
                                Key=key,
                                UploadId=upload_id,
                                MultipartUpload=part_info
                            )
                            return month, file, True, ""
                        await s3_client.abort_multipart_upload(
                            Bucket=S3_BUCKET,
                            Key=key,
                            UploadId=upload_id
                        )
                        return month, file, False, "Failed to upload to S3; part count mismatch"
            finally:
                self._release_position(position)

async def find_files(semaphore: asyncio.Semaphore, session: RetryClient, month: int) -> List[Tuple[int, str]]:
    async with semaphore:
        async with session.get(f"https://dumps.wikimedia.org/other/pagecounts-raw/{YEAR}/{YEAR}-{month:02}/md5sums.txt") as response:
            payload = await response.text("utf-8")
            lines = payload.split("\n")
            parts = [line.split(" ") for line in lines]
            files = [(month, p[1]) for p in parts if len(p) == 2]
            return files

async def main():
    scan_hash_limit = asyncio.Semaphore(SCAN_FILE_NAMES_CONCURRENCY)
    retry_options = ExponentialRetry(attempts=10, start_timeout=2)
    async with RetryClient(raise_for_status=False, retry_options=retry_options) as session:
        # Discover the file names for every month, a few months at a time.
        results = await asyncio.gather(*[find_files(scan_hash_limit, session, month) for month in MONTHS])
        results_flat = [(month, file) for files in results for month, file in files]
        boto_session = AioSession(profile=AWS_PROFILE)
        async with boto_session.create_client('s3', region_name='eu-west-1') as s3_client:
            processor = FileProcessor(concurrency=PROCESSOR_CONCURRENCY)
            await processor.load_existing_files(s3_client)
            # Download, decompress and upload every file, bounded by PROCESSOR_CONCURRENCY.
            results = await asyncio.gather(*[processor.download_and_upload_file(session, s3_client, month, file) for month, file in results_flat])
            unprocessed_files = [f"{month} {file} {error_message}" for month, file, successful, error_message in results if not successful]
            print(f"Here is the list of unprocessed files ({len(unprocessed_files)}/{len(results)})")
            print("\n".join(unprocessed_files))


asyncio.run(main())