Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
from boto3 import client
from ftplib import FTP
from gzip import GzipFile
from os import path
class Parameters:
    """Reader for values stored in AWS Systems Manager (SSM) parameters."""

    def __init__(self):
        # A single SSM client is created here and reused by every lookup.
        self.ssmClient = client('ssm')

    def value(self, name):
        """Return the value of the SSM parameter called *name*.

        Any error raised by the underlying ``get_parameter`` call is
        propagated unchanged.
        """
        response = self.ssmClient.get_parameter(Name=name)
        parameter = response['Parameter']
        return parameter['Value']
class InputBuffer:
    """In-memory byte queue with a file-like ``read`` method.

    Producers add data with ``append``; a consumer takes it from the front
    with ``read``. Bytes that were read are skipped via ``offset`` and are only
    physically removed from ``data`` once at least ``COMPACT_THRESHOLD`` bytes
    have been consumed.
    """

    # Size of the consumed prefix at which ``data`` is compacted.
    COMPACT_THRESHOLD = 1024 * 1024

    def __init__(self):
        self.offset = 0  # index of the first unread byte in ``data``
        self.data = bytearray()

    def append(self, chunk):
        """Add the bytes-like object *chunk* to the end of the buffer."""
        self.data += chunk

    def length(self):
        """Return the number of bytes that have not been read yet."""
        return len(self.data) - self.offset

    def read(self, size=-1):
        """Consume and return up to *size* unread bytes as a bytearray.

        A negative or ``None`` *size* returns everything that is unread, the
        same convention file objects use. An empty bytearray is returned when
        nothing is left.
        """
        if size is None or size < 0:
            # Without this, a negative size would be used as a slice end
            # relative to the tail and would push ``offset`` below zero.
            size = self.length()
        end = min(self.offset + size, len(self.data))
        value = self.data[self.offset:end]
        self.offset = end
        if self.offset >= self.COMPACT_THRESHOLD:
            # Drop the consumed prefix in place instead of copying the tail.
            del self.data[:self.offset]
            self.offset = 0
        return value
class OutputBuffer:
    """Byte queue: ``append`` adds at the tail, ``read`` removes from the head."""

    def __init__(self):
        self.data = bytearray()

    def append(self, chunk):
        """Add the bytes-like object *chunk* to the end of the queue."""
        self.data += chunk

    def length(self):
        """Return the number of bytes currently queued."""
        return len(self.data)

    def read(self, size):
        """Remove and return the first *size* bytes as a bytearray.

        Fewer bytes are returned when the queue holds less than *size*.
        """
        head, tail = self.data[:size], self.data[size:]
        self.data = tail
        return head
class WikipediaFetch:
    """Stream one gzip file from an FTP server into S3, decompressed.

    The file is retrieved over FTP, decompressed on the fly and written to
    ``data/<filename without its last extension>`` in *bucket* through an S3
    multipart upload, so the whole file never has to be held in memory.

    An instance opens its FTP connection in ``__init__`` and closes it at the
    end of ``download``; it is meant for a single ``download`` call.
    """

    # Decompressed bytes requested from the gzip reader per call.
    READ_SIZE = 128 * 1024
    # Compressed bytes kept unread while the transfer is still running, so the
    # gzip reader does not run into an empty buffer (which it would treat as
    # end of file) before the FTP transfer has delivered the rest.
    INPUT_RESERVE = 1024 * 1024

    def __init__(self, bucket, directory, filename):
        """Connect to S3 and the FTP host and prepare the streaming buffers.

        bucket: name of the destination S3 bucket.
        directory: directory on the FTP server that contains the file.
        filename: name of the gzip file to fetch.
        """
        self.part = 1  # number of the next part to upload
        self.parts = list()  # ETag / PartNumber records of uploaded parts
        self.s3client = client('s3')
        self.ftp = FTP('ftp.acc.umu.se')
        self.chunksize = 128 * 1024 * 1024  # size of each uploaded part
        self.bucket = bucket
        self.directory = directory
        self.filename = filename
        # The object key drops the last extension of the file name.
        self.key = f'data/{path.splitext(filename)[0]}'
        self.input = InputBuffer()  # compressed bytes received over FTP
        self.output = OutputBuffer()  # decompressed bytes waiting for upload
        # Despite the attribute name this reader decompresses (mode='r').
        self.compress = GzipFile(mode='r', fileobj=self.input)

    def append(self, chunk):
        """Accept one block of compressed data from the FTP transfer.

        Decompresses as much as possible while keeping ``INPUT_RESERVE`` bytes
        of compressed data unread, then uploads every complete part.
        """
        self.input.append(chunk)
        while self.input.length() > self.INPUT_RESERVE:
            self.output.append(self.compress.read(self.READ_SIZE))
        while self.output.length() >= self.chunksize:
            self.upload()

    def upload(self):
        """Upload the next part (at most ``chunksize`` bytes) to S3."""
        chunk = self.output.read(self.chunksize)
        response = self.s3client.upload_part(
            Bucket=self.bucket,
            Key=self.key,
            UploadId=self.uploadId,
            PartNumber=self.part,
            Body=chunk,
        )
        self.parts.append({'ETag': response['ETag'], 'PartNumber': self.part})
        # Sends NOOP on the control connection without reading the reply;
        # presumably a keep-alive while the S3 upload stalls the transfer.
        self.ftp.putcmd('NOOP')
        self.part = self.part + 1

    def download(self):
        """Fetch the file, decompress it and store it in S3.

        If anything fails after the multipart upload was created, the upload
        is aborted so that no orphaned parts remain, and the error is
        re-raised. The FTP connection is closed in every case.
        """
        try:
            self.ftp.login()
            self.ftp.cwd(self.directory)
            self.uploadId = self.s3client.create_multipart_upload(
                Bucket=self.bucket, Key=self.key)['UploadId']
            try:
                # Use the instance's file name rather than a module global.
                self.ftp.retrbinary(f'RETR {self.filename}', self.append,
                                    blocksize=128 * 1024)
                # Drain until the gzip reader itself reports end of data: it
                # can still hold decompressed bytes after the compressed
                # input buffer has become empty.
                while True:
                    chunk = self.compress.read(self.READ_SIZE)
                    if not chunk:
                        break
                    self.output.append(chunk)
                    while self.output.length() >= self.chunksize:
                        self.upload()
                # Whatever is left forms the final (possibly short) part.
                while self.output.length() > 0:
                    self.upload()
                self.s3client.complete_multipart_upload(
                    Bucket=self.bucket,
                    Key=self.key,
                    UploadId=self.uploadId,
                    MultipartUpload={'Parts': self.parts},
                )
            except BaseException:
                # Also covers interruption: uploaded parts stay stored until
                # the multipart upload is completed or aborted.
                self.s3client.abort_multipart_upload(
                    Bucket=self.bucket, Key=self.key, UploadId=self.uploadId)
                raise
        finally:
            self.ftp.close()
# The destination bucket name is read from the SSM parameter store.
parameters = Parameters()
bucket = parameters.value('/wikipedia/bucket_name')

# All files are located in the same directory on the FTP server.
directory = 'mirror/wikimedia.org/dumps/enwiki/20201020/'

# Fetch the numbered files 1 through 27 one after another.
for i in range(1, 28):
    filename = f'enwiki-20201020-pages-logging{i}.xml.gz'
    fetch = WikipediaFetch(bucket, directory, filename)
    fetch.download()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment