@made2591
Created November 29, 2020 16:38
My gist to get my blog read by AWS Polly and my markdown files updated accordingly with the new meta
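Setup notes (assumed from the imports and AWS calls, not stated in the gist): the script needs boto3, requests, and beautifulsoup4 installed (pip install boto3 requests beautifulsoup4), plus AWS credentials allowing polly:SynthesizeSpeech and s3:PutObject on the content bucket.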
import boto3
import requests
from bs4 import BeautifulSoup
from pathlib import Path

BASE_URL = "YOUR_BASE_URL"
CONTENT_BUCKET = "YOUR_BUCKET_NAME"
NUMBER_OF_WORDS = 500
NUMBER_OF_PARAGRAPHS = 6
FINAL_SENTENCE = "... Hey! You are surely missing something: if you want to know more, visit YOUR_BASE_URL"
BASE_PATH = "YOUR_MARKDOWN_BLOGPOSTS_FILESYSTEM_POSITION"

polly_client = boto3.client('polly')
s3_client = boto3.client('s3')
def lambda_handler(event, context):
    print("Getting urls...")
    # get post urls
    urls = __get_blog_post_urls()  # urls = ["https://madeddu.xyz/posts/go-async-await/"]
    # print(urls)
    print("Getting paragraphs...")
    # for each post get its paragraphs
    all_paragraphs = {url: __get_page_content_for_nts(url) for url in urls}
    # print(all_paragraphs)
    print("Getting SSML tagged text...")
    texts = {url: __add_SSML_Enhanced_tags(paragraphs) for url, paragraphs in all_paragraphs.items()}
    # print(texts)
    print("Getting articles read...")
    # for each post produce mp3 and save it to s3
    s3_paths = [__get_content_read_by_polly(url.replace(f"{BASE_URL}/posts/", ""), text) for url, text in texts.items()]
    # for s3_path in s3_paths:
    #     print(s3_path)
    print("Getting articles markdownsssss...")
    # get file list
    file_list = __get_markdown_list()
    # for file_name in file_list:
    #     print(file_name)
    print("Getting matches audio/articles...")
    # match produced audio
    matches = __match_audio_and_post(file_list, s3_paths)
    for audio_name, file_name in matches.items():
        print(audio_name, file_name)
    print("Getting articles modified...")
    # modify old posts
    __insert_new_audio_reference(matches)
    print("Getting Hugo Theme changed acc...just kidding. Done!!!")
    # return s3 paths
    return s3_paths
def __get_blog_post_urls(url=BASE_URL):
    # create request
    r = requests.get(f"{url}/posts/")
    # next listing page to fetch (page 1 is /posts/ itself)
    index = 2
    # accumulate blog post urls
    urls = []
    # go ahead with pages until one stops answering 200
    while r.status_code == 200:
        # parse page
        soup = BeautifulSoup(r.text, features="html.parser")
        # find all href
        for a in soup.findAll('a', href=True):
            # get only posts, skipping the listing itself and pagination links
            if a['href'] != f"{BASE_URL}/posts/" and f"{BASE_URL}/posts/" in a['href'] and "/page/" not in a['href']:
                # append urls
                urls.append(a['href'])
        # request the next listing page
        try:
            r = requests.get(f"{url}/posts/page/{index}")
        except requests.RequestException:
            return urls
        index += 1
    # return result
    return urls
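# Pagination sketch (illustrative, based on the loop above): the crawler walks
#   BASE_URL/posts/, BASE_URL/posts/page/2, BASE_URL/posts/page/3, ...
# and stops at the first non-200 response or failed request.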
def __get_page_content(url, number_of_words=NUMBER_OF_WORDS, final_sentence=FINAL_SENTENCE):
    # plain-text variant, kept for reference: the handler uses __get_page_content_for_nts
    # create request
    r = requests.get(url)
    # parse page
    soup = BeautifulSoup(r.text, features="html.parser")
    # get all paragraphs
    paragraphs = soup.find("div", {"id": "main"}).findAll("p")
    # accumulate outer text
    page_with_no_code = ""
    # for each found paragraph
    for paragraph in paragraphs:
        # exclude portions of code
        if paragraph.findAll("div", {"class": "highlight"}):
            continue
        # get the text out of the paragraph
        text = paragraph.text.strip()
        # exclude a common header
        if "Subscribe to my newsletter to be informed about my new blog posts, talks and activities." in text:
            continue
        # accumulate page text
        page_with_no_code += text + " "
    # return result (note: the slice truncates characters, not words)
    return f"{page_with_no_code[:number_of_words]}{final_sentence}"
def __get_page_content_for_nts(url, number_of_words=NUMBER_OF_WORDS, final_sentence=FINAL_SENTENCE):
    # create request
    r = requests.get(url)
    # parse page
    soup = BeautifulSoup(r.text, features="html.parser")
    # get all paragraphs
    paragraphs = soup.find("div", {"id": "main"}).findAll("p")
    # accumulate the paragraph tags themselves, so SSML tagging can inspect markup
    page_with_no_code = []
    # for each found paragraph
    for paragraph in paragraphs:
        # exclude portions of code
        if paragraph.findAll("div", {"class": "highlight"}):
            continue
        # get the text out of the paragraph
        text = paragraph.text.strip()
        # exclude a common header
        if "Subscribe to my newsletter to be informed about my new blog posts, talks and activities." in text:
            continue
        # accumulate page paragraphs
        page_with_no_code.append(paragraph)
    # return result
    return page_with_no_code
def __add_SSML_Enhanced_tags(paragraphs):
    # tag to start a speech
    text = "<speak>"
    # add informal style
    text = f'{text}<amazon:domain name="conversational"><amazon:effect name="drc">'
    # # add breathing to sound more natural
    # text = f'{text}<amazon:auto-breaths>'
    # for each paragraph
    for paragraph in paragraphs[:NUMBER_OF_PARAGRAPHS]:
        # prepare the paragraph with dot and comma breaks
        paragraph_text = paragraph.text.strip()
        # paragraph_text = paragraph_text.replace("...", "<break time=\"500ms\"/>")
        # paragraph_text = paragraph_text.replace(". ", "<break time=\"800ms\"/>")
        # paragraph_text = paragraph_text.replace(",", "<break time=\"300ms\"/>")
        # expand slang expressions
        paragraph_text = paragraph_text.replace("btw", "<sub alias=\"by the way\">by the way</sub>")
        paragraph_text = paragraph_text.replace("PoC", "<say-as interpret-as=\"spell-out\">PoC</say-as>")
        # emphasize <em> words
        # ems = paragraph.findAll("em")
        # for em in ems:
        #     paragraph_text = paragraph_text.replace(f"{em.text}", f'<emphasis level="moderate">{em.text}</emphasis>')
        # # pronounce <strong> words loudly
        # strongs = paragraph.findAll("strong")
        # for strong in strongs:
        #     paragraph_text = paragraph_text.replace(f"{strong.text}", f'<emphasis level="moderate">{strong.text}</emphasis>')
        # print(paragraph)
        # print(paragraph_text)
        # stop once the next paragraph would blow the character budget
        if len(f"{text} {paragraph_text}") > 1490 - len(f" {FINAL_SENTENCE}"):
            break
        else:
            text = f"{text} {paragraph_text}"
    # close the text
    # text = f"{text} {FINAL_SENTENCE}</speak>"
    text = f"{text} {FINAL_SENTENCE}</amazon:effect></amazon:domain></speak>"
    return text
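# Shape of the SSML this emits (illustrative paragraph text, real tag layout):
#   <speak><amazon:domain name="conversational"><amazon:effect name="drc">
#    First paragraph... a <say-as interpret-as="spell-out">PoC</say-as> demo...
#    ... Hey! You are surely missing something: if you want to know more, visit YOUR_BASE_URL
#   </amazon:effect></amazon:domain></speak>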
def __get_content_read_by_polly(article_path, content):
    print(article_path)
    # synthesize the SSML content with a neural voice
    response = polly_client.synthesize_speech(
        Engine='neural',
        LanguageCode='en-US',
        OutputFormat='mp3',
        Text=content,
        TextType='ssml',
        VoiceId='Matthew'
    )
    # save mp3 locally
    with open('speech.mp3', 'wb') as f:
        f.write(response['AudioStream'].read())
    # upload mp3 (article_path keeps its trailing slash, hence the [:-1])
    with open('speech.mp3', 'rb') as f:
        s3_client.upload_fileobj(f, CONTENT_BUCKET, f'mp3/{article_path[:-1]}.mp3')
    return f'mp3/{article_path[:-1]}.mp3'
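# Two assumptions worth flagging here: Polly's synchronous synthesize_speech
# call is capped (3,000 billed characters as of writing), which the
# ~1,490-character budget in __add_SSML_Enhanced_tags stays safely below; and
# on a real Lambda the local file would need to live under /tmp, the only
# writable path in that runtime.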
def __get_markdown_list(base_path=BASE_PATH):
    # get list of all markdown files
    list_of_files = list(Path(base_path).rglob("*.md"))
    # return it
    return list_of_files
def __match_audio_and_post(file_list, audio_list):
    # match dict
    matches = {audio_path: '' for audio_path in audio_list}
    # find match by name
    for audio_path in audio_list:
        for file_name in file_list:
            if audio_path.split("/")[-1].replace(".mp3", "") == str(file_name).split("/")[-1].replace(".md", "").lower():
                matches[audio_path] = str(file_name)
                # stop scanning files once this audio is matched
                break
    # return matches
    return matches
def __insert_new_audio_reference(matches):
    # for each match
    for audio_name, file_name in matches.items():
        # read the content
        with open(file_name, "r") as f:
            lines = f.readlines()
        # add the polly line right after the first four front matter lines
        lines = lines[0:4] + [f'polly: {BASE_URL}/{audio_name}\n'] + lines[4:]
        # write the new content back
        with open(file_name, "w") as f:
            for line in lines:
                f.write(line)
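# Illustrative result on a hypothetical post (the polly line lands after the
# first four lines of the front matter):
#   ---
#   title: "Some Post"
#   date: 2020-11-29
#   draft: false
#   polly: YOUR_BASE_URL/mp3/some-post.mp3
#   ...rest of the front matter and body...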
if __name__ == "__main__":
    lambda_handler(None, None)