# An example of an asyncio MirrorDownloader.
#
# Source: GitHub gist bmbouter/bbacae99d3edfb145db1498e34fa6187,
# created September 19, 2017 21:50.
#
# Note: GitHub flagged this file as containing bidirectional Unicode text, which may be
# interpreted or compiled differently than it appears. To review, open the file in an
# editor that reveals hidden Unicode characters.
import asyncio
from collections import namedtuple
import os
import socket
import time
from urllib.parse import urlparse

import aiohttp
import django

# Kept ahead of the pulpcore import, matching the original statement order
# (presumably pulpcore needs Django configured at import time -- confirm).
django.setup()

from pulpcore.plugin.download.asyncio import attach_url_to_exception, HttpDownloader  # noqa: E402
# A mirror URL paired with the measured time it took to open a connection to it.
Mirror = namedtuple('Mirror', ['url', 'connect_time'])
# Attach the field documentation to the type itself; a bare string placed after the
# assignment is evaluated and thrown away, so help(Mirror) would never show it.
Mirror.__doc__ = """
A mirror and how long it took to connect to it.

Args:
    url (str): The url corresponding with the download
    connect_time (float): The amount of time it took to connect to the webserver
"""
class MirrorDownloader(HttpDownloader):
    """
    A Mirrorlist-aware HttpDownloader downloader for downloading files.

    Mirrors are tried in the order given. When a mirror answers with HTTP 404 the next mirror
    is tried; any other error is re-raised immediately. If every mirror answers 404 an
    exception naming the relative path is raised.

    The actual download (and, per the parent class documentation, digest/size validation and
    the returned :class:`~pulpcore.plugin.download.asyncio.DownloadResult`) is delegated to
    :class:`~pulpcore.plugin.download.asyncio.HttpDownloader`.

    This downloader has all of the attributes of
    :class:`~pulpcore.plugin.download.asyncio.HttpDownloader`
    """

    def __init__(self, mirrors, relative_path, session, **kwargs):
        """
        Download files via `http://` or `https://`.

        Args:
            mirrors (list): The list of Mirror objects. It increments upwards through them so
                pre-sorting them incrementally by ping time is a good idea.
            relative_path (str): The relative path to the file. It is appended verbatim to
                each mirror's url.
            session (aiohttp.ClientSession): The session to be used by the downloader.
            kwargs (dict): This accepts the parameters of
                :class:`~pulpcore.plugin.download.asyncio.HttpDownloader`.
        """
        self.mirrors = mirrors
        self.relative_path = relative_path
        self.session = session
        # No single url exists up front; `run` assigns `self.url` for each mirror it tries.
        super().__init__(session, None, **kwargs)

    @attach_url_to_exception
    async def run(self):
        """
        Download `relative_path` from the first mirror that has it. This is a coroutine.

        This method provides the same return object type as documented in
        :meth:`~pulpcore.plugin.download.asyncio.BaseDownloader.run`.

        Raises:
            Exception: When every mirror responded with 404 (or `mirrors` is empty). Any
                non-404 error from the parent downloader is re-raised unchanged.
        """
        for mirror in self.mirrors:
            print('fetching from {mirror}'.format(mirror=mirror.url))
            self.url = mirror.url + self.relative_path
            try:
                return await super().run()
            except Exception as error:
                # Not every exception carries an HTTP status (e.g. connection errors), so
                # look it up defensively instead of masking the real error with an
                # AttributeError. Newer aiohttp exposes `status`; older releases used `code`.
                status = getattr(error, 'status', None)
                if status is None:
                    status = getattr(error, 'code', None)
                if status != 404:
                    raise
                print('rolling over to next mirror')
        raise Exception(
            'Exhausted all mirrors for url: {url}'.format(url=self.relative_path)
        )
def time_socket_connect(url):
    """
    Measure the time to connect to the url via a socket and then disconnect.

    The port is taken from the url when present; otherwise it is looked up from the url's
    scheme with `socket.getservbyname`.

    Args:
        url (str): The url whose host (and optional port) should be connected to.

    Returns:
        float: The number of seconds the socket took to connect

    Raises:
        OSError: If the scheme has no known service port or the connection fails.
    """
    parsed = urlparse(url)
    port = parsed.port or socket.getservbyname(parsed.scheme)
    # monotonic() cannot jump backwards the way wall-clock time() can, so the measured
    # duration is never negative.
    started = time.monotonic()
    # The context manager closes the socket even when connect() raises.
    with socket.socket() as probe:
        probe.connect((parsed.hostname, port))
        elapsed = time.monotonic() - started
    return elapsed
def parse_as_mirrorlist(session, url):
    """
    Download `url`, treat its content as a mirrorlist, and rank the mirrors by connect time.

    Each non-blank line that does not start with `#` is taken to be a mirror url, and the
    time needed to open a socket to it is measured with `time_socket_connect`.

    Args:
        session (aiohttp.ClientSession): The session used to download the mirrorlist.
        url (str): The url of the mirrorlist.

    Returns:
        list: Mirror objects sorted by ascending `connect_time`.

    Raises:
        Exception: Whatever the mirrorlist download or a mirror connection attempt raises.
    """
    loop = asyncio.get_event_loop()
    # Run the coroutine directly: handing bare coroutines to asyncio.wait() is deprecated
    # and rejected by newer Python versions, and this returns (or raises) the same result.
    download_result = loop.run_until_complete(HttpDownloader(session, url).run())

    mirrors_with_times = []
    with open(download_result.path) as mirrorlist:
        for line in mirrorlist:
            mirror_url = line.strip()  # remove newline chars and surrounding whitespace
            if not mirror_url or mirror_url.startswith('#'):
                continue  # Blank line or a comment in the mirrorlist
            mirrors_with_times.append(
                Mirror(url=mirror_url, connect_time=time_socket_connect(mirror_url))
            )
    return sorted(mirrors_with_times, key=lambda mirror: mirror.connect_time)
def sync():
    """
    Fetch `repodata/repomd.xml`, falling back to treating the url as a mirrorlist.

    The url is first tried as a plain repository. If that download raises, the url is parsed
    as a mirrorlist and the file is fetched through a MirrorDownloader, whose result is
    printed. The aiohttp session is closed before returning, whether or not a download
    succeeded.
    """
    # url = 'https://mirrors.fedoraproject.org/mirrorlist?repo=fedora-25&arch=x86_64'
    url = 'https://repos.fedorapeople.org/repos/pulp/pulp/fixtures/rpm-mirrorlist-mixed'
    repomd_path = '/repodata/repomd.xml'

    loop = asyncio.get_event_loop()
    session = aiohttp.ClientSession()
    try:
        try:
            # Run the coroutine directly; passing bare coroutines to asyncio.wait() is
            # deprecated and rejected by newer Python versions.
            loop.run_until_complete(HttpDownloader(session, url + repomd_path).run())
        except Exception:
            # The url is not a repository itself, so treat it as a mirrorlist.
            sorted_mirrors = parse_as_mirrorlist(session, url)
            mirror_downloader = MirrorDownloader(sorted_mirrors, repomd_path, session)
            result = loop.run_until_complete(mirror_downloader.run())
            print('\n\n' + str(result) + '\n\n')
    finally:
        # Release the session's connections; close() is a coroutine in aiohttp 3+.
        loop.run_until_complete(session.close())
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    sync()
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.