Grab blog posts written in Notion as markdown
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Based off of https://github.com/ArnaudValensi/ArnaudValensi.github.io/blob/master/bin/get-blog-posts.py | |
# See https://erc.sh/blog/notion-cms for details | |
from notion.client import NotionClient | |
from PIL import Image, ExifTags | |
import os | |
import pytz | |
import requests | |
import hashlib | |
import shutil | |
import sys | |
# Sentinel markers: a Notion text block whose content is exactly one of these
# strings toggles an "ignore" region that is excluded from the generated markdown.
COMMENT_START = '/IGNORE_START'
COMMENT_END = '/IGNORE_END'
# Credentials / configuration come from the environment.
NOTION_TOKEN = os.getenv('NOTION_TOKEN')
NOTION_BLOG_COLLECTION_URL=os.getenv('NOTION_BLOG_COLLECTION_URL')
# TODO - add documentation on how to fetch NOTION_TOKEN and NOTION_BLOG_COLLECTION_URL
# Fail fast with a clear message when required configuration is missing.
if NOTION_TOKEN is None:
    sys.exit("The NOTION_TOKEN is missing.")
if NOTION_BLOG_COLLECTION_URL is None:
    sys.exit("The NOTION_BLOG_COLLECTION_URL is missing.")
client = NotionClient(token_v2=NOTION_TOKEN)
# Markdown files and downloaded images are written to <script dir>/content/blog.
download_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), 'content', 'blog'))
def reorient(filepath):
    """Rewrite the image at *filepath* in its upright (non-rotated) orientation.

    This works around a bug in sharp/remark-images, which ignores the EXIF
    orientation field and therefore computes the wrong aspect ratio (width
    and height flipped) for rotated photos. Images without usable EXIF data
    are left untouched.
    """
    # Find the EXIF tag id whose name is 'Orientation'.
    for orientation in ExifTags.TAGS.keys():
        if ExifTags.TAGS[orientation] == 'Orientation':
            break
    try:
        # Context manager guarantees the file handle is released even when
        # EXIF access raises below (the original leaked it on that path).
        with Image.open(filepath) as image:
            exif = image._getexif()  # None (or absent) for formats without EXIF
            # EXIF orientation -> counter-clockwise rotation needed to undo it.
            rotation = {3: 180, 6: 270, 8: 90}.get(exif[orientation])
            if rotation is not None:
                image = image.rotate(rotation, expand=True)
            image.save(filepath)
    except (AttributeError, KeyError, TypeError):
        # AttributeError: image type has no _getexif (e.g. non-JPEG formats);
        # KeyError: EXIF present but no Orientation tag;
        # TypeError: _getexif() returned None. Nothing to fix in any case.
        pass
def download_image(file_url, destination_folder):
    """Download *file_url* into *destination_folder* and return the file name.

    The body is streamed to a temporary file while a SHA-1 of the content is
    computed, then renamed to '<sha1>.<ext>' so identical images naturally
    deduplicate. The saved image is reoriented (see reorient()).

    Raises requests.HTTPError on a non-2xx response instead of silently
    saving an error page as an "image" (the original did the latter).
    """
    # timeout so a stalled Notion/S3 endpoint can't hang the export forever.
    with requests.get(file_url, stream=True, timeout=60) as r:
        r.raise_for_status()
        # Convert the response MIME type to an extension, dropping any
        # '; charset=...' parameters (may not work with everything).
        content_type = r.headers.get('content-type', 'application/octet-stream')
        ext = content_type.split(';')[0].split('/')[-1]
        tmp_file_path = os.path.join(destination_folder, f'tmp.{ext}')
        print(f"-> Downloading {file_url}")
        h = hashlib.sha1()
        # Binary mode: the payload is image data, not text.
        with open(tmp_file_path, 'wb') as f:
            # Iterate on the stream using 1KB packets.
            for chunk in r.iter_content(1024):
                f.write(chunk)
                h.update(chunk)
    final_file_name = f'{h.hexdigest()}.{ext}'
    final_file_path = os.path.join(destination_folder, final_file_name)
    os.rename(tmp_file_path, final_file_path)
    reorient(final_file_path)
    return final_file_name
def is_comment_start(block):
    """Return True when *block* is the text marker opening an ignore region."""
    if not hasattr(block, 'title'):
        return False
    return block.title.strip() == COMMENT_START
def is_comment_end(block):
    """Return True when *block* is the text marker closing an ignore region."""
    if not hasattr(block, 'title'):
        return False
    return block.title.strip() == COMMENT_END
def markdownify_notion_block(block, text_prefix=''):
    """Convert the children of a Notion *block* into a markdown string.

    text_prefix is prepended to images, list items and text when recursing
    into nested children so nested content is indented. Blocks between the
    COMMENT_START and COMMENT_END marker blocks are skipped entirely.
    Images referenced by the page are downloaded into download_dir.
    """
    text = ''
    was_bulleted_list = False
    numbered_list_index = None
    within_comment = False
    for content in block.children:
        if is_comment_start(content):
            within_comment = True
            continue
        if is_comment_end(content):
            within_comment = False
            # BUG FIX: without this continue, the end-marker text block fell
            # through below and the literal '/IGNORE_END' was rendered into
            # the output.
            continue
        if within_comment:
            continue
        # Close the bulleted list with a blank line once it ends.
        if was_bulleted_list and content.type != 'bulleted_list':
            text = text + '\n'
            was_bulleted_list = False
        # Reset numbering once a numbered list ends. NOTE(review): unlike
        # bulleted lists, no closing blank line is emitted here — preserved
        # as-is since the rendered output may depend on it.
        if numbered_list_index and content.type != 'numbered_list':
            numbered_list_index = None
        if content.type == 'header':
            text = text + f'# {content.title}\n\n'
        elif content.type == 'sub_header':
            text = text + f'## {content.title}\n\n'
        elif content.type == 'sub_sub_header':
            text = text + f'### {content.title}\n\n'
        elif content.type == 'code':
            text = text + f'```{content.language}\n{content.title}\n```\n\n'
        elif content.type == 'image':
            image_name = download_image(content.source, download_dir)
            text = text + text_prefix + f'<img src="{image_name}" alt="{image_name}" width="{content.width}px"/>\n\n'
        elif content.type == 'bulleted_list':
            text = text + text_prefix + f'* {content.title}\n'
            was_bulleted_list = True
        elif content.type == 'numbered_list':
            numbered_list_index = 1 if numbered_list_index is None else numbered_list_index + 1
            text = text + f'{numbered_list_index}. {content.title}\n'
        elif content.type == 'divider':
            text = text + '---\n'
        elif content.type == 'text':
            if content.title.strip():
                # Trailing two spaces force a markdown hard line break.
                text = text + text_prefix + f'{content.title} \n\n'
            else:
                # Empty paragraphs become explicit <br> spacers.
                text += '<br>\n\n'
        elif content.type == 'video':
            text = text + f'`video: {content.source}`\n\n'
        else:
            print("Unsupported type: " + content.type)
        # Recurse into nested children (sub-pages are not inlined).
        # NOTE(review): the prefix is a constant two spaces, not cumulative,
        # so depth > 1 does not indent further — preserved as-is.
        if len(content.children) and content.type != 'page':
            child_text = markdownify_notion_block(content, '  ')
            text = text + child_text
    return text
def grab_blog_posts():
    """Fetch every published post from the Notion collection.

    Returns a dict mapping slug -> full markdown document (YAML frontmatter
    followed by the converted body).

    Raises ValueError when a published row is missing its title, publish
    date, or slug, so a misconfigured post fails loudly instead of being
    silently exported with broken metadata.
    """
    posts = {}
    cv = client.get_collection_view(NOTION_BLOG_COLLECTION_URL)
    for row in cv.collection.get_rows():
        if not row.published:
            continue
        if not row.name:
            raise ValueError("Title missing on publishable row")
        title = row.name
        if not row.publish_date:
            raise ValueError(f'"{title}" missing publish date')
        if not row.slug:
            raise ValueError(f'"{title}" missing slug')
        page = client.get_block(row.id)
        text = markdownify_notion_block(page)
        # BUG FIX: the original chained two .replace(tzinfo=...) calls, so
        # the second simply overwrote the first — no conversion to New York
        # time ever happened. Also, pytz zones must be attached with
        # localize() (replace(tzinfo=...) yields historical LMT offsets).
        # Interpret the naive timestamp in its stated zone, then convert.
        source_tz = pytz.timezone(row.publish_date.timezone)
        publish_date = source_tz.localize(row.publish_date.start) \
            .astimezone(pytz.timezone('America/New_York'))
        metas = [f"title: '{title}'", f"slug: {row.slug}", f"date: {publish_date}"]
        metaText = '---\n' + '\n'.join(metas) + '\n---\n'
        posts[row.slug] = metaText + text
    return posts
if __name__ == "__main__":
    print(f'-> Cleaning the download directory ("{download_dir}")')
    # Remove any previous export. Only "directory does not exist" is
    # expected here; the original bare except also hid permission errors.
    try:
        shutil.rmtree(download_dir)
    except FileNotFoundError:
        pass
    # makedirs (vs mkdir) also creates the 'content' parent on a fresh clone.
    os.makedirs(download_dir)
    posts = grab_blog_posts()
    for slug, markdown in posts.items():
        file_name = slug + '.md'
        # Explicit encoding so output does not depend on the platform locale.
        with open(os.path.join(download_dir, file_name), 'w', encoding='utf-8') as f:
            f.write(markdown)
        print('-> imported "' + file_name + '"')
    print('done: imported ' + str(len(posts)) + ' pages.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment