Skip to content

Instantly share code, notes, and snippets.

@alyoshenka
Last active January 19, 2021 05:03
Show Gist options
  • Save alyoshenka/270c231cdcd14c794d8e7a76f99e3f0f to your computer and use it in GitHub Desktop.
Save alyoshenka/270c231cdcd14c794d8e7a76f99e3f0f to your computer and use it in GitHub Desktop.
Query Google Sheets API to update time tracker via Twilio text messaging
credentials/
.vscode/
token.pickle
log.txt
__pycache__/

Sheets Texter

Connects to a Google Sheets time-tracker spreadsheet and sends a text message every hour so the tracker can be updated remotely.

import platform
def isWindows():
    """Report whether the interpreter is running on Windows."""
    current_os = platform.system()
    return current_os == 'Windows'
def getFile(fileName):
    """Return the path to use for *fileName* on the current system.

    On Windows the name is returned unchanged; on any other system the
    deployment directory is prefixed to it.
    """
    if isWindows():
        return fileName
    return '/home/jay/SheetsTexter/' + fileName
def getFileContents(fileName):
    """Return the full text of the file named *fileName*.

    The name is resolved through getFile() first. The content is returned
    exactly as stored, including any trailing newline.
    """
    # Context manager so the handle is closed even if read() raises.
    with open(getFile(fileName)) as source:
        return source.read()
import credentialFiles
from datetime import datetime
def log(msg):
    """Append a timestamped entry for *msg* to the log file.

    Each entry is the current local time on one line, str(msg) on the
    next, followed by a blank line.
    """
    f_name = credentialFiles.getFile('log.txt')
    entry = str(datetime.now()) + '\n' + str(msg) + '\n\n'
    # Context manager so the handle is released even if the write fails.
    with open(f_name, 'a') as fout:
        fout.write(entry)
#!/bin/bash
# Launcher: wait 20 seconds, then start the texter as a background job.
# NOTE(review): the delay presumably lets the network come up after boot -- confirm.
sleep 20
/home/jay/SheetsTexter/main.py &
#!/usr/bin/env python3
# Other
import threading
import schedule
import time
import keyboard
###
# Mine
import credentialFiles
import debugging
import sms
import sheetsAccessor
###
# Pause, in seconds, passed to time.sleep() between polls and before sends
sleep_time = 1
def send_schedule_query(sheetsRunner):
    """Text the category question for the past hour.

    Nothing is sent when the current cell already holds a value; that
    case is only logged.
    """
    if not sheetsRunner.is_current_cell_empty():
        debugging.log('current cell not empty, not sending schedule query')
        return

    time.sleep(sleep_time)
    question = sheetsRunner.get_query_message()
    sms.send_message(question)
def schedule_thread(run_scheduler):
    """Run pending scheduled jobs until run_scheduler() returns a false value.

    run_scheduler is a zero-argument callable polled once per pass.
    Exceptions raised during a pass are logged and the loop carries on.
    """
    while True:
        try:
            if run_scheduler():
                schedule.run_pending()
            else:
                debugging.log('! run_scheduler')
                break
        except Exception as err:
            debugging.log(err)
        time.sleep(sleep_time)
def stop():
    """Report whether the escape key (the program's stop key) is pressed.

    Any failure while querying the keyboard counts as "not pressed".
    """
    try:
        esc_down = keyboard.is_pressed('esc')
    except Exception:
        esc_down = False
    return esc_down
def schedule_times(morn, night):
    """Build 'HH:00' strings for each hour from morn up to, but excluding, night."""
    return [
        ('0' if hour < 10 else '') + str(hour) + ':00'
        for hour in range(morn, night)
    ]
def main():
    """Run the tracker until the stop key is pressed.

    Registers an hourly query job, starts the scheduler thread, then polls
    for a new latest message. When a new message follows a schedule query,
    its body is validated as a category and written to the current cell.

    The scheduler thread is always signalled to stop and joined, even when
    this function exits through an exception.
    """
    sheetsRunner = sheetsAccessor.SheetsAccessor()
    sms.send_message('opening schedule tracker')

    # One job per hour, 06:00 through 21:00
    for t in schedule_times(6, 10 + 12):
        schedule.every().day.at(t).do(send_schedule_query, sheetsRunner=sheetsRunner)
    #schedule.every(2).minutes.do(send_schedule_query, sheetsRunner=sheetsRunner)

    # The lambda reads this local each time it is called, so rebinding it
    # below is what tells the scheduler thread to exit.
    run_scheduler = True
    scheduler = threading.Thread(
        target=schedule_thread, args=(lambda: run_scheduler,))
    scheduler.start()

    try:
        send_schedule_query(sheetsRunner)
        latest_message_sid = sms.get_latest_message_sid()
        time.sleep(sleep_time)

        while not stop():
            try:
                # Polled inside the try so one failed request is logged
                # and retried instead of ending the loop.
                check_message_sid = sms.get_latest_message_sid()
                if check_message_sid != latest_message_sid:
                    time.sleep(sleep_time)
                    debugging.log('new message: ' + sms.get_latest_message_body())
                    if sms.querying_schedule():
                        debugging.log('querying schedule')
                        time.sleep(sleep_time)
                        resp = sms.get_latest_message_body()
                        debugging.log('response: %s' % (resp))
                        if sheetsRunner.validate_as_category(resp):
                            sheetsRunner.update_current_cell(resp)
                            debugging.log('updated current cell')
                            sms.send_message('schedule updated successfully')
                        else:
                            debugging.log('bad data: ' + resp)
                            sms.send_message('bad data: ' + resp)
                    latest_message_sid = check_message_sid
            except Exception as e:
                debugging.log(e)
            time.sleep(sleep_time)
    finally:
        # Clear the flag first so the thread stops even if logging fails.
        run_scheduler = False
        debugging.log('joining thread')
        scheduler.join()

    debugging.log('over')
if __name__ == '__main__':
    # Last-resort handler: anything that escapes main() is written to the log.
    try:
        main()
    except Exception as err:
        debugging.log(err)
# pylint: disable=maybe-no-member
# Google API
from __future__ import print_function
import pickle
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
###
# Mine
import credentialFiles
import debugging
###
# Other
from datetime import datetime
import string
###
# Variables
# If modifying these scopes, delete the file token.pickle.
scopes = ['https://www.googleapis.com/auth/spreadsheets']
# Sent as valueInputOption when a cell is written
value_input_option = 'RAW'
# The ID of the tracker spreadsheet and the ranges read from it.
spreadsheet_id = '1ynZfMRutbaqku7Kc45BKLBYS1BpUb8tBHiVZsTcWx_c'
# Sheet prefix; the trailing '!' is included so a cell range can be appended directly
sheet_name = '2021!'
# Range read to build the category list
category_range = 'C2:R3'
data_range = 'A7:Z371' # update later
row_offset = 6 # how far down from the top the time data starts (data_range begins at row 7)
###
def get_sheet_range(r):
    """Qualify the cell range *r* with the sheet-name prefix."""
    return ''.join((sheet_name, r))
class SheetsAccessor:
    """Accesses the time-tracker Google Sheets spreadsheet.

    On construction the API service is built and the category list and
    the hourly time data are read once and kept on the instance.
    """

    def __init__(self):
        self.service = self.init_google_api_service()
        self.sheet = self.get_spreadsheet()
        self.categories = self.get_categories()
        self.time_data = self.get_time_data()

    def init_google_api_service(self):
        """Load (or obtain) Google credentials and build the Sheets service.

        token.pickle stores the user's access and refresh tokens and is
        created when the authorization flow completes for the first time.
        """
        creds = None
        # Resolve the token path once so it is read from and written to
        # the same location regardless of the working directory.
        token_path = credentialFiles.getFile('token.pickle')
        if os.path.exists(token_path):
            # NOTE: pickle.load executes whatever the file encodes; this
            # is only acceptable because the file is written by this
            # program. Never point it at an untrusted file.
            with open(token_path, 'rb') as token:
                creds = pickle.load(token)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    credentialFiles.getFile('credentials/google_credentials.json'), scopes)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open(token_path, 'wb') as token:
                pickle.dump(creds, token)
        return build('sheets', 'v4', credentials=creds)

    def get_spreadsheet(self):
        """Return the spreadsheets resource of the Sheets API service."""
        return self.service.spreadsheets()

    def get_data(self, r):
        """Return the values in range *r* of the sheet ([] when the API returns none)."""
        sheet_range = get_sheet_range(r)
        result = self.service.spreadsheets().values().get(
            spreadsheetId=spreadsheet_id, range=sheet_range).execute()
        return result.get('values', [])

    def get_categories(self):
        """Return the time categories as {number: label}.

        Numbers count from 1 across the first row of the category range;
        labels come from the second row.

        Raises:
            ValueError: if the category range yields fewer than two rows.
        """
        values = self.get_data(category_range)
        if len(values) < 2:
            raise ValueError(
                'category range %s did not return two rows' % category_range)
        # zip() stops at the shorter row, so a label row that comes back
        # shorter than the first row no longer raises IndexError.
        return {
            number: label
            for number, (_, label) in enumerate(zip(values[0], values[1]), start=1)
        }

    def get_time_data(self):
        """Return the hourly time data portion of the sheet."""
        return self.get_data(data_range)

    def update_cell(self, cell, value):
        """Set the given cell (e.g. 'H12') to the given value."""
        cell = get_sheet_range(cell)
        debugging.log('setting cell %s = %s' % (cell, value))
        body = {'values': [[value]]}
        result = self.service.spreadsheets().values().update(
            spreadsheetId=spreadsheet_id, range=cell,
            valueInputOption=value_input_option, body=body).execute()
        debugging.log('{0} cells updated.'.format(result.get('updatedCells')))

    def get_query_message(self):
        """Return the text that asks for the past hour's category.

        The first line is fixed, followed by the current time and one
        'number: label' line per category.
        """
        lines = ['What have you done for the last hour?', str(datetime.now())]
        lines.extend(
            '{0}: {1}'.format(key, label)
            for key, label in self.categories.items())
        return '\n'.join(lines)

    def update_current_cell(self, value):
        """Write *value* into the cell for the current date and hour."""
        self.update_cell(self.get_current_cell(), value)

    def is_current_cell_empty(self):
        """Return True when the cell for the current date and hour has no value."""
        r = self.get_current_cell()
        val = self.get_data(r)
        debugging.log(r + ": " + str(val))
        return not val

    def validate_as_category(self, content):
        """Return whether *content* equals one of the category numbers as text."""
        return any(str(key) == content for key in self.categories)

    def get_current_cell(self):
        """Return the cell (e.g. 'H12') for the current date and hour.

        The row is found by matching today's zero-padded 'MM/DD' string
        against the first value of each loaded time-data row; the column
        letter is chosen from the current hour.

        Raises:
            ValueError: if no loaded row starts with today's date.
        """
        now = datetime.now()
        date_string = now.strftime('%m/%d')

        for index, row in enumerate(self.time_data):
            # Rows can be empty lists; skip them rather than indexing.
            if row and row[0] == date_string:
                row_idx = index + 1 + row_offset
                break
        else:
            # Fail here with a clear message instead of building an
            # invalid cell reference from a -1 row number.
            raise ValueError('no row for date %s in time data' % date_string)

        # Index 0 ('A') is the column matched against the date, so hour 0
        # maps to 'B'.
        col_char = string.ascii_uppercase[now.hour + 1]
        return col_char + str(row_idx)
"""Twilio sms"""
# Twilio API
from twilio.rest import Client
from flask import Flask, request, redirect
from twilio.twiml.messaging_response import MessagingResponse
from twilio import twiml
# app = Flask(__name__)
###
# Other
import threading
###
# Mine
import credentialFiles
###
def get_client():
    """Build a Twilio messaging client from the stored SID and token files."""
    account_sid = credentialFiles.getFileContents('credentials/twilio_sid.txt')
    auth_token = credentialFiles.getFileContents('credentials/twilio_token.txt')
    return Client(account_sid, auth_token)
def get_my_number():
    """Return the contents of the stored 'my number' file (the texts' recipient)."""
    my_number = credentialFiles.getFileContents('credentials/my_number.txt')
    return my_number
def get_sender_number():
    """Return the contents of the stored Twilio number file (the texts' sender)."""
    twilio_number = credentialFiles.getFileContents('credentials/twilio_number.txt')
    return twilio_number
def send_message(msg, client=get_client()):
    """Text *msg* from the Twilio number to my number using *client*.

    Any failure is printed rather than raised. The default client is
    built once, when this function is defined, and shared by every call
    that does not pass its own.
    """
    try:
        create = client.messages.create
        create(to=get_my_number(), from_=get_sender_number(), body=msg)
    except Exception as err:
        print(err)
# @app.route('/sms', methods=['POST'])
def sms():
    """Handle an incoming SMS request: pass its sender and body to respond()."""
    form = request.form
    sender = form['From']
    text = form['Body']
    respond(sender, text)
    return 'None'
def respond(number, message_body, sheets=None):
    """Deal with an incoming message.

    Messages from any number other than my own are reported and ignored.
    If the previous message was a schedule query, the body is written to
    the current cell and a confirmation is sent; otherwise the message is
    echoed back.

    Args:
        number: sender of the incoming message.
        message_body: text of the incoming message.
        sheets: optional object providing update_current_cell(value).
            When omitted, a sheetsAccessor.SheetsAccessor is created on
            demand.
    """
    if number != get_my_number():
        print('ERROR: UNRECOGNIZED NUMBER: %s' % (number))
        return

    if querying_schedule():
        if sheets is None:
            # update_current_cell is a SheetsAccessor method and was never
            # defined in this module; import locally so this module still
            # has no top-level dependency on the accessor.
            import sheetsAccessor
            sheets = sheetsAccessor.SheetsAccessor()
        sheets.update_current_cell(message_body)
        send_message('updated current cell to %s' % (message_body))
    else:
        send_message(number + ': ' + message_body)
def get_messages(limit=10, client=get_client()):
    """Return up to *limit* entries of the client's message list.

    Callers in this module treat index 0 as the newest message. The
    default client is built once, when this function is defined.
    """
    message_list = client.messages.list(limit=limit)
    return message_list
def querying_schedule():
    """Check whether the message before the latest one was a schedule query.

    Returns False when fewer than two messages are available, instead of
    raising IndexError.
    """
    recent = get_messages(2)
    if len(recent) < 2:
        return False
    previous_body = recent[1].body or ''
    return previous_body.startswith('What have you done for the last hour?')
def new_message(msg_id):
    """Return whether the newest message's SID differs from *msg_id*
    (that is, whether there is a new message)."""
    # Compare on .sid, the identifier the rest of this module reads from
    # messages; the previous .id attribute is not used anywhere else here.
    return get_messages(1)[0].sid != msg_id
def get_latest_message():
    """Return the first entry of a one-message listing (treated as the newest)."""
    newest = get_messages(1)
    return newest[0]
def get_latest_message_sid():
    """Return the SID of the newest message."""
    newest = get_messages(1)[0]
    return newest.sid
def get_latest_message_body():
    """Return the body text of the newest message."""
    newest = get_messages(1)[0]
    return newest.body
  • error handling
  • weather report?
  • interactivity?
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment