Skip to content

Instantly share code, notes, and snippets.

@avachen

avachen/update_comics_rss.py Secret

Last active Feb 2, 2021
Embed
What would you like to do?
update_comics_rss
#!/usr/bin/env python3
"""
Adapted from:
https://blog.macuyiko.com/post/2016/how-to-send-html-mails-with-oauth2-and-gmail-in-python.html
https://github.com/google/gmail-oauth2-tools/blob/master/python/oauth2.py
https://developers.google.com/identity/protocols/OAuth2
1. Connects to Miniflux Client
2. Processes unread feeds and formats email
3. Sends gmail email via OAuth2
"""
import base64
import imaplib
import json
import smtplib
import urllib.parse
import urllib.request
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import lxml.html
from datetime import datetime
from pytz import timezone
import miniflux
from bs4 import BeautifulSoup
import re
"""
GMAIL CLIENT
"""
GOOGLE_ACCOUNTS_BASE_URL = 'https://accounts.google.com'  # prefix joined with an endpoint path by command_to_url()
REDIRECT_URI = 'urn:ietf:wg:oauth:2.0:oob'  # sent as redirect_uri in the permission URL and the code exchange
GOOGLE_CLIENT_ID = 'XXXXXXXXXXXXX'  # OAuth2 client id; send_mail() also passes it as the SMTP EHLO name
GOOGLE_CLIENT_SECRET = 'XXXXXXXXXXXXXXX'  # OAuth2 client secret used in the token requests
GOOGLE_REFRESH_TOKEN = 'XXXXXXXXXXXXXXX'  # long-lived token; save_token_string() rewrites this assignment in the file
GMAIL_SENDER = 'XXXXXXX@gmail.com'  # From address; also the user named in the XOAUTH2 string
GMAIL_RECEIVER = 'XXXXXXXX@gmail.com'  # To address of the digest mail
"""
MINIFLUX CLIENT
"""
MINIFLUX_URL = 'http://localhost/'  # base URL handed to miniflux.Client
MINIFLUX_CLIENT_API = 'XXXXXXXXXXXX'  # API key handed to miniflux.Client
COMICS_CATEGORY_ID = 2  # category whose entries process_unreads() marks as read
def command_to_url(command):
    """Return the absolute Google Accounts URL for an endpoint path.

    ``command`` (for example ``'o/oauth2/token'``) is appended to
    ``GOOGLE_ACCOUNTS_BASE_URL`` after a single ``/`` separator.
    """
    return f'{GOOGLE_ACCOUNTS_BASE_URL}/{command}'
def url_escape(text):
    """Percent-encode ``text`` for use as a URL parameter value.

    Only ``~``, ``-``, ``.`` and ``_`` are declared safe in addition to the
    characters ``urllib.parse.quote`` never encodes, so ``/`` is escaped too.
    """
    safe_punctuation = '~-._'
    return urllib.parse.quote(text, safe=safe_punctuation)
def url_unescape(text):
    """Return ``text`` with its percent-escapes decoded."""
    decoded = urllib.parse.unquote(text)
    return decoded
def url_format_params(params):
    """Serialise ``params`` into a ``key=value&key=value`` query string.

    Pairs are emitted in sorted key order. Values are escaped with
    ``url_escape``; keys are written verbatim.
    """
    return '&'.join(
        '%s=%s' % (key, url_escape(params[key]))
        for key in sorted(params)
    )
def generate_permission_url(client_id, scope='https://mail.google.com/'):
    """Build the consent URL the user must open to authorise this client.

    The URL targets the ``o/oauth2/auth`` endpoint and carries the client id,
    ``REDIRECT_URI``, the requested ``scope`` and ``response_type=code``.
    """
    endpoint = command_to_url('o/oauth2/auth')
    query = url_format_params({
        'client_id': client_id,
        'redirect_uri': REDIRECT_URI,
        'scope': scope,
        'response_type': 'code',
    })
    return '%s?%s' % (endpoint, query)
def call_authorize_tokens(client_id, client_secret, authorization_code):
    """Exchange a one-time authorization code for OAuth2 tokens.

    POSTs the form-encoded parameters to the ``o/oauth2/token`` endpoint and
    returns the decoded JSON response as a dict.

    Any exception raised by ``urllib.request.urlopen`` (for example on an
    HTTP error status) or by ``json.loads`` propagates to the caller.
    """
    params = {
        'client_id': client_id,
        'client_secret': client_secret,
        'code': authorization_code,
        'redirect_uri': REDIRECT_URI,
        'grant_type': 'authorization_code',
    }
    request_url = command_to_url('o/oauth2/token')
    payload = urllib.parse.urlencode(params).encode('UTF-8')
    # Use the response as a context manager so the connection is closed
    # even if reading or decoding fails.
    with urllib.request.urlopen(request_url, payload) as response:
        body = response.read().decode('UTF-8')
    return json.loads(body)
def call_refresh_token(client_id, client_secret, refresh_token):
    """Request a fresh access token using a stored refresh token.

    POSTs the form-encoded parameters to the ``o/oauth2/token`` endpoint and
    returns the decoded JSON response as a dict.

    Any exception raised by ``urllib.request.urlopen`` (for example on an
    HTTP error status) or by ``json.loads`` propagates to the caller.
    """
    params = {
        'client_id': client_id,
        'client_secret': client_secret,
        'refresh_token': refresh_token,
        'grant_type': 'refresh_token',
    }
    request_url = command_to_url('o/oauth2/token')
    payload = urllib.parse.urlencode(params).encode('UTF-8')
    # Use the response as a context manager so the connection is closed
    # even if reading or decoding fails.
    with urllib.request.urlopen(request_url, payload) as response:
        body = response.read().decode('UTF-8')
    return json.loads(body)
def generate_oauth2_string(username, access_token, as_base64=False):
    """Build the XOAUTH2 authentication string for ``username``.

    The fields are separated by Ctrl-A (0x01) bytes and the string ends with
    two of them. With ``as_base64=True`` the ASCII-encoded string is returned
    base64-encoded (as ``str``); otherwise the raw string is returned.
    """
    raw = 'user=%s\1auth=Bearer %s\1\1' % (username, access_token)
    if not as_base64:
        return raw
    encoded = base64.b64encode(raw.encode('ascii'))
    return encoded.decode('ascii')
def test_imap(user, auth_string):
    """Smoke-test XOAUTH2 authentication against Gmail's IMAP server.

    Connects to ``imap.gmail.com`` over SSL with protocol debugging enabled,
    authenticates with the raw (non-base64) ``auth_string`` and selects
    ``INBOX``. ``user`` is accepted for call compatibility but not used.

    Any ``imaplib`` error propagates to the caller.
    """
    # The context manager logs out on exit, so the connection is not leaked
    # when authentication or the mailbox selection fails.
    with imaplib.IMAP4_SSL('imap.gmail.com') as imap_conn:
        imap_conn.debug = 4
        imap_conn.authenticate('XOAUTH2', lambda x: auth_string)
        imap_conn.select('INBOX')
def test_smpt(user, base64_auth_string):
    """Smoke-test XOAUTH2 authentication against Gmail's SMTP server.

    Connects to ``smtp.gmail.com:587`` with debug output enabled, upgrades
    the connection with STARTTLS and issues ``AUTH XOAUTH2`` with the
    base64-encoded auth string. ``user`` is accepted for call compatibility
    but not used.

    Any ``smtplib`` error propagates to the caller.
    """
    # The context manager sends QUIT and closes the socket on exit, so the
    # connection is released even when a step raises.
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp_conn:
        smtp_conn.set_debuglevel(True)
        smtp_conn.ehlo('test')
        smtp_conn.starttls()
        smtp_conn.docmd('AUTH', 'XOAUTH2 ' + base64_auth_string)
def get_authorization(google_client_id, google_client_secret):
    """Run the interactive first-time authorisation flow.

    Prints the consent URL, reads the verification code from standard input
    and exchanges it for tokens.

    Returns a ``(refresh_token, access_token, expires_in)`` tuple taken from
    the token response.
    """
    permission_url = generate_permission_url(google_client_id, "https://mail.google.com/")
    print('Navigate to the following URL to auth:', permission_url)
    authorization_code = input('Enter verification code: ')
    tokens = call_authorize_tokens(google_client_id, google_client_secret, authorization_code)
    return tokens['refresh_token'], tokens['access_token'], tokens['expires_in']
def refresh_authorization(google_client_id, google_client_secret, refresh_token):
    """Obtain a fresh access token from the stored refresh token.

    Returns ``(new_token, response)`` where ``response`` is the decoded token
    response and ``new_token`` is ``1`` when that response contains a
    ``refresh_token`` key, otherwise ``0``.
    """
    response = call_refresh_token(google_client_id, google_client_secret, refresh_token)
    new_token = 1 if 'refresh_token' in response else 0
    return new_token, response
def save_token_string(refresh_token, path=None):
    """Persist a new refresh token by rewriting a Python source file.

    The first line that starts with a ``GOOGLE_REFRESH_TOKEN =`` assignment
    is replaced by one assigning ``refresh_token`` as a quoted string
    literal; every other line is written back unchanged.

    Args:
        refresh_token: The token string to store.
        path: File to rewrite. Defaults to this script (``__file__``).

    Raises:
        ValueError: If the file has no top-level ``GOOGLE_REFRESH_TOKEN``
            assignment. The file is left untouched in that case.
    """
    target = __file__ if path is None else path
    with open(target, 'r', encoding='utf-8') as f:
        content = f.readlines()

    # Locate the assignment by name rather than by a fixed line number, so
    # edits elsewhere in the file cannot make this overwrite the wrong line.
    for index, line in enumerate(content):
        if re.match(r'GOOGLE_REFRESH_TOKEN\s*=', line):
            # !r writes the token as a quoted literal; an unquoted token
            # would leave the rewritten source unparseable.
            content[index] = 'GOOGLE_REFRESH_TOKEN = {n!r}\n'.format(n=refresh_token)
            break
    else:
        raise ValueError('no GOOGLE_REFRESH_TOKEN assignment found in %s' % target)

    # Only open for writing (which truncates) once the new content is ready.
    with open(target, 'w', encoding='utf-8') as f:
        f.writelines(content)
def send_mail(fromaddr, toaddr, subject, message):
    """Send an HTML mail through Gmail's SMTP server using XOAUTH2.

    A fresh access token is obtained from the configured refresh token, the
    HTML ``message`` is wrapped in a multipart mail with a plain-text
    alternative derived from it, and the mail is submitted over STARTTLS.

    Args:
        fromaddr: Sender address; also the user named in the auth string.
        toaddr: Recipient address.
        subject: Subject header value.
        message: HTML body as ``str``.

    Returns:
        ``(new_token, refresh_token)``. ``new_token`` is truthy when the
        token response carried a new refresh token, in which case
        ``refresh_token`` holds it; otherwise ``refresh_token`` is ``[]``.

    Raises:
        smtplib.SMTPAuthenticationError: If the server does not accept the
            XOAUTH2 credentials.
        Other ``smtplib``/``urllib`` errors propagate unchanged.
    """
    new_token, oauth_response = refresh_authorization(GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET, GOOGLE_REFRESH_TOKEN)
    # Callers only read refresh_token when new_token is truthy; the empty
    # list is kept as the "nothing new" placeholder for compatibility.
    refresh_token = oauth_response['refresh_token'] if new_token else []
    access_token = oauth_response['access_token']
    auth_string = generate_oauth2_string(fromaddr, access_token, as_base64=True)

    msg = MIMEMultipart('related')
    msg['Subject'] = subject
    msg['From'] = fromaddr
    msg['To'] = toaddr
    msg.preamble = 'This is a multi-part message in MIME format.'
    msg_alternative = MIMEMultipart('alternative')
    msg.attach(msg_alternative)
    # MIMEText encodes str payloads itself using the given charset, so the
    # text is passed as str rather than pre-encoded bytes.
    plain_text = lxml.html.fromstring(message).text_content()
    msg_alternative.attach(MIMEText(plain_text, 'plain', _charset='utf-8'))
    msg_alternative.attach(MIMEText(message, 'html', _charset='utf-8'))

    # The context manager issues QUIT and closes the socket even when a
    # step below raises.
    with smtplib.SMTP('smtp.gmail.com:587') as server:
        server.ehlo(GOOGLE_CLIENT_ID)
        server.starttls()
        # The session state is reset by STARTTLS; identify again before AUTH.
        server.ehlo(GOOGLE_CLIENT_ID)
        code, reply = server.docmd('AUTH', 'XOAUTH2 ' + auth_string)
        if code != 235:  # 235 = authentication successful
            raise smtplib.SMTPAuthenticationError(code, reply)
        server.sendmail(fromaddr, toaddr, msg.as_string())
    return new_token, refresh_token
def process_unreads(miniflux_client, entries, category_id):
    """Render unread entries as an HTML list and mark the category as read.

    Args:
        miniflux_client: Client object used to mark ``category_id`` as read.
        entries: Mapping with an ``'entries'`` list; each entry is read for
            its ``feed.title``, ``feed_id``, ``title``, ``url`` and (for
            feed 19) ``content`` fields.
        category_id: Category passed to ``mark_category_entries_as_read``.

    Returns:
        An HTML fragment: a bold US/Eastern timestamp followed by a ``<ul>``
        with one linked ``<li>`` per entry.
    """
    # Standard-library escaper (unrelated to the lxml.html module imported
    # at file level); imported locally so this block is self-contained.
    from html import escape

    tz = timezone('US/Eastern')
    dt_string = datetime.now(tz).strftime("%B %d, %Y %I:%M %p")

    items = []
    for entry in entries['entries']:
        series_title = entry['feed']['title']
        series_id = entry['feed_id']
        panel_title = entry['title']
        panel_url = entry['url']
        if series_id == 19:  # unsounded default url links to tumblr not comic
            soup = BeautifulSoup(entry['content'], 'html.parser')
            link = soup.find('a', href=True)
            # Keep the feed's own URL when the content carries no link.
            if link is not None:
                panel_url = link['href']
        elif series_id == 10:  # kingdom updates to placeholder chapters, so increment one chapter back
            chapter = re.search(r'\d+', panel_url)
            # Leave the entry as-is when the URL has no chapter number.
            if chapter is not None:
                ch_num = chapter.group()
                new_ch_num = str(int(ch_num) - 1)
                panel_url = panel_url.replace(ch_num, new_ch_num)
                panel_title = panel_title.replace(ch_num, new_ch_num)
        # Titles and URLs come from remote feeds: escape them and quote the
        # href so they cannot break out of the markup.
        items.append(
            f'<li><a href="{escape(panel_url)}">'
            f'{escape(series_title)} - {escape(panel_title)}</a>'
        )

    message = f'<b> {dt_string} </b><br><ul>' + ''.join(items) + '</ul>'
    # Use the client that was passed in rather than a module-level global.
    miniflux_client.mark_category_entries_as_read(category_id)
    return message
if __name__ == '__main__':
    # Script entry point: fetch unread entries, mail them as a digest and
    # store a new refresh token if Google issued one. Exits with status 0
    # on success (including "nothing to send") and 1 on an unexpected
    # response from Miniflux.
    client = miniflux.Client(MINIFLUX_URL, api_key=MINIFLUX_CLIENT_API)
    # NOTE(review): this fetches unread entries from every category (up to
    # 100), while process_unreads() marks only COMICS_CATEGORY_ID as read --
    # confirm that is intended.
    entries = client.get_entries(status='unread', limit=100, direction='desc')

    if 'total' not in entries:
        print('weird error occured')
        # Signal the failure to the caller (cron, shell) with a non-zero status.
        raise SystemExit(1)

    unread_count = entries['total']
    if unread_count == 0:
        print('process successful, no new comics')
        raise SystemExit(0)

    if unread_count == 1:
        subjectmessage = f'This morning\'s {unread_count} comic update'
    else:
        subjectmessage = f'This morning\'s {unread_count} comic updates'

    bodyhtml = process_unreads(client, entries, COMICS_CATEGORY_ID)
    new_token, refresh_token = send_mail(GMAIL_SENDER, GMAIL_RECEIVER,
                                         subjectmessage,
                                         bodyhtml)
    if new_token:
        save_token_string(refresh_token)
    print('email sent successfully')
    raise SystemExit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment