Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
An example of an asyncio MirrorDownloader
import asyncio
from collections import namedtuple
import os
import socket
import time
from urllib.parse import urlparse
import aiohttp
import django
django.setup()
from pulpcore.plugin.download.asyncio import attach_url_to_exception, HttpDownloader
# A single mirror entry.
#
# Fields:
#     url (str): The url corresponding with the download
#     connect_time (float): The amount of time it took to connect to the webserver
Mirror = namedtuple('Mirror', 'url connect_time')
class MirrorDownloader(HttpDownloader):
    """
    A Mirrorlist-aware HttpDownloader downloader for downloading files.

    Each mirror is tried in the order given. If a mirror answers with HTTP 404 the next
    mirror is tried, until all mirrors are exhausted. Any other error is re-raised
    immediately.

    The actual download is delegated to
    :class:`~pulpcore.plugin.download.asyncio.HttpDownloader`, so this downloader has all
    of the attributes of that class and returns the same
    :class:`~pulpcore.plugin.download.asyncio.DownloadResult` type.
    """

    def __init__(self, mirrors, relative_path, session, **kwargs):
        """
        Download files via `http://` or `https://`.

        Args:
            mirrors (list): The list of Mirror objects. They are tried in list order, so
                pre-sorting them by ascending connect time is a good idea.
            relative_path (str): The relative path to the file. It is appended verbatim to
                each mirror's url.
            session (aiohttp.ClientSession): The session to be used by the downloader.
            kwargs (dict): This accepts the parameters of
                :class:`~pulpcore.plugin.download.asyncio.HttpDownloader`.
        """
        self.mirrors = mirrors
        self.relative_path = relative_path
        self.session = session
        # The url is not known yet; run() sets it for each mirror it tries.
        super().__init__(session, None, **kwargs)

    @attach_url_to_exception
    async def run(self):
        """
        Download `relative_path` from the first mirror that has it. This is a coroutine.

        This method provides the same return object type as documented in
        :meth:`~pulpcore.plugin.download.asyncio.BaseDownloader.run`.

        Raises:
            Exception: If every mirror answered with HTTP 404 (or no mirrors were given).
                Any non-404 error raised by the underlying download is re-raised as is.
        """
        for mirror in self.mirrors:
            print('fetching from {mirror}'.format(mirror=mirror.url))
            self.url = mirror.url + self.relative_path
            try:
                return await super().run()
            except Exception as error:
                # Not every exception carries an HTTP status (e.g. connection errors), so
                # look it up defensively instead of masking the real error with an
                # AttributeError. Newer aiohttp exposes `status`; older releases `code`.
                status = getattr(error, 'status', None)
                if status is None:
                    status = getattr(error, 'code', None)
                if status != 404:
                    raise
                print('rolling over to next mirror')
        raise Exception(
            'Exhausted all mirrors for url: {url}'.format(url=self.relative_path)
        )
def time_socket_connect(url, timeout=None):
    """
    Measure the time to connect to the url via a socket and then disconnect.

    Args:
        url (str): The url whose host (and port, if given) is connected to. When the url
            has no explicit port, the port is looked up from the scheme's service name.
        timeout (float): Optional number of seconds to wait for the connection. When
            omitted the socket's default timeout behavior is left untouched.

    Returns:
        float: The number of seconds the socket took to connect

    Raises:
        OSError: If the scheme has no known service port or the connection fails.
    """
    parse_report = urlparse(url)
    port = parse_report.port
    if not port:
        port = socket.getservbyname(parse_report.scheme)
    # perf_counter() is monotonic, so a wall-clock adjustment cannot skew the measurement.
    time_before = time.perf_counter()
    # The context manager closes the socket even when connect() raises.
    with socket.socket() as mysocket:
        if timeout is not None:
            mysocket.settimeout(timeout)
        mysocket.connect((parse_report.hostname, port))
        return time.perf_counter() - time_before
def parse_as_mirrorlist(session, url):
    """
    Download `url`, treat its content as a mirrorlist, and time a connection to each mirror.

    The downloaded file is read line by line. Blank lines and lines starting with `#` are
    ignored; every other line is taken as a mirror url. Mirrors that cannot be connected
    to are left out of the result.

    Args:
        session (aiohttp.ClientSession): The session used to download the mirrorlist.
        url (str): The url of the mirrorlist.

    Returns:
        list: Mirror objects sorted by ascending connect time.
    """
    loop = asyncio.get_event_loop()
    mirrorlist_data_downloader = HttpDownloader(session, url)
    # Run the coroutine directly: passing a bare coroutine to asyncio.wait() is deprecated
    # since Python 3.8 and rejected since 3.11. Download errors still propagate from here.
    mirrorlist_data_download_result = loop.run_until_complete(
        mirrorlist_data_downloader.run()
    )
    mirrors_with_times = []
    with open(mirrorlist_data_download_result.path) as mirrorlist_file:
        for line in mirrorlist_file:
            mirror_url = line.strip()  # remove newline chars and surrounding whitespace
            if not mirror_url or mirror_url.startswith('#'):
                continue  # Blank line or a comment in the mirrorlist
            try:
                connect_time = time_socket_connect(mirror_url)
            except OSError:
                # An unreachable mirror is of no use for downloading; skip it rather than
                # abandoning the remaining mirrors.
                continue
            mirrors_with_times.append(Mirror(url=mirror_url, connect_time=connect_time))
    return sorted(mirrors_with_times, key=lambda mirror: mirror.connect_time)
def sync():
    """
    Fetch `repodata/repomd.xml`, falling back to treating the url as a mirrorlist.

    The url is first tried as a plain repository url. If that download fails for any
    reason, the url is parsed as a mirrorlist and repomd.xml is downloaded through a
    MirrorDownloader, whose result is printed. The aiohttp session is closed before
    returning, whether or not the download succeeded.
    """
    # A real-world mirrorlist that can be used instead:
    # 'https://mirrors.fedoraproject.org/mirrorlist?repo=fedora-25&arch=x86_64'
    url = 'https://repos.fedorapeople.org/repos/pulp/pulp/fixtures/rpm-mirrorlist-mixed'
    repomd_url = url + '/repodata/repomd.xml'
    session = aiohttp.ClientSession()
    loop = asyncio.get_event_loop()
    try:
        repomd_downloader = HttpDownloader(session, repomd_url)
        try:
            # Run the coroutine directly: passing a bare coroutine to asyncio.wait() is
            # deprecated since Python 3.8 and rejected since 3.11.
            loop.run_until_complete(repomd_downloader.run())
        except Exception:
            # The url is not a repository itself; try it as a mirrorlist instead.
            sorted_mirrors = parse_as_mirrorlist(session, url)
            mirror_downloader = MirrorDownloader(
                sorted_mirrors, '/repodata/repomd.xml', session
            )
            mirror_result = loop.run_until_complete(mirror_downloader.run())
            print('\n\n' + str(mirror_result) + '\n\n')
    finally:
        # Release the session's connections; ClientSession.close() is a coroutine.
        loop.run_until_complete(session.close())
# Run the example sync only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    sync()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.