Skip to content

Instantly share code, notes, and snippets.

@ddunlop
Created December 7, 2016 16:00
Show Gist options
  • Save ddunlop/e98cbe7a162f14f60c38badd8ea3ad34 to your computer and use it in GitHub Desktop.
Save ddunlop/e98cbe7a162f14f60c38badd8ea3ad34 to your computer and use it in GitHub Desktop.
from __future__ import unicode_literals
# don't convert to ascii in py2.7 when creating string to return
import json
from datetime import datetime, time, date, timedelta
from client import slack_client as sc
get_updates = False
earliest_request_time = time(hour=9, minute=30)
dump_time = time(hour=22, minute=30)
user_ids = {}
user_lookup = {}
todays_status = {}
standup_channel = {}
questions = [
{
'text': 'What did you do yesterday?',
'color': '#7CD197',
},
{
'text': 'What are you doing today?',
'color': '#7C97D1',
},
{
'text': 'Any blockers?',
'color': '#D17C97',
},
]
skip_question = ['nothing', 'no', 'skip']
crontable = []
outputs = []
crontable.append([10, "check_times"])
import sys
def check_times():
global next_request_time
global next_dump
global get_updates
global todays_status
now = datetime.now()
print('next_request_time: {}'.format(next_request_time))
try:
if not get_updates and next_request_time < now and now < next_dump:
print("going to load presence")
get_updates = True
# default = {'question': 3, 'answers': [{'text': 'test', 'question': 0}, {'text': 'cat', 'question': 1}, {'text': 'none', 'question': 2}]}
# todays_status = {user_id: default for user_id in user_ids}
todays_status = {user_id: {'question': 0, 'answers': []} for user_id in user_ids}
present_users = get_present_users()
for user_id in present_users:
request_update(user_id)
if now > next_dump:
get_updates = False
display_standup()
except:
e = sys.exc_info()
print("error: {}".format(e))
print('done check_times')
def get_present_users():
present_users = []
for user_id in user_ids:
presence = sc.api_call('users.getPresence', user=user_id)
if presence.get('presence') == 'active':
present_users.append(user_id)
return present_users
def get_next_time(t):
nextTime = datetime.combine(date.today(), t)
while nextTime <= datetime.now() or nextTime.isoweekday() >= 6:
nextTime += timedelta(days=1)
return nextTime
def display_standup():
print('display_standup')
channel_id = standup_channel['id']
print("id for display_standup: {}".format(channel_id))
attachments = []
for user_id, users_status in todays_status.items():
print("user_id: {} user_status: {}".format(user_id, users_status))
answers = users_status['answers']
attachment = {
'pretext': user_lookup[user_id]['real_name']
}
for answer in answers:
question = questions[answer['question']]
attachment['title'] = question['text']
attachment['color'] = question['color']
attachment['text'] = answer['text']
attachments.append(attachment)
attachment = {}
sc.api_call('chat.postMessage',
channel=channel_id,
username='wesley',
attachments=attachments)
def load_users(users):
global user_lookup
global user_ids
print("loading users")
user_response = sc.api_call('users.list')
user_ids = {user['id']: True for user in user_response['members'] if user['name'] in users}
user_lookup = {user['id']: user for user in user_response['members']}
print("done loading users: {}".format(user_lookup))
def load_channel(channel):
global standup_channel
print("loading channel info")
channels_response = sc.api_call('channels.list')
matching_channel = [ch for ch in channels_response['channels'] if ch['name'] == channel]
if len(matching_channel) == 1:
standup_channel = matching_channel[0]
else:
print('did not find just one:'.format(matching_channel))
print("done loading channel info: {}".format(standup_channel['id']))
def get_username(id):
user = user_lookup.get(id)
if not user:
return id
if user['real_name']:
return user['real_name']
return user['name']
def process_hello(data):
load_users(config.get('users'))
load_channel(config.get('channel'))
global next_request_time
global next_dump
next_dump = get_next_time(dump_time)
if datetime.now().time() < dump_time:
next_request_time = datetime.combine(date.today(), earliest_request_time)
else:
next_request_time = get_next_time(earliest_request_time)
print("done with hello")
def process_message(data):
user_id = data['user']
try:
if data['channel'].startswith('D') and user_id in user_ids:
text = data['text']
users_status = todays_status[user_id]
if text == 'dump':
display_standup()
return
if text not in skip_question:
users_status['answers'].append({
'question': users_status['question'],
'text': text
})
users_status['question'] += 1
request_update(user_id)
except:
e = sys.exc_info()
print("error: {}".format(e))
def process_presence_change(data):
print("process_presence_change {}".format(data))
if data['presence'] == 'active':
if should_request_update(data['user'], active_time):
username = get_username(data['user'])
print("{} Now Active! {}".format(username, active_time))
request_update(data['user'])
def request_update(id):
question_number = todays_status[id]['question']
print("question_number: {}".format(question_number))
if question_number >= len(questions):
return
msg_user(id, questions[question_number].get('text'))
def should_request_update(id, time):
if not get_updates or id not in user_ids:
return False
return True
def msg_user(id, msg):
response = sc.api_call('im.open', user=id)
channel = response['channel']
print("channel: {}".format(channel))
outputs.append([channel['id'], msg])
def catch_all(data):
print(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment