Skip to content

Instantly share code, notes, and snippets.

@amacal
Created November 12, 2020 10:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amacal/3caf6e845a68d752e48b412d15a9979d to your computer and use it in GitHub Desktop.
from boto3 import client
from ftplib import FTP
from gzip import GzipFile
from os import path
class Parameters:
    """Thin read-only wrapper around AWS SSM Parameter Store lookups."""

    def __init__(self):
        # One shared SSM client, reused for every lookup.
        self.ssmClient = client('ssm')

    def value(self, name):
        """Return the string value of the SSM parameter called *name*."""
        response = self.ssmClient.get_parameter(Name=name)
        return response['Parameter']['Value']
class InputBuffer:
    """Growable byte buffer with a read cursor.

    Bytes are appended at the tail and consumed from a moving offset;
    the consumed prefix is compacted away once it exceeds 1 MiB so the
    buffer's memory stays bounded during long streams.
    """

    def __init__(self):
        self.offset = 0           # read cursor into self.data
        self.data = bytearray()   # backing storage (consumed prefix + unread tail)

    def append(self, chunk):
        """Append raw bytes at the tail."""
        self.data.extend(chunk)

    def length(self):
        """Number of unread bytes remaining."""
        return len(self.data) - self.offset

    def read(self, size):
        """Consume and return up to *size* bytes starting at the cursor.

        Returns fewer bytes (possibly none) when the buffer runs dry.
        """
        start = self.offset
        end = start + size
        result = self.data[start:end]
        self.offset = min(end, len(self.data))
        # Drop the already-consumed prefix once it grows past 1 MiB.
        if self.offset >= 1024 * 1024:
            self.data = self.data[self.offset:]
            self.offset = 0
        return result
class OutputBuffer:
    """Simple FIFO byte queue: append at the tail, drain from the head."""

    def __init__(self):
        self.data = bytearray()

    def append(self, chunk):
        """Queue raw bytes at the tail."""
        self.data.extend(chunk)

    def length(self):
        """Total number of buffered bytes."""
        return len(self.data)

    def read(self, size):
        """Remove and return up to *size* bytes from the head."""
        head, self.data = self.data[:size], self.data[size:]
        return head
class WikipediaFetch:
    """Streams one gzipped Wikipedia dump from an FTP mirror, decompresses
    it on the fly, and writes the raw XML to S3 via a multipart upload.

    The pipeline never holds the whole file in memory: FTP chunks land in
    an InputBuffer, GzipFile pulls compressed bytes from it, and the
    decompressed output accumulates in an OutputBuffer until a full
    128 MiB S3 part is ready.
    """

    def __init__(self, bucket, directory, filename):
        self.part = 1                       # next multipart part number (S3 parts are 1-based)
        self.parts = list()                 # completed parts: {'ETag', 'PartNumber'}
        self.s3client = client('s3')
        self.ftp = FTP('ftp.acc.umu.se')
        self.chunksize = 128 * 1024 * 1024  # S3 part size (128 MiB)
        self.bucket = bucket
        self.directory = directory
        self.filename = filename
        # Strip the trailing .gz so the S3 key names the decompressed XML.
        self.key = f'data/{path.splitext(filename)[0]}'
        self.input = InputBuffer()          # compressed bytes received from FTP
        self.output = OutputBuffer()        # decompressed bytes awaiting upload
        # GzipFile in read mode decompresses; it pulls compressed data from
        # self.input via its .read() method. (Renamed from `compress` — the
        # object performs DEcompression.)
        self.decompressor = GzipFile(mode='r', fileobj=self.input)

    def append(self, chunk):
        """retrbinary callback: buffer the FTP chunk, decompress what is
        safely available, and ship any full-sized S3 parts."""
        self.input.append(chunk)
        # Keep more than 1 MiB of compressed data buffered so GzipFile
        # never hits a short read in the middle of the stream.
        while self.input.length() > 1024 * 1024:
            self.output.append(self.decompressor.read(128 * 1024))
        while self.output.length() >= self.chunksize:
            self.upload()

    def upload(self):
        """Upload one S3 part of at most self.chunksize bytes."""
        chunk = self.output.read(self.chunksize)
        response = self.s3client.upload_part(
            Bucket=self.bucket, Key=self.key, UploadId=self.uploadId,
            PartNumber=self.part, Body=chunk)
        self.parts.append({'ETag': response['ETag'], 'PartNumber': self.part})
        # Keep-alive on the FTP control connection during the long transfer.
        # NOTE(review): putcmd sends NOOP without reading the reply, so the
        # un-read 200 responses accumulate on the control connection —
        # verify retrbinary still parses the final transfer status correctly.
        self.ftp.putcmd('NOOP')
        self.part = self.part + 1

    def download(self):
        """Run the full pipeline: FTP download -> gunzip -> S3 multipart upload."""
        self.ftp.login()
        self.ftp.cwd(self.directory)
        self.uploadId = self.s3client.create_multipart_upload(
            Bucket=self.bucket, Key=self.key)['UploadId']
        # FIX: the RETR command must name the file being fetched — the
        # stored filename was never interpolated into the command.
        self.ftp.retrbinary(f'RETR {self.filename}', self.append, blocksize=128 * 1024)
        # Drain the remaining compressed bytes, then flush the final
        # (possibly short) parts before sealing the upload.
        while self.input.length():
            self.output.append(self.decompressor.read(128 * 1024))
        while self.output.length() > 0:
            self.upload()
        self.s3client.complete_multipart_upload(
            Bucket=self.bucket, Key=self.key, UploadId=self.uploadId,
            MultipartUpload={'Parts': self.parts})
# Resolve the destination bucket once, then mirror all 27 dump files.
parameters = Parameters()
bucket = parameters.value('/wikipedia/bucket_name')
directory = 'mirror/wikimedia.org/dumps/enwiki/20201020/'
for index in range(1, 28):
    filename = f'enwiki-20201020-pages-logging{index}.xml.gz'
    WikipediaFetch(bucket, directory, filename).download()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment