Skip to content

Instantly share code, notes, and snippets.

@hutattedonmyarm
Last active January 13, 2020 22:35
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hutattedonmyarm/83609559921d48c4ec02f9c24929cf7a to your computer and use it in GitHub Desktop.
Save hutattedonmyarm/83609559921d48c4ec02f9c24929cf7a to your computer and use it in GitHub Desktop.
Splits audiobooks (M4B, a single MP3, or one MP3 per chapter) into chapter-aligned files of at least 90 minutes each, and uploads them to Overcast.
import os
import subprocess
import glob
import sys
import json
import re
import requests
from bs4 import BeautifulSoup
from colorama import Fore
from colorama import Style
import colorama
def login_overcast():
    """
    Sign in to Overcast and fetch the uploads page.

    The module-level OVERCAST_EMAIL and OVERCAST_PASSWORD values are posted
    to the login endpoint on a fresh requests session; the uploads page is
    then requested with that same session.

    Returns a ``(session, uploads_page_response)`` tuple.
    """
    credentials = {
        'email': OVERCAST_EMAIL,
        'password': OVERCAST_PASSWORD,
        'then': 'podcasts'
    }
    http = requests.session()
    # The login response itself is not inspected here; callers only get the
    # response of the follow-up request for the uploads page.
    http.post('https://overcast.fm/login', data=credentials)
    uploads_page = http.get('https://overcast.fm/uploads')
    return http, uploads_page
def upload_file(path, data, overcast_session, prefix):
    """
    Upload a single audio file to Overcast.

    The file is posted as multipart form data to the module-level
    ``upload_to`` URL together with ``data`` (the upload form's input
    fields). Afterwards the ``upload_succeeded`` endpoint is called with the
    storage key, which is ``prefix`` followed by the file's base name.

    Parameters:
        path: Path of the audio file to upload.
        data: Mapping of form field names to values sent with the file.
        overcast_session: Logged-in requests session used for both requests.
        prefix: Key prefix prepended to the file's base name.

    The HTTP status codes of both requests are printed; they are not
    otherwise checked.
    """
    s3_key = prefix + os.path.basename(path)
    # Use a context manager so the file handle is closed once the upload
    # request has finished (it previously stayed open for the process lifetime).
    with open(path, 'rb') as audio:
        print(f'{Fore.GREEN}Uploading {path}{Style.RESET_ALL}')
        ul_response = overcast_session.post(
            upload_to,
            files={'file': audio},
            data=data)
    print(f'{Fore.GREEN}Upload response: {ul_response.status_code}{Style.RESET_ALL}')
    final_response = overcast_session.post(
        'https://overcast.fm/podcasts/upload_succeeded',
        data={'key': s3_key})
    print(f'{Fore.GREEN}Final response: {final_response.status_code}{Style.RESET_ALL}')
def calc_mp3_split(audio_files):
    """
    Group per-chapter MP3 files into batches that will be concatenated.

    Each file is probed with ffprobe for its duration and size. Files are
    added to the current group in sorted order; a group is closed once its
    accumulated duration exceeds 5400 seconds, and a file that would push
    the group past 1,000,000,000 bytes starts a new group instead.

    Parameters:
        audio_files: List of MP3 paths. It is sorted in place.

    Returns:
        A list of groups, each a non-empty list of file paths. No empty
        groups are returned (an empty input yields an empty list).
    """
    max_bytes = 1000000000
    max_seconds = 5400

    audio_files.sort()
    groups = []
    current = []
    dur = 0.0
    size = 0.0
    for audio_file in audio_files:
        cmd = [
            'ffprobe',
            '-i',
            '{}'.format(audio_file),
            '-loglevel',
            'quiet',
            '-print_format',
            'json',
            '-show_format'
        ]
        print(f'{Fore.GREEN}')
        print(' '.join(cmd))
        print(f'{Style.RESET_ALL}')
        probe = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if probe.stderr:
            print('Error probing file {}: {}'.format(audio_file, probe.stderr), file=sys.stderr)
        info = json.loads(probe.stdout)['format']
        file_dur = float(info['duration'])
        file_size = float(info['size'])

        # Only move on when the current group already holds something;
        # otherwise an oversized first file would leave an empty group behind.
        moved = bool(current) and size + file_size > max_bytes
        if moved:
            groups.append(current)
            current = []
            dur = 0.0
            size = 0.0

        current.append(audio_file)
        # The file that opened a new group counts towards that group's
        # duration as well as its size.
        dur += file_dur
        size += file_size

        if dur > max_seconds and not moved:
            groups.append(current)
            current = []
            dur = 0.0
            size = 0.0

    # Flush the last, still-open group; nothing is appended when the final
    # file closed its group, so no trailing empty group is produced.
    if current:
        groups.append(current)

    print(f'{Fore.GREEN}')
    print('MP3 split: ', json.dumps(groups))
    print(f'{Style.RESET_ALL}')
    return groups
def concat_mp3_files(chapter_data, directory):
    """
    Concatenate each group of MP3 files into one file for upload.

    A new output folder named ``Overcast`` (or ``Overcast1``, ``Overcast2``,
    ... if that name is already taken) is created inside ``directory``. Each
    non-empty group is joined with ffmpeg's ``concat:`` protocol using stream
    copy and written as ``NN - <directory name>.mp3``.

    Parameters:
        chapter_data: List of groups, each a list of MP3 paths to join.
        directory: Folder containing the audiobook; also used for naming.

    Returns:
        The list of output files that ffmpeg wrote successfully.
    """
    basename = os.path.basename(os.path.abspath(directory))
    print(f'{Fore.BLUE}Basename: {basename}{Style.RESET_ALL}')

    # Find the first folder name that is not in use yet.
    suffix = 0
    while True:
        foldername = 'Overcast' + (str(suffix) if suffix else '')
        folderpath = os.path.join(directory, foldername)
        if not os.path.isdir(folderpath):
            break
        print(f'{Fore.BLUE}Folder: {folderpath} already exists. Trying different name{Style.RESET_ALL}')
        suffix += 1
    os.mkdir(folderpath)

    overcast_files = []
    # Empty groups would make ffmpeg run on an empty "concat:" input, so they
    # are dropped up front; numbering stays contiguous over the real groups.
    groups = [group for group in chapter_data if group]
    for number, group in enumerate(groups, start=1):
        file_name = os.path.join(folderpath, '{:02d} - {}.mp3'.format(number, basename))
        command = [
            'ffmpeg',
            '-i',
            "concat:{}".format('|'.join(group)),
            '-c',
            'copy',
            '{}'.format(file_name)
        ]
        print(f'{Fore.GREEN}')
        print(' '.join(command))
        print(f'{Style.RESET_ALL}')
        result = subprocess.run(command)
        if result.returncode != 0:
            # Do not hand a missing or broken file to the uploader.
            print(
                'ffmpeg failed with exit code {} while writing {}'.format(
                    result.returncode, file_name),
                file=sys.stderr)
            continue
        overcast_files.append(file_name)
        print(f'{Fore.GREEN}Written {file_name}{Style.RESET_ALL}')
    return overcast_files
def get_chapters_from_silence(audio_file):
    """
    Guess chapter boundaries from silent passages in the audio file.

    Runs ffmpeg's ``silencedetect`` filter (noise floor -50 dB, minimum
    silence length 2 seconds) and parses its log output from stderr. Every
    stretch of audio between two silences becomes a chapter. The audio after
    the last silence is added as a final chapter, using the total duration
    that ffmpeg prints for the input.

    Parameters:
        audio_file: Path of the file to analyse.

    Returns:
        A list of ``{'start_time': float, 'end_time': float}`` dicts in
        seconds, in playback order. Zero-length chapters are omitted.
    """
    # Detect silence with a length of >= 2sec (d=2)
    command = [
        'ffmpeg',
        '-i',
        '{}'.format(audio_file),
        '-af',
        'silencedetect=n=-50dB:d=2',
        '-f',
        'null',
        '-'
    ]
    print(f'{Fore.GREEN}')
    print(' '.join(command))
    print(f'{Style.RESET_ALL}')
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    # ffmpeg writes its log to stderr, read from there
    _, err = process.communicate()
    # The log can echo file metadata in arbitrary encodings; never let a
    # stray byte abort the whole run.
    lines = err.decode('utf-8', errors='replace').splitlines()

    # silence_start may be reported slightly below zero at the very
    # beginning of a file, so allow a leading minus sign.
    pattern_start = re.compile(r'silence_start: (-?\d+(?:\.\d+)?)$')
    pattern_end = re.compile(r'silence_end: (-?\d+(?:\.\d+)?) \|.*?$')
    pattern_duration = re.compile(r'^\s*Duration: (\d+):(\d{2}):(\d{2}(?:\.\d+)?)')

    chapters = []
    chapter_start = 0.0
    in_silence = False
    total_duration = None
    for line in lines:
        if not line.startswith('[silencedetect'):
            if total_duration is None:
                found = pattern_duration.search(line)
                if found:
                    hours, minutes, seconds = found.groups()
                    total_duration = int(hours) * 3600 + int(minutes) * 60 + float(seconds)
            continue
        split_start = pattern_start.search(line)
        split_end = pattern_end.search(line)
        # Silence has started => Chapter is over
        if split_start:
            in_silence = True
            chapter_end = float(split_start[1])
            if chapter_end > chapter_start:
                chapters.append({
                    'start_time': chapter_start,
                    'end_time': chapter_end})
        # Silence end: Chapter start
        elif split_end:
            in_silence = False
            chapter_start = float(split_end[1])

    # Without this, everything after the last detected silence would be
    # missing from the result. Skip it when the file ends inside a silence.
    if not in_silence and total_duration is not None and total_duration > chapter_start:
        chapters.append({
            'start_time': chapter_start,
            'end_time': total_duration})
    return chapters
def probe_file_info(audio_file):
    """
    Query ffprobe for a file's chapter list and bitrate.

    When the ffprobe output contains no chapters, chapters are derived from
    silence detection instead.

    Returns a ``(chapters, bitrate)`` tuple, where ``bitrate`` is the
    reported ``bit_rate`` divided by 8.
    """
    command = ['ffprobe', '-i', '{}'.format(audio_file)]
    command += ['-loglevel', 'quiet']
    command += ['-print_format', 'json']
    command += ['-show_chapters', '-show_format']

    print(f'{Fore.GREEN}')
    print(' '.join(command))
    print(f'{Style.RESET_ALL}')

    probe = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    err = probe.stderr
    if err:
        print(f'{Fore.GREEN}Error probing file: {err}{Style.RESET_ALL}', file=sys.stderr)

    info = json.loads(probe.stdout)
    chapters = info.get('chapters', [])
    if not chapters:
        # Fall back to guessing chapter boundaries from silent passages.
        print(f'{Fore.GREEN}No chapters in metadata. '
              f'Trying to detect them via silence{Style.RESET_ALL}')
        chapters = get_chapters_from_silence(audio_file)

    print(f'{Fore.GREEN}')
    print('chapters: ', chapters)
    print(f'{Style.RESET_ALL}')

    bytes_per_second = float(info['format']['bit_rate']) / 8
    return chapters, bytes_per_second
def remove_video_track_from_m4b(m4b_file):
    """
    Copy the audio of an m4b file into an m4a file without any video track.

    Sometimes there's an empty video track in m4b files. ffmpeg is run with
    stream copy and ``-vn`` so only the non-video streams are written. The
    output file sits next to the input, with the extension replaced by
    ``.m4a``; an existing file of that name is overwritten.

    Parameters:
        m4b_file: Path of the input file.

    Returns:
        The path of the audio-only file. ffmpeg's exit status is not checked.
    """
    # Swap only the extension. A plain str.replace('m4b', 'm4a') would also
    # rewrite matching directory or file-name parts of the path.
    root, _ = os.path.splitext(m4b_file)
    audio_only_file_name = root + '.m4a'
    # -y: Override
    # -vn: No video
    command = [
        'ffmpeg',
        '-y',
        '-i',
        '{}'.format(m4b_file),
        '-c',
        'copy',
        '-vn',
        '{}'.format(audio_only_file_name)
    ]
    print(f'{Fore.GREEN}')
    print('Removing potential empty video track from m4b file')
    print(' '.join(command))
    print(f'{Style.RESET_ALL}')
    process = subprocess.Popen(command)
    process.wait()
    return audio_only_file_name
def generate_splits_from_chapters(chapters, bitrate, max_bytes=1000000000, max_duration=5400):
    """
    Merge consecutive chapters into splits that respect size and length limits.

    Chapters are appended to the current split. Once the split's accumulated
    duration exceeds ``max_duration`` it is closed and a zero-length
    placeholder ``(end, end)`` is opened for whatever follows. A chapter that
    would push a non-empty split past ``max_bytes`` starts a new split
    instead.

    Parameters:
        chapters: Iterable of mappings with ``start_time`` and ``end_time``
            (anything ``float()`` accepts, in seconds).
        bitrate: Bytes per second, used to estimate each chapter's size.
        max_bytes: Estimated size limit per split.
        max_duration: Duration in seconds after which a split is closed.

    Returns:
        A list of ``(start, end)`` float tuples. The last entry may be a
        zero-length placeholder when the final chapter closed its split.
    """
    splits = []
    dur = 0.0
    size = 0.0
    for chapter in chapters:
        start = float(chapter['start_time'])
        end = float(chapter['end_time'])
        duration = end - start
        chapter_bytes = duration * bitrate

        # A size-triggered move only makes sense when the current split
        # already holds audio. An oversized chapter landing on an empty list
        # or on a fresh placeholder simply fills that slot.
        has_content = bool(splits) and splits[-1][0] != splits[-1][1]
        moved = has_content and size + chapter_bytes > max_bytes

        if moved or not splits:
            splits.append((start, end))
            dur = duration
            # Count the chapter exactly once in the new split.
            size = chapter_bytes
        else:
            splits[-1] = (splits[-1][0], end)
            dur += duration
            size += chapter_bytes

        if dur > max_duration and not moved:
            splits.append((end, end))
            dur = 0.0
            size = 0.0
    return splits
def convert_to_mp3(audio_file, mp3_file):
    """
    Re-encode ``audio_file`` as MP3 (libmp3lame) into ``mp3_file`` via ffmpeg.

    Blocks until ffmpeg has exited; its exit status is ignored.
    """
    source = '{}'.format(audio_file)
    target = '{}'.format(mp3_file)
    command = ['ffmpeg', '-i', source, '-acodec', 'libmp3lame', target]

    # Echo the command line between the colour-on and colour-reset markers.
    for text in (f'{Fore.GREEN}', ' '.join(command), f'{Style.RESET_ALL}'):
        print(text)

    subprocess.call(command)
def handle_m4b(audio_file, is_mp3=False):
    """
    Split a single-file audiobook into several MP3 files.

    Steps:
      1. For non-MP3 input other than ``.m4a``, write an audio-only copy
         first and continue with that copy.
      2. Read chapters and bitrate, then compute the split points.
      3. Convert the audio to one MP3 file unless the input already is one.
      4. Cut that MP3 at the split points with ffmpeg stream copy.

    Parameters:
        audio_file: Path of the audiobook file.
        is_mp3: True when ``audio_file`` already is an MP3 file.

    Returns:
        The list of split files, named ``<name>_NN.mp3`` next to the MP3
        file. Empty when no split points could be determined.
    """
    print(f'{Fore.GREEN}')
    print('Checking M4B info. Is MP3?', is_mp3)
    print(f'{Style.RESET_ALL}')
    # Derive names from the argument itself, not from a module-level loop
    # variable, and swap only the extension rather than every matching
    # substring of the path.
    root, ext = os.path.splitext(audio_file)
    mp3_file = audio_file if is_mp3 else root + '.mp3'
    # read chapter data
    if not is_mp3 and ext.lower() != '.m4a':
        audio_file = remove_video_track_from_m4b(audio_file)
    chapters, bitrate = probe_file_info(audio_file)
    splits = generate_splits_from_chapters(chapters, bitrate)
    print('Splits:', splits)
    # Zero-length entries are placeholders for a split that never received
    # any audio; there is nothing to cut for them.
    splits = [split for split in splits if split[0] != split[1]]
    if not splits:
        print(f'{Fore.YELLOW}No split points found for {audio_file}{Style.RESET_ALL}')
        return []
    print(f'{Fore.GREEN}Splitting single file into {len(splits)} files{Style.RESET_ALL}')
    if not is_mp3:
        print(f'{Fore.GREEN}Converting to MP3 file{Style.RESET_ALL}')
        convert_to_mp3(audio_file, mp3_file)
    else:
        print(f'{Fore.GREEN}Already MP3{Style.RESET_ALL}')

    mp3_root = os.path.splitext(mp3_file)[0]
    overcast_files = []
    for idx, split in enumerate(splits, start=1):
        oc_file = '{}_{:02d}.mp3'.format(mp3_root, idx)
        command = [
            'ffmpeg',
            '-i',
            '{}'.format(mp3_file),
            '-acodec',
            'copy',
            '-ss',
            '{}'.format(split[0]),
            '-to',
            '{}'.format(split[1]),
            '{}'.format(oc_file)
        ]
        print(f'{Fore.GREEN}')
        print(' '.join(command))
        print(f'{Style.RESET_ALL}')
        process = subprocess.Popen(command)
        process.wait()
        overcast_files.append(oc_file)
    return overcast_files
# Script entry: prepare the audiobook files in the given folder, then upload
# them to Overcast. Passing "-u" as second argument skips the preparation and
# uploads the folder's existing MP3 files as they are.
if len(sys.argv) < 2:
    # Only argv[0] is guaranteed to exist here.
    print(f'Usage: {sys.argv[0]} /path/to/folder/with/audiobook/ [-u]')
    sys.exit(0)
ab_dir = sys.argv[1]
glob_filter = os.path.join(ab_dir, '*.mp3')
oc_files = []
colorama.init()
if len(sys.argv) > 2 and sys.argv[2] == "-u":
    oc_files = glob.glob(glob_filter)
else:
    files = glob.glob(glob_filter)
    print(f'{Fore.GREEN}{files}{Style.RESET_ALL}')
    if not files:
        # No MP3 files: look for single-file audiobooks (m4b, then m4a).
        glob_filter = os.path.join(ab_dir, '*.m4b')
        print(f'{Fore.GREEN}{glob_filter}{Style.RESET_ALL}')
        files = glob.glob(glob_filter)
        print(f'{Fore.GREEN}{files}{Style.RESET_ALL}')
        glob_filter = os.path.join(ab_dir, '*.m4a')
        print(f'{Fore.GREEN}{glob_filter}{Style.RESET_ALL}')
        files.extend(glob.glob(glob_filter))
        print(f'{Fore.GREEN}{files}{Style.RESET_ALL}')
        for f in files:
            oc_files.extend(handle_m4b(f, False))
    elif len(files) == 1:
        # A single MP3 is treated like a single-file audiobook. The check
        # used to be "< 10", which made the next branch unreachable and
        # silently ignored every file but the first.
        oc_files.extend(handle_m4b(files[0], True))
    elif len(files) < 10:
        # A handful of MP3 files: upload them unchanged.
        oc_files = files
    else:
        print(f'{Fore.GREEN}Splitting MP3 files{Style.RESET_ALL}')
        to_merge = calc_mp3_split(files)
        oc_files = concat_mp3_files(to_merge, ab_dir)
if not oc_files:
    print(f'{Fore.YELLOW}Less than 10 mp3 files and no m4b files, so nothing to do{Style.RESET_ALL}')
    sys.exit()
print('Done splitting, starting upload of {} files'.format(len(oc_files)))
print(f'{Fore.GREEN}Logging in...{Style.RESET_ALL}')
session, response = login_overcast()
html = response.text
soup = BeautifulSoup(html, 'html.parser')
ul_form = soup.find(id='upload_form')
if ul_form is None:
    # Without the form there is no upload URL and no form fields to send.
    print(
        f'{Fore.RED}Upload form not found on the uploads page. '
        f'Login may have failed.{Style.RESET_ALL}',
        file=sys.stderr)
    sys.exit(1)
ul_inputs = ul_form.find_all('input')
# upload_to is read as a module-level name by upload_file().
upload_to = ul_form.get('action')
ul_data = {}
for ul_input in ul_inputs:
    ul_data[ul_input.get('name')] = ul_input.get('value')
print(f'{Fore.GREEN}Log in done{Style.RESET_ALL}')
for file in oc_files:
    upload_file(file, ul_data, session, ul_form.get('data-key-prefix'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment