-
-
Save avachen/391db99690cbd94780b24e1f62ce2da8 to your computer and use it in GitHub Desktop.
update_comics_rss
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Adapted from: | |
https://blog.macuyiko.com/post/2016/how-to-send-html-mails-with-oauth2-and-gmail-in-python.html | |
https://github.com/google/gmail-oauth2-tools/blob/master/python/oauth2.py | |
https://developers.google.com/identity/protocols/OAuth2 | |
1. Connects to Miniflux Client | |
2. Processes unread feeds and formats email | |
3. Sends gmail email via OAuth2 | |
""" | |
import base64 | |
import imaplib | |
import json | |
import smtplib | |
import urllib.parse | |
import urllib.request | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.text import MIMEText | |
import lxml.html | |
from datetime import datetime | |
from pytz import timezone | |
import miniflux | |
from bs4 import BeautifulSoup | |
import re | |
""" | |
GMAIL CLIENT | |
""" | |
GOOGLE_ACCOUNTS_BASE_URL = 'https://accounts.google.com'  # prefix that command_to_url() joins endpoint paths onto
REDIRECT_URI = 'urn:ietf:wg:oauth:2.0:oob'  # sent as `redirect_uri` in the permission URL and the token exchange
GOOGLE_CLIENT_ID = 'XXXXXXXXXXXXX'  # OAuth2 client id (placeholder); send_mail() also passes it as the EHLO name
GOOGLE_CLIENT_SECRET = 'XXXXXXXXXXXXXXX'  # OAuth2 client secret (placeholder)
GOOGLE_REFRESH_TOKEN = 'XXXXXXXXXXXXXXX'
GMAIL_SENDER = 'XXXXXXX@gmail.com'  # From address; also the user name in the XOAUTH2 string
GMAIL_RECEIVER = 'XXXXXXXX@gmail.com'  # To address of the digest email
""" | |
MINIFLUX CLIENT | |
""" | |
MINIFLUX_URL = 'http://localhost/'  # base URL handed to miniflux.Client
MINIFLUX_CLIENT_API = 'XXXXXXXXXXXX'  # API key handed to miniflux.Client (placeholder)
COMICS_CATEGORY_ID = 2  # category whose entries are marked as read after the digest is built
def command_to_url(command):
    """Return the Google Accounts URL for an endpoint path.

    Args:
        command: Path appended to ``GOOGLE_ACCOUNTS_BASE_URL``, for example
            ``'o/oauth2/token'``.

    Returns:
        The base URL and ``command`` separated by a single ``/``.
    """
    return '{}/{}'.format(GOOGLE_ACCOUNTS_BASE_URL, command)
def url_escape(text):
    """Percent-encode ``text`` for use inside a URL query string.

    Only the characters listed in ``safe`` (plus whatever
    ``urllib.parse.quote`` never escapes) are left as-is; ``/`` is NOT in the
    safe set, so it is escaped too.

    Args:
        text: The string to encode.

    Returns:
        The percent-encoded string.
    """
    unescaped_chars = '~-._'
    return urllib.parse.quote(text, safe=unescaped_chars)
def url_unescape(text):
    """Reverse percent-encoding in ``text``.

    Args:
        text: A percent-encoded string.

    Returns:
        The decoded string, as produced by ``urllib.parse.unquote``.
    """
    decoded = urllib.parse.unquote(text)
    return decoded
def url_format_params(params):
    """Serialize a mapping into a ``key=value&key=value`` query string.

    Pairs are emitted in ascending key order. Values are passed through
    ``url_escape``; keys are written verbatim.

    Args:
        params: Mapping of parameter name to string value.

    Returns:
        The joined query string (empty when ``params`` is empty).
    """
    return '&'.join(
        '%s=%s' % (name, url_escape(params[name]))
        for name in sorted(params)
    )
def generate_permission_url(client_id, scope='https://mail.google.com/'):
    """Build the URL a user visits to grant this client access.

    Args:
        client_id: OAuth2 client id sent as ``client_id``.
        scope: OAuth2 scope to request; defaults to the Gmail scope.

    Returns:
        The ``o/oauth2/auth`` URL with ``client_id``, ``redirect_uri``,
        ``scope`` and ``response_type=code`` as its query string.
    """
    query = url_format_params({
        'client_id': client_id,
        'redirect_uri': REDIRECT_URI,
        'scope': scope,
        'response_type': 'code',
    })
    endpoint = command_to_url('o/oauth2/auth')
    return '%s?%s' % (endpoint, query)
def call_authorize_tokens(client_id, client_secret, authorization_code):
    """Exchange an authorization code for OAuth2 tokens.

    POSTs a form-encoded ``authorization_code`` grant to the
    ``o/oauth2/token`` endpoint and decodes the JSON reply.

    Args:
        client_id: OAuth2 client id.
        client_secret: OAuth2 client secret.
        authorization_code: Verification code the user obtained from the
            permission URL.

    Returns:
        The parsed JSON response. ``get_authorization`` reads the
        ``refresh_token``, ``access_token`` and ``expires_in`` keys from it.
    """
    params = {
        'client_id': client_id,
        'client_secret': client_secret,
        'code': authorization_code,
        'redirect_uri': REDIRECT_URI,
        'grant_type': 'authorization_code',
    }
    request_url = command_to_url('o/oauth2/token')
    post_body = urllib.parse.urlencode(params).encode('UTF-8')
    # The context manager closes the HTTP response (and its socket) even if
    # reading or decoding fails.
    with urllib.request.urlopen(request_url, post_body) as http_response:
        response = http_response.read().decode('UTF-8')
    return json.loads(response)
def call_refresh_token(client_id, client_secret, refresh_token):
    """Request a fresh access token using a refresh token.

    POSTs a form-encoded ``refresh_token`` grant to the ``o/oauth2/token``
    endpoint and decodes the JSON reply.

    Args:
        client_id: OAuth2 client id.
        client_secret: OAuth2 client secret.
        refresh_token: The stored refresh token.

    Returns:
        The parsed JSON response. Callers in this file read ``access_token``
        and ``expires_in`` from it and check whether it contains a
        ``refresh_token`` key.
    """
    params = {
        'client_id': client_id,
        'client_secret': client_secret,
        'refresh_token': refresh_token,
        'grant_type': 'refresh_token',
    }
    request_url = command_to_url('o/oauth2/token')
    post_body = urllib.parse.urlencode(params).encode('UTF-8')
    # The context manager closes the HTTP response (and its socket) even if
    # reading or decoding fails.
    with urllib.request.urlopen(request_url, post_body) as http_response:
        response = http_response.read().decode('UTF-8')
    return json.loads(response)
def generate_oauth2_string(username, access_token, as_base64=False):
    """Build the XOAUTH2 authentication string.

    The fields are separated by the control character with code 1 (Ctrl-A),
    and the string ends with two of them.

    Args:
        username: Account name placed in the ``user=`` field.
        access_token: Token placed after ``auth=Bearer``.
        as_base64: When true, return the string base64-encoded (as ASCII
            text) instead of raw.

    Returns:
        The raw or base64-encoded authentication string.
    """
    raw = 'user=%s\1auth=Bearer %s\1\1' % (username, access_token)
    if not as_base64:
        return raw
    encoded = base64.b64encode(raw.encode('ascii'))
    return encoded.decode('ascii')
def test_imap(user, auth_string):
    """Manually check XOAUTH2 authentication against Gmail's IMAP server.

    Connects to ``imap.gmail.com`` over SSL with protocol debugging turned
    on, authenticates with ``auth_string`` and selects ``INBOX``.

    Args:
        user: Unused; kept so the signature mirrors ``test_smpt``.
        auth_string: XOAUTH2 string returned verbatim from the
            authentication callback (presumably the raw, non-base64 form,
            since imaplib encodes the callback's result itself).
    """
    # The context manager logs out and closes the socket even when
    # authentication or the SELECT fails; the original left it open.
    with imaplib.IMAP4_SSL('imap.gmail.com') as imap_conn:
        imap_conn.debug = 4
        imap_conn.authenticate('XOAUTH2', lambda x: auth_string)
        imap_conn.select('INBOX')
def test_smpt(user, base64_auth_string):
    """Manually check XOAUTH2 authentication against Gmail's SMTP server.

    Connects to ``smtp.gmail.com:587`` with debug output enabled, upgrades
    the connection with STARTTLS and issues ``AUTH XOAUTH2``.

    Args:
        user: Unused; kept so the signature mirrors ``test_imap``.
        base64_auth_string: Base64-encoded XOAUTH2 string, appended to the
            ``AUTH XOAUTH2`` command as-is.
    """
    # The context manager sends QUIT and closes the socket on exit, so the
    # connection is released even if one of the commands raises.
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp_conn:
        smtp_conn.set_debuglevel(True)
        smtp_conn.ehlo('test')
        smtp_conn.starttls()
        smtp_conn.docmd('AUTH', 'XOAUTH2 ' + base64_auth_string)
def get_authorization(google_client_id, google_client_secret):
    """Run the interactive first-time authorization flow.

    Prints the permission URL for the Gmail scope, reads the verification
    code from standard input and exchanges it for tokens.

    Args:
        google_client_id: OAuth2 client id.
        google_client_secret: OAuth2 client secret.

    Returns:
        A ``(refresh_token, access_token, expires_in)`` tuple taken from the
        token response.
    """
    permission_url = generate_permission_url(google_client_id, "https://mail.google.com/")
    print('Navigate to the following URL to auth:', permission_url)
    verification_code = input('Enter verification code: ')
    tokens = call_authorize_tokens(google_client_id, google_client_secret, verification_code)
    return tokens['refresh_token'], tokens['access_token'], tokens['expires_in']
def refresh_authorization(google_client_id, google_client_secret, refresh_token):
    """Refresh the access token and report whether a new refresh token came back.

    Args:
        google_client_id: OAuth2 client id.
        google_client_secret: OAuth2 client secret.
        refresh_token: The currently stored refresh token.

    Returns:
        A ``(new_token, response)`` tuple: ``new_token`` is ``1`` when the
        response contains a ``refresh_token`` key and ``0`` otherwise;
        ``response`` is the parsed token response.
    """
    token_response = call_refresh_token(google_client_id, google_client_secret, refresh_token)
    has_new_refresh_token = 1 if 'refresh_token' in token_response else 0
    return has_new_refresh_token, token_response
def save_token_string(refresh_token, path=None):
    """Persist a new refresh token by rewriting its assignment in a source file.

    The first line that assigns ``GOOGLE_REFRESH_TOKEN`` at column 0 is
    replaced. The value is written with ``repr`` so the file stays valid
    Python (the token is quoted), and the line is located by name rather
    than by a fixed line number, so edits elsewhere in the file cannot make
    this overwrite the wrong line.

    Args:
        refresh_token: The new refresh token value.
        path: File to rewrite. Defaults to this script (``__file__``).

    Raises:
        ValueError: If no ``GOOGLE_REFRESH_TOKEN`` assignment is found. The
            file is left untouched in that case.
    """
    target = __file__ if path is None else path
    with open(target, 'r', encoding='utf-8') as f:
        content = f.readlines()

    for index, line in enumerate(content):
        # No lstrip(): only a top-level (unindented) assignment matches, so
        # indented mentions of the name inside function bodies are skipped.
        if line.partition('=')[0].rstrip() == 'GOOGLE_REFRESH_TOKEN':
            content[index] = 'GOOGLE_REFRESH_TOKEN = {!r}\n'.format(refresh_token)
            break
    else:
        raise ValueError(
            'no GOOGLE_REFRESH_TOKEN assignment found in {}'.format(target))

    # Open for writing only once the new content is complete, so a failure
    # above can never leave a truncated file behind.
    with open(target, 'w', encoding='utf-8') as f:
        f.writelines(content)
def send_mail(fromaddr, toaddr, subject, message):
    """Send an HTML email through Gmail SMTP using XOAUTH2.

    Refreshes the access token, builds a ``multipart/related`` message that
    wraps a ``multipart/alternative`` part (plain text extracted from the
    HTML, plus the HTML itself), and sends it via ``smtp.gmail.com:587``.

    Args:
        fromaddr: Sender address; also the user name in the XOAUTH2 string.
        toaddr: Recipient address.
        subject: Subject header value.
        message: HTML body of the email.

    Returns:
        A ``(new_token, refresh_token)`` tuple. ``new_token`` is the flag
        from ``refresh_authorization``; ``refresh_token`` is the new refresh
        token when one was issued, otherwise an empty list.

    Raises:
        smtplib.SMTPAuthenticationError: If the server does not answer the
            AUTH command with code 235.
    """
    new_token, oauth_response = refresh_authorization(
        GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET, GOOGLE_REFRESH_TOKEN)
    refresh_token = oauth_response['refresh_token'] if new_token else []
    access_token = oauth_response['access_token']
    auth_string = generate_oauth2_string(fromaddr, access_token, as_base64=True)

    msg = MIMEMultipart('related')
    msg['Subject'] = subject
    msg['From'] = fromaddr
    msg['To'] = toaddr
    msg.preamble = 'This is a multi-part message in MIME format.'
    msg_alternative = MIMEMultipart('alternative')
    msg.attach(msg_alternative)
    # Plain-text fallback: the HTML with all markup stripped.
    plain_text = lxml.html.fromstring(message).text_content()
    part_text = MIMEText(plain_text.encode('utf-8'), 'plain', _charset='utf-8')
    part_html = MIMEText(message.encode('utf-8'), 'html', _charset='utf-8')
    msg_alternative.attach(part_text)
    msg_alternative.attach(part_html)

    server = smtplib.SMTP('smtp.gmail.com:587')
    try:
        server.ehlo(GOOGLE_CLIENT_ID)
        server.starttls()
        code, reply = server.docmd('AUTH', 'XOAUTH2 ' + auth_string)
        # docmd() never raises on a rejected command; without this check a
        # bad token only surfaces later as a confusing sendmail() failure.
        if code != 235:
            raise smtplib.SMTPAuthenticationError(code, reply)
        server.sendmail(fromaddr, toaddr, msg.as_string())
        server.quit()
    finally:
        # Harmless after a successful quit(); releases the socket when any
        # step above raised.
        server.close()
    return new_token, refresh_token
def process_unreads(miniflux_client, entries, category_id):
    """Format unread entries as an HTML list and mark the category as read.

    Args:
        miniflux_client: Client object providing
            ``mark_category_entries_as_read``.
        entries: Response mapping with an ``'entries'`` list. Each entry is
            read for ``feed.title``, ``feed_id``, ``title``, ``url`` and
            ``content``.
        category_id: Category passed to ``mark_category_entries_as_read``
            once the message has been built.

    Returns:
        An HTML fragment: a bold US/Eastern timestamp followed by a ``<ul>``
        holding one linked ``<li>`` per entry.
    """
    import html  # function-scope import: the file does not import it at top level

    unsounded_feed_id = 19  # default url links to tumblr, not the comic
    kingdom_feed_id = 10  # updates point at placeholder chapters

    tz = timezone('US/Eastern')
    dt_string = datetime.now(tz).strftime("%B %d, %Y %I:%M %p")

    items = []
    for entry in entries['entries']:
        series_title = entry['feed']['title']
        series_id = entry['feed_id']
        panel_title = entry['title']
        panel_url = entry['url']
        panel_content = entry['content']

        if series_id == unsounded_feed_id:
            # Use the first link inside the entry body; keep the default url
            # when the body has no link instead of crashing on None.
            link = BeautifulSoup(panel_content, 'html.parser').find('a', href=True)
            if link is not None:
                panel_url = link['href']
        elif series_id == kingdom_feed_id:
            # Step one chapter back; skip the rewrite when the url carries
            # no number instead of crashing on a failed match.
            chapter_match = re.search(r'\d+', panel_url)
            if chapter_match is not None:
                ch_num = chapter_match.group()
                new_ch_num = str(int(ch_num) - 1)
                panel_url = panel_url.replace(ch_num, new_ch_num)
                panel_title = panel_title.replace(ch_num, new_ch_num)

        # Feed text is external input: escape it, quote the href and close
        # the <li> so unusual titles or urls cannot break the markup.
        items.append('<li><a href="{}">{} - {}</a></li>'.format(
            html.escape(panel_url, quote=True),
            html.escape(series_title),
            html.escape(panel_title),
        ))

    message = f'<b> {dt_string} </b><br><ul>' + ''.join(items) + '</ul>'
    # Use the client that was passed in; the original reached for a global
    # named `client`, which only exists when the file runs as a script.
    miniflux_client.mark_category_entries_as_read(category_id)
    return message
if __name__ == '__main__':
    # Script entry point: fetch unread entries, email them as a digest and
    # persist a rotated refresh token if the token endpoint issued one.
    client = miniflux.Client(MINIFLUX_URL, api_key=MINIFLUX_CLIENT_API)
    entries = client.get_entries(status='unread', limit=100, direction='desc')

    if 'total' not in entries:
        print('weird error occured')
        # Exit non-zero so cron/schedulers can see that the run failed.
        raise SystemExit(1)

    # NOTE(review): the query is not restricted to COMICS_CATEGORY_ID and is
    # capped at 100 entries, while the subject uses `total` and only that
    # category is marked as read afterwards -- confirm this is intended.
    unread_count = entries['total']
    if unread_count == 0:
        print('process successful, no new comics')
        # SystemExit instead of exit(): exit() is injected by the `site`
        # module and is not guaranteed to exist (e.g. under `python -S`).
        raise SystemExit(0)

    noun = 'update' if unread_count == 1 else 'updates'
    subjectmessage = f'This morning\'s {unread_count} comic {noun}'
    bodyhtml = process_unreads(client, entries, COMICS_CATEGORY_ID)
    new_token, refresh_token = send_mail(GMAIL_SENDER, GMAIL_RECEIVER,
                                         subjectmessage,
                                         bodyhtml)
    if new_token:
        save_token_string(refresh_token)
    print('email sent successfully')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment