@aheadley
Last active November 16, 2018 18:02
A stupid script to read reddit threads out loud
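Rough usage, assuming the script is saved as something like reddit_reader.py (the gist doesn't show the filename): python3 reddit_reader.py https://www.reddit.com/r/<subreddit>/comments/<id>/<slug>/ with AWS credentials either exported as AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY or passed via --access-key / --secret-key. Playback shells out to mpv, so that needs to be installed as well.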
#!/usr/bin/env python3
import hashlib
import tempfile
import os
import os.path
import urllib.parse
import re
import functools
import logging
import multiprocessing
import subprocess
import pprint
import time
import glob
import binascii
import signal
from xml.sax.saxutils import escape as xml_escape
import boto3
# import praw
import requests
import langid
LOG_LEVEL = logging.INFO
TMP_FILE_PREFIX = 'reddit-reader-'
PLAY_CMD = 'mpv -vo null {}'
EXIT_SENTINEL = -1
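# EXIT_SENTINEL is pushed onto both the text and audio queues by main() to tell the
# worker processes to finish up and stop; everything else on the queues is a
# (nick, message) or (filename, nick, message) tuple.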
USER_AGENT = 'python:reddit-reader:1.0 (by /u/not_that_guy_either)'
VALID_AUDIO_FORMATS = ['mp3', 'ogg_vorbis', 'pcm']
VALID_SAMPLE_RATES = ['8000', '16000', '22050']
VALID_VOICE_PITCHES = ['low', 'medium', 'high']
MESSAGE_TEMPLATE = """
<speak><prosody rate="{rate}" pitch="{pitch}">{message}</prosody></speak>
""".strip()
PHONEME_TEMPLATE = '<phoneme ph="{}">{}</phoneme>'
EMPHASIS_TEMPLATE = '<emphasis level="{}">{}</emphasis>'
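# The templates above are Amazon Polly SSML fragments: every message is wrapped in a
# <speak><prosody> envelope with a per-user pitch, and individual tokens may get wrapped
# in <phoneme> or <emphasis> tags by the clean_* helpers further down.
# TOKEN_REPLACEMENT_MAP (below) maps case-insensitive regexes to replacements used by
# _clean_replacements(): a value prefixed with 'ph:' wraps the matching token in a
# <phoneme> tag, any other value replaces the matched text.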
TOKEN_REPLACEMENT_MAP = {
    r'lol': 'ph:l o l',
    r'\<3': 'heart',
    r'(&gt;\s*)+': 'quote ',
    r'\[(.*?)\]\((.*?)\)': r'\1',
    r'&amp;': r'&',
    r'#x200B;': r'',
}
STARTUP_MESSAGE = 'reddit-reader v0.1'
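# multiprocessify turns a function into a factory for worker processes: calling the
# decorated function does not run it, it returns an un-started multiprocessing.Process
# wired up with the given arguments, e.g.:
#   proc = handle_messages(text_queue, audio_queue, opts)
#   proc.start()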
def multiprocessify(func):
    @functools.wraps(func)
    def wrapper(*pargs, **kwargs):
        return multiprocessing.Process(target=func, args=pargs, kwargs=kwargs)
    return wrapper
def get_worker_logger(worker_name, level=LOG_LEVEL):
    logging.basicConfig()
    log = logging.getLogger('reddit-reader.{}-{:05d}'.format(worker_name, os.getpid()))
    log.setLevel(level)
    return log
def walk_replies(reply_listing):
    try:
        if reply_listing['kind'] != 'more':
            for reply in reply_listing['data']['children']:
                if reply['kind'] != 'more':
                    try:
                        if reply['data']['body'] != '[deleted]':
                            yield (reply['data']['author'], reply['data']['body'])
                    except KeyError as err:
                        logging.exception(err)
                        logging.warning(pprint.pformat(reply['data']))
                        continue
                    if reply['data']['replies'] != '':
                        for reply in walk_replies(reply['data']['replies']):
                            yield reply
    except TypeError as err:
        logging.exception(err)
        logging.warning(pprint.pformat(reply_listing))
def walk_comments(thread):
    if thread[0]['data']['children'][0]['data']['selftext']:
        yield (thread[0]['data']['children'][0]['data']['author'],
               thread[0]['data']['children'][0]['data']['selftext'])
    for child in thread[1]['data']['children']:
        if child['kind'] != 'more':
            if child['data']['body'] != '[deleted]':
                yield (child['data']['author'], child['data']['body'])
            if child['data']['replies'] != '':
                for reply in walk_replies(child['data']['replies']):
                    yield reply
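# The two generators above walk the JSON reddit returns for '<thread-url>.json': a
# two-element list where element 0 is a Listing holding the submission itself
# (author/selftext) and element 1 holds the comment tree. 'more' stubs are skipped
# rather than expanded, and '[deleted]' bodies are dropped.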
@multiprocessify
def handle_messages(msg_queue, audio_queue, config):
    log = get_worker_logger('processor', config.log_level)
    log.debug('Worker process starting')
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    log.debug('Using temp file dir: %s', tempfile.gettempdir())
    for old_fn in glob.glob(os.path.join(tempfile.gettempdir(), TMP_FILE_PREFIX + '*.*')):
        log.debug('Deleting old temp file: %s', old_fn)
        os.unlink(old_fn)
    polly_client = boto3.client('polly',
        aws_access_key_id=config.access_key,
        aws_secret_access_key=config.secret_key,
        region_name='us-east-1',
    )
    voices = sorted(polly_client.describe_voices()['Voices'], key=lambda v: v['Id'])
    voice_langs = set(v['LanguageCode'].split('-')[0] for v in voices if len(v['LanguageCode'].split('-')[0]) == 2)
    log.debug('Found %d voices from language families: %s',
              len(voices), ', '.join(voice_langs))
    lang_classifier = langid.langid.LanguageIdentifier.from_modelstring(
        langid.langid.model, norm_probs=True)
    lang_classifier.set_languages(voice_langs)
    # there's a large delay while langid loads the model on the first classify()
    # call, so we do that now
    lang_classifier.classify(STARTUP_MESSAGE)
    log.debug('Ready to work')
    qitem = msg_queue.get()
    while qitem != EXIT_SENTINEL:
        nick, orig_msg = qitem
        msg = clean_message(orig_msg)
        if not msg:
            # nothing left after cleaning; grab the next item instead of
            # synthesizing an empty message
            log.warning('Skipping garbage message: "%s"', orig_msg)
            qitem = msg_queue.get()
            continue
        msg = MESSAGE_TEMPLATE.format(
            rate=config.speech_rate,
            pitch=nick2bucket(nick, VALID_VOICE_PITCHES),
            message=msg,
        )
        if config.force_lang:
            msg_lang = config.default_lang
            log.debug('Forcing message language to: %s', msg_lang)
        else:
            msg_lang, prob = lang_classifier.classify(orig_msg)
            log.debug('Detected message language as: %s (p%0.5f)', msg_lang, prob)
            if prob < config.confidence_threshold:
                msg_lang = config.default_lang
                log.debug('Detection confidence too low (<p%0.2f), defaulting to: %s',
                          config.confidence_threshold, msg_lang)
        if msg_lang not in voice_langs:
            msg_lang = config.default_lang
            log.warning('Detected language is not an available voice, defaulting to: %s', msg_lang)
        msg_voices = [v for v in voices if v['LanguageCode'].startswith(msg_lang + '-')]
        voice = nick2bucket(nick, msg_voices)
        log.debug('Synthesizing with "%s" [%s@%s]: "%s"',
                  voice['Id'], config.audio_format, config.sample_rate, msg)
        try:
            resp = polly_client.synthesize_speech(
                Text=msg,
                TextType='ssml',
                VoiceId=voice['Id'],
                OutputFormat=config.audio_format,
                SampleRate=config.sample_rate,
            )
        except Exception as err:
            log.warning(err)
            log.exception(err)
        else:
            tmp_f, tmp_fn = tempfile.mkstemp(
                suffix='.' + config.audio_format,
                prefix=TMP_FILE_PREFIX,
            )
            os.close(tmp_f)
            log.debug('Writing audio to: %s', tmp_fn)
            with open(tmp_fn, 'wb') as tmp_f:
                try:
                    tmp_f.write(resp['AudioStream'].read())
                except Exception as err:
                    log.error('Failed to write audio file (%s): %s', tmp_fn, err)
                    log.exception(err)
                    raise err
                else:
                    audio_queue.put((tmp_fn, nick, orig_msg))
                    log.debug('Queued audio file: %s', tmp_fn)
        qitem = msg_queue.get()
    log.debug('Worker process stopping')
@multiprocessify
def play_audio(audio_queue, play_cmd, opts):
    log = get_worker_logger('player', opts.log_level)
    log.debug('Worker process starting')
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    with open('/dev/null', 'w+b') as dev_null:
        log.debug('Ready to work')
        qitem = audio_queue.get()
        while qitem != EXIT_SENTINEL:
            fn, nick, msg = qitem
            log.debug('Playing audio file: %s', fn)
            try:
                log.info('/u/%s >>> %s', nick, msg)
                subprocess.call(play_cmd.format(fn).split(), stdout=dev_null, stderr=dev_null)
            except Exception as err:
                log.error('Failed to play audio file (%s): %s', fn, err)
                log.exception(err)
            else:
                os.unlink(fn)
                log.debug('Deleted audio file: %s', fn)
            time.sleep(0.3)
            qitem = audio_queue.get()
    log.debug('Worker process stopping')
def nick2bucket(nick, buckets):
    return buckets[int(binascii.hexlify(nick.strip().lower().encode('utf-8')), 16) % len(buckets)]
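# nick2bucket is a cheap deterministic hash: the lowercased nick's bytes are read as one
# big integer and taken modulo len(buckets), so the same username always gets the same
# pitch and the same Polly voice. For example (illustrative only):
#   nick2bucket('some_user', VALID_VOICE_PITCHES)  # always the same pitch for this nick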
def clean_message(msg):
    return ' '.join(clean_token(t) for t in msg.split()).strip()
def _clean_url(token):
    try:
        url = urllib.parse.urlparse(token)
        if url.scheme.startswith('http'):
            domain_parts = url.netloc.split('.')
            # if the first part of the domain is 1-3 homogeneous characters, drop it;
            # catches things like (i.)imgur.com or (www.)example.com while skipping
            # something like bbc.uk
            if 3 >= len(domain_parts[0]) and 1 == len(set(domain_parts[0])):
                token = '.'.join(domain_parts[1:])
            else:
                token = url.netloc
            token = EMPHASIS_TEMPLATE.format('strong', token)
    except Exception:
        pass
    return token
def _clean_replacements(token):
    for rgx, ph in TOKEN_REPLACEMENT_MAP.items():
        if re.match(rgx, token, re.I):
            if ph is None:
                token = ''
            elif ph.startswith('ph:'):
                token = PHONEME_TEMPLATE.format(ph[3:], token)
            else:
                # use re.sub so backreferences like r'\1' (markdown links) actually
                # resolve instead of replacing the token with a literal '\1'
                token = re.sub(rgx, ph, token, flags=re.I)
    return token
def _clean_duplicates(token):
    # reduce characters repeated 4+ times in a row to 2 characters
    token = re.sub(r'((.)\2)(\2{2,})', r'\1', token)
    # don't XML-escape tokens that already carry SSML markup from earlier passes
    if not token.startswith(('<phoneme', '<emphasis')):
        token = xml_escape(token.strip())
    if token.startswith('*') and token.endswith('*') and len(token) >= 3:
        token = EMPHASIS_TEMPLATE.format('strong', token.strip('*'))
    return token
def clean_token(token):
    return _clean_duplicates(_clean_url(_clean_replacements(_clean_url(token))))
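# Tokens are cleaned in this order: shorten/emphasize bare URLs, apply the regex
# replacement map, run the URL pass a second time (presumably to catch URLs uncovered
# by the replacements), then collapse 4+ repeated characters, XML-escape plain tokens,
# and turn *starred* tokens into <emphasis> tags.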
def main(opts):
    log = get_worker_logger('main', opts.log_level)
    json_url = opts.url + '.json'
    log.debug('Pulling JSON from reddit: %s', json_url)
    thread = requests.get(json_url, headers={'User-Agent': USER_AGENT}).json()
    audio_queue = multiprocessing.Queue(opts.queue_size)
    text_queue = multiprocessing.Queue(opts.queue_size)
    log.debug('Starting workers...')
    proc_handle = handle_messages(text_queue, audio_queue, opts)
    proc_handle.start()
    player_handle = play_audio(audio_queue, PLAY_CMD, opts)
    player_handle.start()
    try:
        for comment in walk_comments(thread):
            if comment[0] in ['AutoModerator']:
                continue
            log.debug('Queueing comment: %s: %s', comment[0], comment[1])
            text_queue.put(comment)
    except KeyboardInterrupt:
        log.warning('Caught ^C, exiting...')
    log.debug('Sending processor exit signals')
    text_queue.put(EXIT_SENTINEL)
    audio_queue.put(EXIT_SENTINEL)
    try:
        log.debug('Waiting for text processor to exit')
        proc_handle.join()
        log.debug('Waiting for audio processor to exit')
        player_handle.join()
    except KeyboardInterrupt:
        log.warning('Not waiting for graceful exit, terminating workers')
        if proc_handle.is_alive():
            proc_handle.terminate()
        if player_handle.is_alive():
            player_handle.terminate()
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('url', metavar='URL')
    parser.add_argument('--default-lang', default='en')
    parser.add_argument('--force-lang', action='store_true')
    parser.add_argument('--confidence-threshold', default=0.4, type=float)
    parser.add_argument('--audio-format', default='mp3', choices=VALID_AUDIO_FORMATS)
    parser.add_argument('--sample-rate', default='22050', choices=VALID_SAMPLE_RATES)
    parser.add_argument('--speech-rate', default=1.1, type=float)
    parser.add_argument('--access-key', default=os.environ.get('AWS_ACCESS_KEY_ID'))
    parser.add_argument('--secret-key', default=os.environ.get('AWS_SECRET_ACCESS_KEY'))
    parser.add_argument('--queue-size', default=8, type=int)
    parser.add_argument('-q', '--quiet', action='count', default=0)
    parser.add_argument('-v', '--verbose', action='count', default=0)
    opts = parser.parse_args()
    opts.log_level = LOG_LEVEL + (10 * -opts.verbose) + (10 * opts.quiet)
    main(opts)
# requirements.txt
boto3==1.9.41
botocore==1.12.41
certifi==2018.10.15
chardet==3.0.4
docutils==0.14
idna==2.7
jmespath==0.9.3
langid==1.1.6
numpy==1.15.4
praw==6.0.0
prawcore==1.0.0
python-dateutil==2.7.5
requests==2.20.1
s3transfer==0.1.13
six==1.11.0
update-checker==0.16
urllib3==1.24.1