Podcast backup script
"""
Podcast backup script
Parses an RSS feed from SOURCE_FILE and download all items to
DESTINATION_PATH. Downloads are done in parallel, for
PARALLEL_COUNT downloads at the time.
How to use
----------
1. Set DESTINATION_PATH. Make sure the folder exists on your
file system.
2. Save the source file (RSS or Atom) on your computer and
update SOURCE_FILE if needed.
3. Alter PARALLEL_COUNT for your needs. Higher number will
decrease total time for this script to be done, but will
increase net traffic.
4. Run script in a python intepreter, 3.7 is recommended.
This script was written by Anders Ytterström in October 2019.
If you find it useful, buy him a 🍺.
"""
import queue
import threading
import xml.etree.ElementTree as ET
from urllib.request import urlretrieve

DESTINATION_PATH = r"D:\Asmodean\podcasts\inbox"
SOURCE_FILE = r"D:\Kod\gists\src.xml"
PARALLEL_COUNT = 3

def download_file(url, target):
    print(f"Downloading {target} <- {url}")
    urlretrieve(url, f"{DESTINATION_PATH}\\{target}.mp3")
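# Illustration only (the URL below is made up): with the settings above,
#   download_file("https://example.com/ep1.mp3", "episode-1")
# would save the audio as D:\Asmodean\podcasts\inbox\episode-1.mp3.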

def get_urls():
    tree = ET.parse(SOURCE_FILE)
    root = tree.getroot()

    def f(item):
        url = item.find("enclosure").attrib["url"]
        filename = slugify(item.find("title").text)
        return (url, filename)

    return map(f, root.findall("./channel/item"))
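# For reference, get_urls assumes a standard RSS 2.0 layout roughly like the
# sketch below (element names taken from the XPath and attribute lookups
# above; the values are placeholders):
#
#   <rss>
#     <channel>
#       <item>
#         <title>Episode 12: Python/Django?</title>
#         <enclosure url="https://example.com/ep12.mp3" type="audio/mpeg" />
#       </item>
#     </channel>
#   </rss>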

def slugify(text):
    return (
        text.lower()
        .replace(" ", "-")
        .replace(":", "")
        .replace("/", "-av-")
        .replace("?", "")
    )
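# Worked example, computed from the replacements above:
#   slugify("Episode 12: Python/Django?")  ->  "episode-12-python-av-django"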

def do_work(item):
    download_file(*item)
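# The main block below is a simple worker-pool pattern: PARALLEL_COUNT threads
# pull (url, filename) tuples from a shared queue.Queue, and a None item is
# used as a sentinel telling each worker to stop.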

if __name__ == "__main__":

    def worker():
        while True:
            item = q.get()
            if item is None:
                break
            do_work(item)
            q.task_done()

    q = queue.Queue()
    threads = []
    for i in range(PARALLEL_COUNT):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)

    source = get_urls()
    for item in source:
        q.put(item)

    # block until all tasks are done
    q.join()

    # stop workers
    for i in range(PARALLEL_COUNT):
        q.put(None)
    for t in threads:
        t.join()