Last active
October 19, 2019 10:46
-
-
Save madr/bb080c4c1a33b85c19e0a5cd7db7b2d4 to your computer and use it in GitHub Desktop.
Podcast backup script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Podcast backup script | |
Parses an RSS feed from SOURCE_FILE and download all items to | |
DESTINATION_PATH. Downloads are done in parallel, for | |
PARALLEL_COUNT downloads at a time.
How to use | |
---------- | |
1. Set DESTINATION_PATH. Make sure the folder exists on your | |
file system. | |
2. Save the source file (RSS or Atom) on your computer and | |
update SOURCE_FILE if needed. | |
3. Alter PARALLEL_COUNT for your needs. Higher number will | |
decrease total time for this script to be done, but will | |
increase net traffic. | |
4. Run the script in a Python interpreter; 3.7 is recommended.
This script was written by Anders Ytterström in October 2019. | |
If you find it useful, buy him a 🍺. | |
""" | |
import queue | |
import threading | |
import xml.etree.ElementTree as ET | |
from urllib.request import urlretrieve | |
# Raw strings: Windows paths contain backslash sequences (\A, \p, \i, \K, ...)
# that are invalid escape sequences in regular string literals — a
# DeprecationWarning today and a SyntaxError in future Python versions.
# The raw-string values are byte-identical to the originals.
DESTINATION_PATH = r"D:\Asmodean\podcasts\inbox"  # must already exist on disk
SOURCE_FILE = r"D:\Kod\gists\src.xml"             # saved RSS/Atom feed to parse
PARALLEL_COUNT = 3                                # number of concurrent downloads
def download_file(url, target):
    """Download *url* into DESTINATION_PATH as ``<target>.mp3``.

    Parameters
    ----------
    url : str
        Direct URL of the audio file (the feed item's enclosure).
    target : str
        Slugified episode title used as the file name (no extension).
    """
    print(f"Downloading {target} <- {url}")
    # \\ instead of the original \{ — the lone backslash before a replacement
    # field is an invalid escape sequence (warning now, error in future
    # Pythons); the resulting path string is byte-identical.
    urlretrieve(url, f"{DESTINATION_PATH}\\{target}.mp3")
def get_urls(source_file=None):
    """Parse an RSS feed and return (url, filename) pairs for its items.

    Parameters
    ----------
    source_file : str | None
        Path to the feed XML. Defaults to SOURCE_FILE when None (the
        default is resolved at call time, so the module constant may be
        changed after import).

    Returns
    -------
    map
        Lazy iterator of ``(enclosure_url, slugified_title)`` tuples,
        one per ``<item>`` element under ``<channel>``.
    """
    if source_file is None:
        source_file = SOURCE_FILE
    tree = ET.parse(source_file)
    root = tree.getroot()

    def to_pair(item):
        # enclosure/@url points at the audio file; the title becomes the
        # destination file name (slugified for the file system).
        url = item.find("enclosure").attrib["url"]
        filename = slugify(item.find("title").text)
        return (url, filename)

    return map(to_pair, root.findall("./channel/item"))
def slugify(text):
    """Turn an episode title into a file-system-friendly name.

    Lowercases the text, turns spaces into hyphens, drops ``:`` and
    ``?``, and rewrites ``/`` as ``-av-``.
    """
    slug = text.lower()
    # Same substitutions as chained .replace() calls, applied in order.
    for old, new in ((" ", "-"), (":", ""), ("/", "-av-"), ("?", "")):
        slug = slug.replace(old, new)
    return slug
def do_work(item):
    """Handle one queue entry: unpack the (url, filename) pair and download it."""
    url, target = item
    download_file(url, target)
if __name__ == "__main__":
    # Worker loop: pull (url, filename) tuples off the shared queue until a
    # None sentinel arrives, downloading each one. task_done() is called per
    # real item only — sentinels are enqueued after q.join(), so the
    # done-count bookkeeping stays balanced.
    def worker():
        while True:
            item = q.get()
            if item is None:
                # Sentinel: this worker is told to shut down.
                break
            do_work(item)
            q.task_done()
    q = queue.Queue()
    threads = []
    # Start PARALLEL_COUNT worker threads before filling the queue.
    for i in range(PARALLEL_COUNT):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)
    source = get_urls()
    for item in source:
        q.put(item)
    # block until all tasks are done
    q.join()
    # stop workers
    # One sentinel per worker; each worker consumes exactly one and exits.
    for i in range(PARALLEL_COUNT):
        q.put(None)
    for t in threads:
        t.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment