Last active
March 29, 2021 20:53
-
-
Save lukwam/77766e8cdb75512e4acabfb28bf3dc7f to your computer and use it in GitHub Desktop.
Workday transform main.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Helpers for Workday People Transform.""" | |
import datetime | |
import json | |
import logging | |
import os | |
import re | |
from unidecode import unidecode | |
from google.oauth2 import service_account | |
from bits.google import Google | |
# Only let ERROR-and-above records through from the googleapiclient
# discovery-cache logger.
logging.getLogger(
    'googleapiclient.discovery_cache'
).setLevel(logging.ERROR)

# Project identifiers come from the environment; each is None when unset.
BITSDB_PROJECT = os.getenv("BITSDB_PROJECT")
FIRESTORE_PROJECT = os.getenv("FIRESTORE_PROJECT")
GCP_PROJECT = os.getenv("GCP_PROJECT")
GITHUB_PROJECT = os.getenv("GITHUB_PROJECT")
def _dict_to_list(data):
    """Return the values of ``data`` as a list, in the dict's iteration order."""
    return list(data.values())
def _docs_to_dict(docs):
    """Map every key of ``docs`` to the result of ``to_dict()`` on its value."""
    return {doc_id: doc.to_dict() for doc_id, doc in docs.items()}
def _get_config():
    """Fetch the ``google-service-account-key`` secret for ``GCP_PROJECT``.

    Returns whatever ``access_version_value`` yields; ``get_google_users``
    parses that value as JSON.
    """
    secret_manager = Google().secretmanager()
    secret_name = "google-service-account-key"
    return secret_manager.access_version_value(GCP_PROJECT, secret_name)
def _get_params(data):
    """Decode the JSON payload of a Pub/Sub message.

    ``data`` is handed unchanged to ``get_pubsub_message_json_data`` and its
    result is returned as-is.
    """
    return Google().pubsub().get_pubsub_message_json_data(data)
def _get_next_person_id(pids):
    """Allocate the next unused person_id and record it in ``pids``.

    The new id is one more than the numerically largest id in ``pids``.
    The list is scanned rather than trusting its last element because
    ``get_people`` returns the ids sorted as strings, where ``'9'`` sorts
    after ``'10'``; taking ``pids[-1]`` could therefore hand out an id that
    is already in use.

    Args:
        pids: mutable list of existing person ids (anything ``int()``
            accepts). The new id is appended so that repeated calls keep
            counting upwards.

    Returns:
        The newly allocated id as a string.

    Raises:
        ValueError: if ``pids`` is empty or contains a non-numeric id.
    """
    highest = max(int(pid) for pid in pids)
    next_pid = u'%s' % (highest + 1)
    pids.append(next_pid)
    return next_pid
def _to_ascii(value, key=None):
    """Transliterate ``value`` with ``unidecode``.

    Falsy values (``None``, ``''``) are returned untouched. ``key`` is
    accepted from callers but is not used.
    """
    return unidecode(value) if value else value
def get_ccure_cards():
    """Collect usable CCURE card numbers, grouped by emplid.

    Reads the ``ccure_personnel`` and ``ccure_credentials`` collections from
    the ``FIRESTORE_PROJECT`` Firestore and joins them on the credential's
    ``personnel_id``.

    Returns:
        dict mapping emplid (as a string) to the list of card numbers found
        for that person.
    """
    firestore = Google().firestore(FIRESTORE_PROJECT)

    print('Getting CCURE Personnel from Firestore...')
    personnel = _docs_to_dict(firestore.get_docs_dict('ccure_personnel'))
    print('Found %s CCURE Personnel.' % (len(personnel)))

    print('Getting CCURE Credentials from Firestore...')
    credentials = _docs_to_dict(firestore.get_docs_dict('ccure_credentials'))
    print('Found %s CCURE Credentials.' % (len(credentials)))

    cards = {}
    for credential in credentials.values():
        personnel_id = str(credential['personnel_id'])

        # a credential without a card number contributes nothing
        card_number = credential['card_number']
        if not card_number:
            continue

        # disabled, lost or stolen cards are ignored
        if any(credential[flag] for flag in ('disabled', 'lost', 'stolen')):
            continue

        # the credential must point at a known personnel record
        if personnel_id not in personnel:
            print('Personnel record not found: %s' % (personnel_id))
            continue

        # personnel records without an emplid cannot be matched to Workday
        emplid = personnel[personnel_id].get('emplid')
        if not emplid:
            continue

        cards.setdefault(str(emplid), []).append(card_number)
    return cards
def get_desks():
    """Build the desks assigned to each person id.

    Reads the ``seats`` collection from the ``BITSDB_PROJECT`` Firestore.

    Returns:
        dict mapping person id (string form of the seat's ``pid``) to a list
        of ``{"name": ..., "address": {...}}`` dicts, one per seat.
    """
    firestore = Google().firestore(BITSDB_PROJECT)

    # fetch the seats from Firestore
    print('Getting Space Seats from Firestore...')
    seats = _docs_to_dict(firestore.get_docs_dict('seats'))
    print('Found %s Seats.' % (len(seats)))

    desks = {}
    for seat in seats.values():
        seat_name = seat['name']
        building = seat["building"]
        address = {
            # the seat name is appended to the building's street address
            "street": f'{building["street_address"]}, {seat["name"]}',
            "city": building["city"],
            "state": building["state"],
            "zip_code": building["zip"],
            "country": "US",
        }
        person_id = u'%s' % (seat['pid'])
        desks.setdefault(person_id, []).append({
            "name": seat_name,
            "address": address,
        })
    return desks
def get_github_users():
    """Load GitHub account links from the ``tokens`` collection.

    Reads from the ``GITHUB_PROJECT`` Firestore.

    Returns:
        dict keyed by document id (used here as the Google id) whose values
        hold ``github_id``, ``login`` and ``google_id`` as strings.
    """
    firestore = Google().firestore(GITHUB_PROJECT)

    # fetch the token documents from Firestore
    print('Getting GitHub Users from Firestore...')
    token_docs = firestore.get_docs('tokens')
    print('Found %s GitHub Users.' % (len(token_docs)))

    github_users = {}
    for token_doc in token_docs:
        google_id = token_doc.id
        fields = token_doc.to_dict()
        entry = {}
        entry['github_id'] = u'%s' % (fields['github_id'])
        entry['login'] = u'%s' % (fields['github_login'])
        entry['google_id'] = u'%s' % (google_id)
        github_users[google_id] = entry
    return github_users
def get_google_people():
    """Return profile photo URLs for Google people, keyed by Google id.

    Reads the ``google_people`` collection from the ``FIRESTORE_PROJECT``
    Firestore. People without a photo whose source type is ``PROFILE`` are
    left out of the result.

    Returns:
        dict mapping Google id to ``{'photo_url': <url>}``.
    """
    firestore = Google().firestore(FIRESTORE_PROJECT)

    # fetch the Google people from Firestore
    print('Getting Google People from Firestore...')
    google_people = _docs_to_dict(firestore.get_docs_dict('google_people'))
    print('Found %s Google People.' % (len(google_people)))

    people = {}
    for google_id, person in google_people.items():
        profile_photos = [
            photo
            for photo in person.get('photos', [])
            if photo['metadata']['source']['type'] == 'PROFILE'
        ]
        if not profile_photos:
            continue

        # when several PROFILE photos exist, the last one listed wins
        profile = profile_photos[-1]
        if not profile:
            continue

        # drop the "s100" tokens from the URL (presumably size hints)
        url = profile['url']
        url = url.replace('/s100/', '/')
        url = url.replace('=s100', '')
        people[google_id] = {'photo_url': url}
    return people
def get_google_users():
    """Return Google directory users keyed by primary email.

    Builds service-account credentials from the secret returned by
    ``_get_config`` (read-only directory user and group scopes, with
    ``subject`` set to ``google@broadinstitute.com``) and lists users through
    the directory client, requesting only ``id`` and ``primaryEmail``.

    Returns:
        dict mapping ``primaryEmail`` to the user record from the directory.
    """
    key_info = json.loads(_get_config())

    g = Google()
    g.credentials = service_account.Credentials.from_service_account_info(
        key_info,
        scopes=[
            'https://www.googleapis.com/auth/admin.directory.user.readonly',
            'https://www.googleapis.com/auth/admin.directory.group.readonly'
        ],
        subject='google@broadinstitute.com',
    )

    print('Getting Google Users from Google...')
    google_users = g.directory().get_users(
        fields='nextPageToken,users(id,primaryEmail)',
    )
    print('Found %s Google Users.' % (len(google_users)))

    return {user['primaryEmail']: user for user in google_users}
def get_nicknames():
    """Return each username's nicknames as a sorted list of strings.

    Reads the ``nicknames`` collection from the ``BITSDB_PROJECT`` Firestore.
    A document without a ``nicknames`` field yields an empty list.
    """
    firestore = Google().firestore(BITSDB_PROJECT)

    # fetch the nickname documents from Firestore
    print('Getting Nicknames from Firestore...')
    nicknames = _docs_to_dict(firestore.get_docs_dict('nicknames'))
    print('Found %s Nicknames.' % (len(nicknames)))

    return {
        username: [
            u'%s' % (nickname)
            for nickname in sorted(record.get('nicknames', []))
        ]
        for username, record in nicknames.items()
    }
def get_people():
    """Load existing People records and index them by emplid.

    Reads the ``people`` collection from the ``BITSDB_PROJECT`` Firestore.

    Returns:
        A ``(data, pids)`` tuple. ``data`` maps emplid (string) to a dict
        with ``person_id``, ``email`` and ``emplid``; ``pids`` is the sorted
        list of the collection's keys.
    """
    firestore = Google().firestore(BITSDB_PROJECT)

    # fetch the people documents from Firestore
    print('Getting People from Firestore...')
    people = _docs_to_dict(firestore.get_docs_dict('people'))
    print('Found %s People.' % (len(people)))

    # keep only person_id, email and emplid for each record
    data = {}
    for person_id, record in people.items():
        emplid = u'%s' % (record['emplid'])
        summary = {'person_id': u'%s' % (person_id)}
        summary['email'] = u'%s' % (record['email'])
        summary['emplid'] = emplid
        data[emplid] = summary

    # NOTE(review): sorted() orders the keys by plain comparison; if they are
    # strings this is lexicographic, not numeric -- confirm with callers.
    person_ids = sorted(people)
    return data, person_ids
def get_phones():
    """Group phone extensions by the username they belong to.

    Reads the ``phones`` collection from the ``BITSDB_PROJECT`` Firestore;
    each key of that collection is treated as an extension.

    Returns:
        dict mapping username to the list of extensions (collection keys)
        whose record names that username.
    """
    firestore = Google().firestore(BITSDB_PROJECT)

    # fetch the phone documents from Firestore
    print('Getting Phones from Firestore...')
    extensions = _docs_to_dict(firestore.get_docs_dict('phones'))
    print('Found %s Phones.' % (len(extensions)))

    phones = {}
    for extension, record in extensions.items():
        phones.setdefault(record['username'], []).append(extension)
    return phones
def get_slack_users():
    """Return Slack users that have a Broad email address, keyed by username.

    Reads the ``slack_users`` collection from the ``FIRESTORE_PROJECT``
    Firestore. Only users whose profile email ends with
    ``@broadinstitute.org`` are kept; the username is the part of the email
    before that suffix.

    The domain is matched as a suffix rather than a substring, so an address
    such as ``x@broadinstitute.org.example.com`` is no longer accepted and
    turned into a mangled username.

    Returns:
        dict mapping username to ``{'slack_id': ..., 'slack_name': ...}``,
        where ``slack_name`` is the profile's ``display_name_normalized``
        (``None`` when absent).
    """
    domain = '@broadinstitute.org'
    firestore = Google().firestore(FIRESTORE_PROJECT)

    # fetch the Slack users from Firestore
    print('Getting Slack Users from Firestore...')
    slack_users = _docs_to_dict(firestore.get_docs_dict('slack_users'))
    print('Found %s Slack User.' % (len(slack_users)))

    users = {}
    for uid, user in slack_users.items():
        profile = user['profile']
        email = profile.get('email')
        # skip users with no email or an email outside the Broad domain
        if not email or not email.endswith(domain):
            continue
        username = email[:-len(domain)]
        users[username] = {
            'slack_id': uid,
            'slack_name': profile.get('display_name_normalized'),
        }
    return users
def add_ccure_data(workday_people):
    """Attach CCURE card numbers to the matching Workday People records.

    Sets ``card_numbers`` on every record whose emplid appears in the result
    of ``get_ccure_cards``; other records are left alone. The input dict is
    modified in place and returned.
    """
    ccure_cards = get_ccure_cards()
    for emplid, person in workday_people.items():
        if emplid not in ccure_cards:
            continue
        person['card_numbers'] = ccure_cards[emplid]
    return workday_people
def add_desks(workday_people):
    """Attach desk and address information to Workday People records.

    For a person with desks (looked up by ``person_id``), the first desk
    provides ``address`` and ``desk`` and all desk names go into ``desks``.
    For a person without desks, an existing ``desk`` value is reset to
    ``None``. The input dict is modified in place and returned.
    """
    desks = get_desks()
    for person in workday_people.values():
        person_id = person.get('person_id')

        if person_id not in desks:
            # no desk assigned: clear a previously present value only
            if 'desk' in person:
                person['desk'] = None
            continue

        assigned = desks[person_id]
        first_desk = assigned[0]
        person["address"] = first_desk["address"]
        person['desk'] = first_desk["name"]
        person['desks'] = [desk["name"] for desk in assigned]
    return workday_people
def add_emails(workday_people):
    """Build the ``emails`` list for every Workday People record.

    Addresses are collected in this order, skipping empty values and any
    address already in the list:

    1. ``email``
    2. ``email_username``
    3. ``email_work``
    4. one ``<nickname>@broadinstitute.org`` per entry of ``nicknames``
    5. ``email_home``
    6. ``email_work_referral``

    Nickname addresses go through the same duplicate check as every other
    source, so a nickname equal to the username does not produce a repeated
    address. A missing or ``None`` ``nicknames`` value is treated as empty.

    Args:
        workday_people: dict of person records keyed by emplid; modified in
            place.

    Returns:
        The same ``workday_people`` dict.
    """
    for person in workday_people.values():
        candidates = [
            person.get('email'),
            person.get('email_username'),
            person.get('email_work'),
        ]
        # "or []" guards against a record that stores nicknames as None
        candidates.extend(
            u'%s@broadinstitute.org' % (nickname,)
            for nickname in person.get('nicknames') or []
        )
        candidates.append(person.get('email_home'))
        candidates.append(person.get('email_work_referral'))

        emails = []
        for candidate in candidates:
            if not candidate:
                continue
            address = u'%s' % (candidate,)
            if address not in emails:
                emails.append(address)
        person['emails'] = emails
    return workday_people
def add_github_info(workday_people):
    """Attach GitHub id and login to Workday People records.

    A record is matched through its ``google_id``, so ``add_google_info``
    has to run first. Records without a match are left alone. The input dict
    is modified in place and returned.
    """
    github_users = get_github_users()
    for person in workday_people.values():
        google_id = person.get('google_id')
        if not google_id or google_id not in github_users:
            continue
        account = github_users[google_id]
        person['github_id'] = u'%s' % (account['github_id'])
        person['github_login'] = u'%s' % (account['login'])
    return workday_people
def add_google_info(workday_people):
    """Attach Google id and profile photo URL to Workday People records.

    A record is matched to a Google user through its ``email_username``.
    When matched, ``google_id`` is set, and ``photo_url`` is set too if the
    Google id appears in ``get_google_people``. The input dict is modified
    in place and returned.
    """
    google_people = get_google_people()
    google_users = get_google_users()
    for person in workday_people.values():
        email = person['email_username']
        if email not in google_users:
            continue

        # record the Google id
        google_id = google_users[email]['id']
        person['google_id'] = google_id

        # record the photo URL when one is known for that id
        if google_id in google_people:
            person['photo_url'] = google_people[google_id].get('photo_url')
    return workday_people
def add_slack_info(workday_people):
    """Attach Slack id and Slack name to Workday People records.

    A record is matched to a Slack user through its ``username``; records
    without a match are left alone. The input dict is modified in place and
    returned.
    """
    slack_users = get_slack_users()
    for person in workday_people.values():
        username = person['username']
        if username not in slack_users:
            continue
        slack_user = slack_users[username]
        for field in ('slack_id', 'slack_name'):
            person[field] = slack_user[field]
    return workday_people
def add_nicknames(workday_people):
    """Attach nickname lists to Workday People records.

    Sets ``nicknames`` on every record whose ``username`` has an entry in
    ``get_nicknames``. The input dict is modified in place and returned.
    """
    nicknames = get_nicknames()
    for person in workday_people.values():
        username = person['username']
        if username not in nicknames:
            continue
        person['nicknames'] = nicknames[username]
    return workday_people
def add_people_data(workday_people):
    """Attach ``email`` and ``person_id`` to Workday People records.

    Records whose emplid already exists in People reuse the stored values.
    Any other record is treated as a new person: the email is derived from
    the username and a fresh id comes from ``_get_next_person_id``. The input
    dict is modified in place and returned.
    """
    people, pids = get_people()
    for emplid, worker in workday_people.items():
        if emplid in people:
            # known person: reuse the stored email and id
            existing = people[emplid]
            worker['email'] = existing['email']
            worker['person_id'] = existing['person_id']
            continue

        # new person: derive the email and allocate the next person id
        worker['email'] = '%s@broadinstitute.org' % (worker['username'])
        worker['person_id'] = _get_next_person_id(pids)
        summary = (
            emplid,
            worker['full_name'],
            worker['username'],
            worker['person_id'],
        )
        print('New Person: %s %s [%s] -> %s' % summary)
    return workday_people
def add_phones(workday_people):
    """Attach phone extensions to Workday People records.

    ``primary_work_phone`` is always written: ``None`` when the username has
    no phones, otherwise ``+1 (617) 714-<first extension>``. ``extensions``
    is set to the user's extensions, or reset to ``[]`` if the key was
    already present and the user has none. The input dict is modified in
    place and returned.
    """
    phones = get_phones()
    for person in workday_people.values():
        username = person['username']

        # start from "no phone"; this also covers users with no extensions
        person['primary_work_phone'] = None

        if username not in phones:
            if 'extensions' in person:
                person['extensions'] = []
            continue

        extensions = [u'%s' % (ext) for ext in phones[username]]
        person['primary_work_phone'] = '+1 (617) 714-%s' % (extensions[0])
        person['extensions'] = extensions
    return workday_people
def get_workday_people(data):
    """Load the Workday People feed and return the enriched records.

    ``data`` is the Pub/Sub message; its JSON body names the ``bucket`` and
    ``filename`` of the feed in GCS. The feed is transformed into a dict
    keyed by emplid, run through each ``add_*`` enrichment step, and
    returned as a list of person dicts.
    """
    storage = Google().storage()

    # the Pub/Sub message says where the feed file lives
    params = _get_params(data)

    # download the Workday People feed from GCS
    workday_people_list = storage.download_blob_as_json(
        params['bucket'],
        params['filename'],
    )
    print('Found %s records in Workday People.' % (
        len(workday_people_list)
    ))

    # index by emplid
    workday_people = transform_workday_people(workday_people_list)

    # Order matters: desks need the person_id from add_people_data, emails
    # need the nicknames, and GitHub needs the google_id from add_google_info.
    enrichment_steps = (
        add_ccure_data,    # card_numbers
        add_people_data,   # email and person_id
        add_nicknames,
        add_desks,
        add_phones,
        add_emails,
        add_google_info,
        add_github_info,
        add_slack_info,
    )
    for enrich in enrichment_steps:
        enrich(workday_people)

    return _dict_to_list(workday_people)
def transform_workday_people(workday_people_list):
    """Transform a list of Workday records into a dict keyed by emplid.

    Records whose ``create_it_account`` is falsy are dropped; every other
    record is passed through ``transform_workday_person``.
    """
    return {
        record['emplid']: transform_workday_person(record)
        for record in workday_people_list
        if record['create_it_account']
    }
def transform_workday_person(workday_person):
    """Reshape one Workday record into the People format.

    The record is modified in place and returned. The function:

    * copies the preferred names into ``first_name`` / ``last_name`` and
      builds ``full_name``;
    * copies the IT-account dates into ``start_date`` / ``end_date`` and
      sets ``future_hire`` / ``terminated`` by comparing them, as strings,
      with today's date;
    * copies ``worker_job_title`` into ``title``;
    * replaces a ``department_id`` containing a non-digit with ``None``
      (and logs an error);
    * adds ``<key>_ascii`` variants for the name-like fields;
    * removes the source keys that were copied elsewhere.
    """
    # today's date as a string, compared directly with the feed's date strings
    current_date = str(datetime.datetime.now().date())

    # source attributes
    preferred_first = workday_person['preferred_first_name']
    preferred_last = workday_person['preferred_last_name']
    account_start = workday_person['it_account_start_date']
    account_end = workday_person['it_account_end_date']

    # names
    workday_person['first_name'] = preferred_first
    workday_person['last_name'] = preferred_last
    workday_person['full_name'] = u'%s %s' % (preferred_first, preferred_last)

    # start and end dates; both flags default to False
    workday_person['future_hire'] = False
    workday_person['terminated'] = False
    if account_start:
        workday_person['start_date'] = account_start
        workday_person['future_hire'] = account_start > current_date
    if account_end:
        workday_person['end_date'] = account_end
        workday_person['terminated'] = account_end < current_date

    # title
    workday_person['title'] = workday_person['worker_job_title']

    # a department_id with any non-digit character is rejected
    department_id = workday_person['department_id']
    if department_id and re.search('[^0-9]', department_id):
        logging.error('Invalid department_id: %s' % (department_id))
        workday_person['department_id'] = None

    # add ascii variants of the text fields
    text_fields = (
        'home_institution',
        'first_name',
        'full_name',
        'last_name',
        'manager',
    )
    for field in text_fields:
        ascii_field = '{}_ascii'.format(field)
        workday_person[ascii_field] = _to_ascii(workday_person[field], field)

    # drop the source keys that are now redundant
    redundant_fields = (
        'create_it_account',
        'it_account_start_date',
        'it_account_end_date',
        'preferred_first_name',
        'preferred_last_name',
        'worker_job_title',
    )
    for field in redundant_fields:
        del workday_person[field]

    return workday_person
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""People Cloud Functions in Python 3.""" | |
import json | |
import logging | |
import os | |
from google.cloud.storage import Client | |
from bits.google import Google | |
from bits.helpers import generate_gcs_object_name | |
import helpers | |
# Destination buckets come from the environment; each is None when unset.
BIGQUERY_BUCKET = os.getenv("BIGQUERY_BUCKET")
FEEDS_BUCKET = os.getenv("FEEDS_BUCKET")
def _save_entries_for_bigquery(entries):
    """Upload ``entries`` to ``BIGQUERY_BUCKET`` as newline-delimited JSON.

    Each entry is serialised with ``json.dumps`` onto its own line and the
    result is written to the fixed object ``peopleapi/people.json``.

    Returns:
        The object name that was written.
    """
    object_name = 'peopleapi/people.json'

    # one JSON document per line
    payload = '\n'.join(json.dumps(entry) for entry in entries)

    # resolve bucket -> blob
    blob = Client().bucket(BIGQUERY_BUCKET).blob(object_name)

    print('Saving entries for bigquery...')
    blob.upload_from_string(payload, content_type='application/json')
    print('Saved entries to file: gs://%s/%s' % (
        BIGQUERY_BUCKET,
        object_name,
    ))
    return object_name
def _save_entries_for_firestore(entries):
    """Upload ``entries`` to ``FEEDS_BUCKET`` as a single JSON document.

    The object name is produced by ``generate_gcs_object_name`` from the
    ``people`` directory and the ``people_people`` prefix.

    Returns:
        The object name that was written.
    """
    storage = Google().storage()

    # build the destination object name
    object_name = generate_gcs_object_name('people', 'people_people')

    # upload to GCS
    print('Saving entries for firestore...')
    storage.upload_blob_from_json(
        FEEDS_BUCKET,
        object_name,
        json.dumps(entries),
    )
    print('Saved entries to file: gs://%s/%s' % (
        FEEDS_BUCKET,
        object_name,
    ))
    return object_name
def workday_people_transform(data, context):
    """Background Cloud Function to be triggered by Pub/Sub.

    Transforms the Workday People feed named in the message and saves the
    result twice: once as newline-delimited JSON for BigQuery and once as a
    JSON document for Firestore.

    Args:
        data (dict): The dictionary with data specific to this type of event.
        context (google.cloud.functions.Context): The Cloud Functions event
            metadata. Not used.

    Returns:
        ``True`` when both files were saved; otherwise the error message
        (a string) for the first save that failed.
    """
    workday_people = helpers.get_workday_people(data)
    print('Transformed %s Workday People for People.' % (len(workday_people)))

    # save file for bigquery
    try:
        filename = _save_entries_for_bigquery(workday_people)
    except Exception as e:
        # The message must not mention `filename`: it is not bound when the
        # save itself raised, and referencing it would hide the real error
        # behind an UnboundLocalError.
        error = 'ERROR saving entries for BigQuery: %s' % (e)
        logging.error(error)
        return error
    print('Saved GCS file for BigQuery: %s' % (filename))

    # save file for firestore
    try:
        filename = _save_entries_for_firestore(workday_people)
    except Exception as e:
        # Here `filename` would still hold the BigQuery object name, so it
        # is left out of this message as well.
        error = 'ERROR saving entries for Firestore: %s' % (e)
        logging.error(error)
        return error
    print('Saved GCS file for Firestore: %s' % (filename))

    return True
if __name__ == '__main__':
    import base64

    # Manual run: wrap a bucket/filename pair in a base64-encoded "data"
    # field, mimicking the event the function receives, then invoke it with
    # an empty context.
    message = json.dumps({
        'bucket': 'broad-bitsdb-feeds',
        'filename': 'workday/workday_people_2021-02-24T19:11:04.669630.json',
    })
    event = {'data': base64.b64encode(message.encode('utf-8'))}
    workday_people_transform(event, {})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment