Skip to content

Instantly share code, notes, and snippets.

@funrep

funrep/gmail.py

Last active Sep 9, 2020
Embed
What would you like to do?
from __future__ import print_function
import pickle
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from email.mime.text import MIMEText
import base64
# Script is altered version of the one provided in the official documentation.
# If modifying these scopes, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/gmail.send']
# Your credentials.json that you download from Google Cloud Platform
CREDENTIAL_FILE = 'credentials.json'
username = ''
with open('username.txt', 'r') as f:
username = f.read().strip()
FROM_EMAIL = username
TO_EMAIL = username
def init():
""" Authenticate with GMail API.
"""
# The file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
if os.path.exists('token.pickle'):
print('Already authenticated')
return
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
CREDENTIAL_FILE, SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open('token.pickle', 'wb') as token:
pickle.dump(creds, token)
print('Authenticated')
return
print('Failed to authenticate')
def mail(subject, message_text):
"""Send mail using GMail API.
"""
creds = None
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as token:
creds = pickle.load(token)
else:
print('Not authenticated, run init()')
service = build('gmail', 'v1', credentials=creds)
message = create_message(FROM_EMAIL, TO_EMAIL, subject, message_text)
print('Attempting to send message...')
try:
send_message(service, TO_EMAIL, message)
except Exception as e:
print('Failed due to: ', e)
print('Success!')
def create_message(sender, to, subject, message_text):
"""Create a message for an email.
Args:
sender: Email address of the sender.
to: Email address of the receiver.
subject: The subject of the email message.
message_text: The text of the email message.
Returns:
An object containing a base64url encoded email object.
"""
message = MIMEText(message_text)
message['to'] = to
message['from'] = sender
message['subject'] = subject
# https://github.com/googleapis/google-api-python-client/issues/93
raw = base64.urlsafe_b64encode(message.as_bytes())
raw = raw.decode()
return {'raw': raw}
def send_message(service, user_id, message):
"""Send an email message.
Args:
service: Authorized Gmail API service instance.
user_id: User's email address. The special value "me"
can be used to indicate the authenticated user.
message: Message to be sent.
Returns:
Sent Message.
"""
try:
message = (service.users().messages().send(userId=user_id, body=message)
.execute())
print('Message Id: %s' % message['id'])
return message
except Exception as e:
print('An error occurred: %s' % e)
# python -m pip install requests
import requests
import json
import gmail
from datetime import datetime
CACHE_FILE_NAME = 'afb_cache'
def todays_filename():
today = datetime.today().strftime('%Y-%m-%d')
fn = CACHE_FILE_NAME + today + '.txt'
return fn
def fetch():
print('Fetching from afb...')
r = requests.get('https://www.afbostader.se/redimo/rest/vacantproducts')
fn = todays_filename()
open(fn, 'w').write(r.text)
print('Fecthed and saved: ' + fn)
def load():
fn = todays_filename()
try:
j = json.loads(open(fn, 'r').read())
return j
except:
fetch()
j = json.loads(open(fn, 'r').read())
return j
def dont_care(_):
return True
rules = {
'area': lambda x: x not in ['Delphi', 'Sparta', 'Parantesen'],
'storeSize': dont_care,
'sqrMtrs': lambda x: float(x) > 20.0,
'rent': lambda x: int(x) < 5500,
'numberOfReservations': lambda x: int(x) < 50,
}
def pick(apartment):
for key, rule in rules.items():
if not rule(apartment[key]):
return False
return True
# return all(rule(apartment[key]) for key, rule in rules.items())
def show(apartment):
info = [
'address',
'lghnummer',
'zipcode',
'type',
'area',
'storeSize',
'sqrMtrs',
'rent',
'numberOfReservations',
]
s = ''
for key in info:
s += key + ' ' + str(apartment[key]) + '\n'
s += ('https://www.afbostader.se/lediga-bostader/bostadsdetalj/?obj='
+ apartment['productId'] + '\n')
s += '-------\n'
return s
def process():
j = load()
for p in j['product']:
if pick(p):
print(show(p))
def main():
j = load()
message_text = ''
for p in j['product']:
if pick(p):
message_text += show(p)
today = datetime.today().strftime('%Y-%m-%d')
subject = 'AFB report ' + today
gmail.mail(subject, message_text)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment