Last active
January 19, 2021 05:03
-
-
Save alyoshenka/270c231cdcd14c794d8e7a76f99e3f0f to your computer and use it in GitHub Desktop.
Query Google Sheets API to update time tracker via Twilio text messaging
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
credentials/ | |
.vscode/ | |
token.pickle | |
log.txt | |
__pycache__/ | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import platform | |
def isWindows():
    """Report whether the interpreter is running on Windows."""
    system_name = platform.system()
    return system_name == 'Windows'
def getFile(fileName):
    """Resolve a project-relative file name to the path used on this machine.

    On Windows the name is returned unchanged; on any other platform it is
    prefixed with the fixed install directory.
    """
    if isWindows():
        return fileName
    return '/home/jay/SheetsTexter/' + fileName
def getFileContents(fileName):
    """Return the full text of a project file.

    The name is resolved through getFile() first. The content is returned
    exactly as stored (including any trailing newline).
    """
    # Context manager so the handle is closed deterministically instead of
    # being left for the garbage collector.
    with open(getFile(fileName)) as fin:
        return fin.read()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import credentialFiles | |
from datetime import datetime | |
def log(msg):
    """Append a timestamped entry to the project log file.

    msg may be any object; it is converted with str(). Each entry is the
    current local time on one line, the message on the next, then a blank
    line.
    """
    f_name = credentialFiles.getFile('log.txt')
    entry = str(datetime.now()) + '\n' + str(msg) + '\n\n'
    # 'with' guarantees the file is closed even if the write fails.
    with open(f_name, 'a') as fout:
        fout.write(entry)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Start the SheetsTexter tracker in the background after a fixed delay.
# NOTE(review): the delay presumably gives the system time to finish
# starting up (e.g. networking) -- confirm.
STARTUP_DELAY=20
APP_DIR=/home/jay/SheetsTexter

sleep "${STARTUP_DELAY}"
"${APP_DIR}/main.py" &
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Other | |
import threading | |
import schedule | |
import time | |
import keyboard | |
### | |
# Mine | |
import credentialFiles | |
import debugging | |
import sms | |
import sheetsAccessor | |
### | |
sleep_time = 1 | |
def send_schedule_query(sheetsRunner):
    """Text the hourly category prompt unless the current cell is filled.

    sheetsRunner is the sheet accessor used to inspect the current cell and
    to build the prompt text.
    """
    if not sheetsRunner.is_current_cell_empty():
        debugging.log('current cell not empty, not sending schedule query')
        return

    # Short pause before building and sending the prompt.
    time.sleep(sleep_time)
    prompt = sheetsRunner.get_query_message()
    sms.send_message(prompt)
def schedule_thread(run_scheduler):
    """Thread body: run pending scheduled jobs until told to stop.

    run_scheduler is a zero-argument callable; the loop ends the first time
    it returns a falsy value. Any exception raised while checking or running
    jobs is logged and the loop keeps going.
    """
    while True:
        try:
            if run_scheduler():
                schedule.run_pending()
            else:
                debugging.log('! run_scheduler')
                return
        except Exception as err:
            debugging.log(err)
        # Pause between polls of the scheduler.
        time.sleep(sleep_time)
def stop():
    """Return whether the stop key (esc) is currently pressed.

    Any error raised by the keyboard check is treated as "not pressed".
    """
    try:
        esc_down = keyboard.is_pressed('esc')
    except Exception:
        return False
    return esc_down
def schedule_times(morn, night):
    """Return on-the-hour time strings for each hour in [morn, night).

    Hours below 10 get a leading zero, e.g. schedule_times(8, 11) gives
    ['08:00', '09:00', '10:00']. The end hour itself is not included.
    """
    return [
        ('0' if hour < 10 else '') + str(hour) + ':00'
        for hour in range(morn, night)
    ]
def main():
    """Run the tracker: schedule hourly prompts and process SMS replies.

    Sets up the sheet accessor, registers one prompt per hour, starts the
    scheduler thread, then polls for new messages until the stop key is
    pressed. A reply that follows a schedule prompt is validated as a
    category and written to the current cell.
    """
    sheetsRunner = sheetsAccessor.SheetsAccessor()
    sms.send_message('opening schedule tracker')

    # range() excludes its end, so prompts fire at 06:00 through 21:00.
    for t in schedule_times(6, 10+12):
        schedule.every().day.at(t).do(send_schedule_query, sheetsRunner=sheetsRunner)

    # The lambda reads this local at call time, so rebinding it to False
    # below is what tells the scheduler thread to exit.
    run_scheduler = True
    scheduler = threading.Thread(
        target=schedule_thread, args=(lambda: run_scheduler,))
    scheduler.start()

    try:
        send_schedule_query(sheetsRunner)
        latest_message_sid = sms.get_latest_message_sid()
        time.sleep(sleep_time)

        while not stop():
            try:
                # Polling happens inside the try so a transient API failure
                # is logged and retried instead of ending the program.
                check_message_sid = sms.get_latest_message_sid()
                if check_message_sid != latest_message_sid:
                    time.sleep(sleep_time)
                    debugging.log('new message: ' + sms.get_latest_message_body())
                    if sms.querying_schedule():
                        debugging.log('querying schedule')
                        time.sleep(sleep_time)
                        resp = sms.get_latest_message_body()
                        debugging.log('response: %s' % (resp))
                        if sheetsRunner.validate_as_category(resp):
                            sheetsRunner.update_current_cell(resp)
                            debugging.log('updated current cell')
                            sms.send_message('schedule updated successfully')
                        else:
                            debugging.log('bad data: ' + resp)
                            sms.send_message('bad data: ' + resp)
                    # Only reached when handling succeeded; after an error
                    # the same message is picked up again on the next poll.
                    latest_message_sid = check_message_sid
            except Exception as e:
                debugging.log(e)
            time.sleep(sleep_time)
    finally:
        # Always stop the (non-daemon) scheduler thread, even when an error
        # escapes, so the process can actually exit.
        debugging.log('joining thread')
        run_scheduler = False
        scheduler.join()

    debugging.log('over')
# Script entry point: run the tracker and record any fatal error in the log.
if __name__ == '__main__':
    try:
        main()
    except Exception:
        # Log the full traceback rather than only the exception message so
        # failures in the unattended process can be diagnosed afterwards.
        import traceback
        debugging.log(traceback.format_exc())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pylint: disable=maybe-no-member
# Google API | |
from __future__ import print_function | |
import pickle | |
import os.path | |
from googleapiclient.discovery import build | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from google.auth.transport.requests import Request | |
### | |
# Mine | |
import credentialFiles | |
import debugging | |
### | |
# Other | |
from datetime import datetime | |
import string | |
### | |
# Variables
# If modifying these scopes, delete the file token.pickle.
scopes = ['https://www.googleapis.com/auth/spreadsheets']
# Passed as valueInputOption when a cell is updated.
value_input_option = 'RAW'
# The ID and range of a sample spreadsheet.
spreadsheet_id = '1ynZfMRutbaqku7Kc45BKLBYS1BpUb8tBHiVZsTcWx_c'
# Includes the trailing '!' so it can be prefixed directly onto a cell range.
sheet_name = '2021!'
# Two-row range read by get_categories.
category_range = 'C2:R3'
data_range = 'A7:Z371' # update later
row_offset = 6 # how far down from the top time data starts
### | |
def get_sheet_range(r):
    """Prefix the cell range r with the configured sheet name."""
    return ''.join((sheet_name, r))
class SheetsAccessor:
    """Reads and updates the time-tracking Google Sheets spreadsheet.

    Construction authenticates against the Sheets API and then loads the
    category table and the time-data range once; both are kept on the
    instance as ``categories`` and ``time_data``.
    """

    def __init__(self):
        self.service = self.init_google_api_service()
        self.sheet = self.get_spreadsheet()
        # Mapping of 1-based category number -> category description.
        self.categories = self.get_categories()
        # Rows of the data range; column 0 of each row is compared against
        # an 'MM/DD' date string in get_current_cell.
        self.time_data = self.get_time_data()

    def init_google_api_service(self):
        """Load, refresh or obtain Google credentials and build the service.

        Returns the Sheets v4 service object.
        """
        creds = None
        # The file token.pickle stores the user's access and refresh tokens,
        # and is created automatically when the authorization flow completes
        # for the first time. The same resolved path is used for reading and
        # writing so the saved token is found again on the next run.
        token_path = credentialFiles.getFile('token.pickle')
        if os.path.exists(token_path):
            # NOTE: unpickling runs arbitrary code; this is only acceptable
            # because the file is written by this program itself.
            with open(token_path, 'rb') as token:
                creds = pickle.load(token)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    credentialFiles.getFile('credentials/google_credentials.json'), scopes)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run.
            with open(token_path, 'wb') as token:
                pickle.dump(creds, token)
        return build('sheets', 'v4', credentials=creds)

    def get_spreadsheet(self):
        """Return the spreadsheets resource of the Sheets API service."""
        return self.service.spreadsheets()

    def get_data(self, r):
        """Return the values in range r of the sheet ([] when none)."""
        sheet_range = get_sheet_range(r)
        result = self.service.spreadsheets().values().get(
            spreadsheetId=spreadsheet_id, range=sheet_range).execute()
        return result.get('values', [])

    def get_categories(self):
        """Return the time categories as {1-based position: description}.

        The first row of the category range determines how many categories
        there are; the second row supplies their descriptions. A missing
        description (short or absent second row) becomes an empty string,
        and an empty range yields an empty mapping.
        """
        values = self.get_data(category_range)
        if not values:
            return {}
        descriptions = values[1] if len(values) > 1 else []
        categories = {}
        for idx in range(len(values[0])):
            categories[idx + 1] = descriptions[idx] if idx < len(descriptions) else ''
        return categories

    def get_time_data(self):
        """Return the hourly time-data portion of the sheet."""
        return self.get_data(data_range)

    def update_cell(self, cell, value):
        """Set the given cell (e.g. 'H12') to the given value."""
        cell = get_sheet_range(cell)
        debugging.log('setting cell %s = %s' % (cell, value))
        body = {'values': [[value]]}
        result = self.service.spreadsheets().values().update(
            spreadsheetId=spreadsheet_id, range=cell,
            valueInputOption=value_input_option, body=body).execute()
        debugging.log('{0} cells updated.'.format(result.get('updatedCells')))

    def get_query_message(self):
        """Return the prompt text: question, current time, category list.

        The first line is the fixed question, the second the current local
        time, followed by one 'number: description' line per category.
        """
        lines = ['What have you done for the last hour?', str(datetime.now())]
        lines.extend('{0}: {1}'.format(key, label)
                     for key, label in self.categories.items())
        return '\n'.join(lines)

    def update_current_cell(self, value):
        """Write value into the cell for the current date and hour."""
        self.update_cell(self.get_current_cell(), value)

    def is_current_cell_empty(self):
        """Return whether the cell for the current date and hour is empty."""
        r = self.get_current_cell()
        val = self.get_data(r)
        debugging.log(r + ": " + str(val))
        return not val

    def validate_as_category(self, content):
        """Return whether content exactly matches a category number."""
        return any(str(key) == content for key in self.categories)

    def get_current_cell(self):
        """Return the cell (e.g. 'H12') for the current date and hour.

        The row is found by matching today's 'MM/DD' string against the
        first column of the cached time data; the column letter is derived
        from the current hour.

        Raises LookupError when today's date is not present in the cached
        time data, instead of returning an invalid cell reference.
        """
        now = datetime.now()
        date_string = now.strftime('%m/%d')

        # Find the matching row; rows without any cells are skipped.
        row_idx = -1
        for idx, row in enumerate(self.time_data):
            if row and row[0] == date_string:
                row_idx = idx + 1 + row_offset
                break
        if row_idx < 0:
            raise LookupError('no row found for date ' + date_string)

        # Hour h maps to alphabet index h + 1 (index 0, 'A', is the column
        # matched against the date above).
        col_char = string.ascii_uppercase[now.hour + 1]
        return col_char + str(row_idx)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Twilio sms""" | |
# Twilio API | |
from twilio.rest import Client | |
from flask import Flask, request, redirect | |
from twilio.twiml.messaging_response import MessagingResponse | |
from twilio import twiml | |
# app = Flask(__name__) | |
### | |
# Other | |
import threading | |
### | |
# Mine | |
import credentialFiles | |
### | |
def get_client():
    """Build a Twilio REST client from the stored SID and token files."""
    account_sid = credentialFiles.getFileContents('credentials/twilio_sid.txt')
    auth_token = credentialFiles.getFileContents('credentials/twilio_token.txt')
    return Client(account_sid, auth_token)
def get_my_number():
    """Return the contents of the stored 'my number' credential file."""
    my_number = credentialFiles.getFileContents('credentials/my_number.txt')
    return my_number
def get_sender_number():
    """Return the contents of the stored Twilio number credential file."""
    twilio_number = credentialFiles.getFileContents('credentials/twilio_number.txt')
    return twilio_number
def send_message(msg, client=get_client()):
    """Send msg as an SMS from the Twilio number to my number.

    client is the Twilio client to send through; the default is created
    once, when this module is imported. Any failure is printed rather than
    raised.
    """
    try:
        message_api = client.messages
        recipient = get_my_number()
        sender = get_sender_number()
        message_api.create(to=recipient, from_=sender, body=msg)
    except Exception as err:
        print(err)
# @app.route('/sms', methods=['POST']) | |
def sms():
    """Handle an incoming SMS request by passing sender and body to respond().

    Reads the 'From' and 'Body' form fields of the current request and
    always returns the string 'None'.
    """
    form = request.form
    sender = form['From']
    text = form['Body']
    respond(sender, text)
    return 'None'
def respond(number, message_body):
    """Process an incoming message from number with text message_body.

    Messages from any number other than my own are reported and ignored.
    Otherwise the message is treated as a schedule answer when the previous
    message was a schedule query, and echoed back when it was not.
    """
    if number != get_my_number():
        print('ERROR: UNRECOGNIZED NUMBER: %s' % (number))
        return

    if querying_schedule():
        # NOTE(review): update_current_cell is not defined or imported in
        # this module as shown, so this branch would raise NameError --
        # confirm where it is meant to come from.
        update_current_cell(message_body)
        send_message('updated current cell to %s' % (message_body))
    else:
        send_message(number + ': ' + message_body)
def get_messages(limit=10, client=get_client()):
    """Return up to limit messages listed by the Twilio client.

    The default client is created once, when this module is imported.
    """
    message_api = client.messages
    return message_api.list(limit=limit)
def querying_schedule():
    """Return whether the message before the latest one was a schedule query.

    Looks at the second entry of the two most recently listed messages and
    checks that its body starts with the schedule prompt. Returns False when
    fewer than two messages exist, rather than raising IndexError.
    """
    recent = get_messages(2)
    if len(recent) < 2:
        return False
    cmp_str = 'What have you done for the last hour?'
    return recent[1].body.startswith(cmp_str)
def new_message(msg_id):
    """Return whether the newest message's SID differs from msg_id.

    Compares against the message ``sid``, the same identifier that
    get_latest_message_sid() returns. Returns False when there are no
    messages at all.
    """
    newest = get_messages(1)
    if not newest:
        return False
    return newest[0].sid != msg_id
def get_latest_message():
    """Return the first message of a one-message listing."""
    listing = get_messages(1)
    return listing[0]
def get_latest_message_sid():
    """Return the SID of the first message of a one-message listing."""
    latest = get_messages(1)[0]
    return latest.sid
def get_latest_message_body():
    """Return the body of the first message of a one-message listing."""
    latest = get_messages(1)[0]
    return latest.body
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment