# An example of an asyncio MirrorDownloader (shared as a GitHub gist).
import asyncio
from collections import namedtuple
import os
import socket
import time
from urllib.parse import urlparse
import aiohttp
import django
from import attach_url_to_exception, HttpDownloader
# Mirror pairs a base url with the measured time it took to open a TCP
# connection to it, so a list of mirrors can be sorted "closest first".
#   url (str): The url corresponding with the download.
#   connect_time (float): The amount of time it took to connect to the webserver.
Mirror = namedtuple('Mirror', ['url', 'connect_time'])
class MirrorDownloader(HttpDownloader):
    """
    A Mirrorlist-aware HttpDownloader downloader for downloading files.

    If an exception is raised, the next "closest" mirror is tried until all
    mirrors are exhausted. It provides digest and size validation along with
    computation of the digests needed to save the file as an Artifact. It
    writes a new file to the disk and the return path is included in the
    result.

    This downloader has all of the attributes of HttpDownloader.
    """

    def __init__(self, mirrors, relative_path, session, **kwargs):
        """
        Download files via `http://` or `https://`.

        Args:
            mirrors (list): The list of Mirror objects. It increments upwards
                through them so pre-sorting them incrementally by ping time
                is a good idea.
            relative_path (str): The relative path to the file.
            session (aiohttp.ClientSession): The session to be used by the
                downloader.
            kwargs (dict): This accepts the parameters of HttpDownloader.
        """
        self.mirrors = mirrors
        self.relative_path = relative_path
        self.session = session
        # The effective url is filled in per-mirror inside run(), so None here.
        super().__init__(session, None, **kwargs)

    async def run(self):
        """
        Read, validate, and compute digests on the `url`. This is a coroutine.

        This method provides the same return object type as documented in
        HttpDownloader.run().

        Raises:
            Exception: When every mirror has been tried without success.
        """
        for mirror in self.mirrors:
            print('fetching from {mirror}'.format(mirror=mirror.url))
            self.url = mirror.url + self.relative_path
            try:
                result = await super().run()
            except Exception as error:
                # NOTE(review): assumes HTTP errors carry a `code` attribute
                # (aiohttp-style); getattr avoids an AttributeError on other
                # exception types, which are re-raised unchanged.
                if getattr(error, 'code', None) == 404:
                    print('rolling over to next mirror')
                else:
                    raise
            else:
                return result
        # BUG FIX: the original referenced an undefined name `url` here.
        raise Exception(
            'Exhausted all mirrors for url: {url}'.format(url=self.relative_path)
        )
def time_socket_connect(url):
    """
    Measure the time to connect to the url via a socket and then disconnect.

    Args:
        url (str): The url to connect to. When it carries no explicit port,
            the scheme's default port is resolved with socket.getservbyname().

    Returns:
        float: The number of seconds the socket took to connect.
    """
    parse_report = urlparse(url)
    if not parse_report.port:
        port = socket.getservbyname(parse_report.scheme)
    else:
        port = parse_report.port
    time_before = time.time()
    # BUG FIX: the socket was never closed; the context manager releases the
    # file descriptor even if connect() raises.
    with socket.socket() as mysocket:
        mysocket.connect((parse_report.hostname, port))
        return time.time() - time_before
def parse_as_mirrorlist(session, url):
    """
    Download a mirrorlist and return its mirrors sorted by connect time.

    Args:
        session (aiohttp.ClientSession): The session used to fetch the list.
        url (str): Location of the mirrorlist file. The file holds one mirror
            url per line; lines starting with '#' are comments.

    Returns:
        list: Mirror namedtuples sorted ascending by connect_time.
    """
    mirrors_with_times = []
    loop = asyncio.get_event_loop()
    mirrorlist_data_downloader = HttpDownloader(session, url)
    # BUG FIX: the downloader's coroutine must be handed to asyncio.wait();
    # an empty list raises and nothing would ever be downloaded.
    done, _ = loop.run_until_complete(
        asyncio.wait([mirrorlist_data_downloader.run()])
    )
    mirrorlist_data_download_result = done.pop().result()
    with open(mirrorlist_data_download_result.path) as f:
        for line in f:
            mirror_url = line.rstrip()  # remove any newline chars
            if mirror_url.startswith('#'):
                continue  # This is a comment in the mirrorlist
            mirrors_with_times.append(
                Mirror(url=mirror_url,
                       connect_time=time_socket_connect(mirror_url))
            )
    return sorted(mirrors_with_times, key=lambda x: x.connect_time)
def sync():
    """
    Demo driver: try downloading repomd.xml directly from `url`; when that
    fails, treat `url` as a mirrorlist, sort its mirrors by connect time, and
    retry the download through a MirrorDownloader.
    """
    # The gist left this blank — fill in a repo or mirrorlist url to run.
    url = ''
    repomd_url = url + '/repodata/repomd.xml'
    session = aiohttp.ClientSession()
    repomd_downloader = HttpDownloader(session, repomd_url)
    loop = asyncio.get_event_loop()
    try:
        # BUG FIX: the scrape dropped the coroutine from asyncio.wait();
        # the downloader's run() coroutine must be awaited here.
        done, _ = loop.run_until_complete(
            asyncio.wait([repomd_downloader.run()])
        )
        repomd_download_result = done.pop().result()
    except Exception:
        # The direct download failed; assume `url` is a mirrorlist instead.
        sorted_mirrors = parse_as_mirrorlist(session, url)
        mirror_downloader = MirrorDownloader(
            sorted_mirrors, '/repodata/repomd.xml', session
        )
        done, _ = loop.run_until_complete(
            asyncio.wait([mirror_downloader.run()])
        )
        print('\n\n' + str(done.pop().result()) + '\n\n')


if __name__ == "__main__":
    sync()