Skip to content

Instantly share code, notes, and snippets.

@lukwam
Last active March 29, 2021 20:53
Show Gist options
  • Save lukwam/77766e8cdb75512e4acabfb28bf3dc7f to your computer and use it in GitHub Desktop.
Save lukwam/77766e8cdb75512e4acabfb28bf3dc7f to your computer and use it in GitHub Desktop.
Workday transform main.py
# -*- coding: utf-8 -*-
"""Helpers for Workday People Transform."""
import datetime
import json
import logging
import os
import re
from unidecode import unidecode
from google.oauth2 import service_account
from bits.google import Google
# Only let ERROR (and above) through from the googleapiclient
# discovery-cache logger.
logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)
# Settings read from the environment; each one is None when unset.
# BITSDB_PROJECT, FIRESTORE_PROJECT and GITHUB_PROJECT are handed to
# Google().firestore(...) below; GCP_PROJECT is used for the secret lookup
# in _get_config().
BITSDB_PROJECT = os.environ.get("BITSDB_PROJECT")
FIRESTORE_PROJECT = os.environ.get("FIRESTORE_PROJECT")
GCP_PROJECT = os.environ.get("GCP_PROJECT")
GITHUB_PROJECT = os.environ.get("GITHUB_PROJECT")
def _dict_to_list(data):
    """Return the values of a dict as a new list.

    Args:
        data (dict): Mapping whose keys are discarded.

    Returns:
        list: The values of ``data`` in the dict's iteration order.
    """
    return list(data.values())
def _docs_to_dict(docs):
    """Convert a dict of documents into a dict of plain dicts.

    Args:
        docs (dict): Mapping of key -> document object. Each document must
            provide a ``to_dict()`` method.

    Returns:
        dict: The same keys, each mapped to its document's ``to_dict()``
        result.
    """
    return {key: doc.to_dict() for key, doc in docs.items()}
def _get_config():
    """Return the value of the "google-service-account-key" secret.

    The secret is looked up in the project named by GCP_PROJECT through the
    bits Secret Manager helper.
    """
    secret_name = "google-service-account-key"
    secretmanager = Google().secretmanager()
    return secretmanager.access_version_value(GCP_PROJECT, secret_name)
def _get_params(data):
    """Return the JSON body carried by the given Pub/Sub message."""
    return Google().pubsub().get_pubsub_message_json_data(data)
def _get_next_person_id(pids):
    """Allocate the next unused person_id and record it in ``pids``.

    The ids are numeric strings. The highest id is found by comparing them as
    integers instead of trusting the list order: get_people() in this module
    supplies ``sorted(people)``, and when ids are sorted as strings '9' comes
    after '10', so the last element is not necessarily the largest id and
    incrementing it could hand out an id that is already in use.

    Args:
        pids (list): Existing person ids as numeric strings. The new id is
            appended to this list in place so repeated calls keep advancing.

    Returns:
        str: The newly allocated person id.

    Raises:
        IndexError: If ``pids`` is empty (there is nothing to count from).
        ValueError: If any id cannot be parsed as an integer.
    """
    if not pids:
        raise IndexError('Cannot allocate a person_id: no existing ids.')
    highest = max(int(pid) for pid in pids)
    next_pid = u'%s' % (highest + 1)
    pids.append(next_pid)
    return next_pid
def _to_ascii(value, key=None):
    """Transliterate a unicode string to ASCII with unidecode.

    Args:
        value: Text to convert. Falsy values (None, '') are returned as is.
        key: Name of the field being converted. Currently unused.

    Returns:
        ``unidecode(value)``, or ``value`` itself when it is falsy.
    """
    return unidecode(value) if value else value
def get_ccure_cards():
    """Return a dict of usable CCURE card numbers keyed by emplid.

    Reads the 'ccure_personnel' and 'ccure_credentials' collections from
    Firestore and joins each credential to its personnel record.

    Returns:
        dict: emplid (str) -> list of card numbers.
    """
    firestore = Google().firestore(FIRESTORE_PROJECT)

    print('Getting CCURE Personnel from Firestore...')
    personnel = _docs_to_dict(firestore.get_docs_dict('ccure_personnel'))
    print('Found %s CCURE Personnel.' % (len(personnel)))

    print('Getting CCURE Credentials from Firestore...')
    credentials = _docs_to_dict(firestore.get_docs_dict('ccure_credentials'))
    print('Found %s CCURE Credentials.' % (len(credentials)))

    cards = {}
    for cred in credentials.values():
        personnel_id = str(cred['personnel_id'])

        # a credential without a card number is of no use
        card_number = cred['card_number']
        if not card_number:
            continue

        # ignore cards flagged as disabled, lost or stolen
        if any(cred[flag] for flag in ('disabled', 'lost', 'stolen')):
            continue

        # the credential must point at a known personnel record
        if personnel_id not in personnel:
            print('Personnel record not found: %s' % (personnel_id))
            continue

        # the personnel record must carry an emplid
        emplid = personnel[personnel_id].get('emplid')
        if not emplid:
            continue

        cards.setdefault(str(emplid), []).append(card_number)
    return cards
def get_desks():
    """Return a dict of desks keyed by person id.

    Reads the 'seats' collection from Firestore.

    Returns:
        dict: person id (str) -> list of desk dicts, each holding the seat
        "name" and an "address" built from the seat's building.
    """
    firestore = Google().firestore(BITSDB_PROJECT)

    # fetch the seats from Firestore
    print('Getting Space Seats from Firestore...')
    seats = _docs_to_dict(firestore.get_docs_dict('seats'))
    print('Found %s Seats.' % (len(seats)))

    desks = {}
    for seat in seats.values():
        name = seat['name']
        building = seat['building']
        address = {
            "street": f'{building["street_address"]}, {name}',
            "city": building["city"],
            "state": building["state"],
            "zip_code": building["zip"],
            "country": "US",
        }
        pid = u'%s' % (seat['pid'])
        desks.setdefault(pid, []).append({"name": name, "address": address})
    return desks
def get_github_users():
    """Return a dict of GitHub users keyed by Firestore document id.

    Reads the 'tokens' collection; the id of each document is used as the
    user's google_id.

    Returns:
        dict: google_id -> {'github_id', 'login', 'google_id'} (all str).
    """
    firestore = Google().firestore(GITHUB_PROJECT)

    # fetch the token documents from Firestore
    print('Getting GitHub Users from Firestore...')
    token_docs = firestore.get_docs('tokens')
    print('Found %s GitHub Users.' % (len(token_docs)))

    def _summarize(google_id, token):
        """Build the per-user record from one token document."""
        return {
            'github_id': u'%s' % (token['github_id']),
            'login': u'%s' % (token['github_login']),
            'google_id': u'%s' % (google_id),
        }

    return {doc.id: _summarize(doc.id, doc.to_dict()) for doc in token_docs}
def get_google_people():
    """Return profile photo URLs for Google people, keyed by google id.

    Reads the 'google_people' collection from Firestore. People without a
    photo whose source type is 'PROFILE' are left out of the result.

    Returns:
        dict: google id -> {'photo_url': str}.
    """
    firestore = Google().firestore(FIRESTORE_PROJECT)

    # fetch the google people from Firestore
    print('Getting Google People from Firestore...')
    google_people = _docs_to_dict(firestore.get_docs_dict('google_people'))
    print('Found %s Google People.' % (len(google_people)))

    people = {}
    for google_id, person in google_people.items():
        # when several photos are PROFILE photos, the last one wins
        profile = None
        for photo in person.get('photos', []):
            if photo['metadata']['source']['type'] == 'PROFILE':
                profile = photo
        if not profile:
            continue

        # strip the '/s100/' and '=s100' markers (presumably size hints)
        url = profile['url']
        url = url.replace('/s100/', '/')
        url = url.replace('=s100', '')
        people[google_id] = {'photo_url': url}
    return people
def get_google_users():
    """Return a dict of Google directory users keyed by primary email.

    Builds credentials from the service-account key returned by
    _get_config(), with read-only directory scopes and the subject
    'google@broadinstitute.com', then lists users through the directory
    helper.

    Returns:
        dict: primaryEmail -> user resource (only id and primaryEmail are
        requested).
    """
    key_info = json.loads(_get_config())

    g = Google()
    g.credentials = service_account.Credentials.from_service_account_info(
        key_info,
        scopes=[
            'https://www.googleapis.com/auth/admin.directory.user.readonly',
            'https://www.googleapis.com/auth/admin.directory.group.readonly',
        ],
        subject='google@broadinstitute.com',
    )

    print('Getting Google Users from Google...')
    google_users = g.directory().get_users(
        fields='nextPageToken,users(id,primaryEmail)',
    )
    print('Found %s Google Users.' % (len(google_users)))

    return {user['primaryEmail']: user for user in google_users}
def get_nicknames():
    """Return a dict of sorted nickname lists keyed by username.

    Reads the 'nicknames' collection from Firestore. A document without a
    'nicknames' field yields an empty list.
    """
    firestore = Google().firestore(BITSDB_PROJECT)

    # fetch the nickname documents from Firestore
    print('Getting Nicknames from Firestore...')
    nicknames = _docs_to_dict(firestore.get_docs_dict('nicknames'))
    print('Found %s Nicknames.' % (len(nicknames)))

    return {
        username: [
            u'%s' % (nick) for nick in sorted(record.get('nicknames', []))
        ]
        for username, record in nicknames.items()
    }
def get_people():
    """Return existing People records by emplid, plus the sorted person ids.

    Reads the 'people' collection from Firestore.

    Returns:
        tuple: ``(data, pids)`` where ``data`` maps emplid (str) to a dict
        with 'person_id', 'email' and 'emplid', and ``pids`` is the sorted
        list of document ids in the collection.
    """
    firestore = Google().firestore(BITSDB_PROJECT)

    # fetch the people documents from Firestore
    print('Getting People from Firestore...')
    people = _docs_to_dict(firestore.get_docs_dict('people'))
    print('Found %s People.' % (len(people)))

    # keep only person_id, email and emplid, re-keyed by emplid
    by_emplid = {}
    for pid, person in people.items():
        emplid = u'%s' % (person['emplid'])
        by_emplid[emplid] = {
            'person_id': u'%s' % (pid),
            'email': u'%s' % (person['email']),
            'emplid': emplid,
        }
    return by_emplid, sorted(people)
def get_phones():
    """Return a dict of phone extensions keyed by username.

    Reads the 'phones' collection from Firestore; each document key is an
    extension and its 'username' field names the owner.

    Returns:
        dict: username -> list of extensions.
    """
    firestore = Google().firestore(BITSDB_PROJECT)

    # fetch the phone documents from Firestore
    print('Getting Phones from Firestore...')
    extensions = _docs_to_dict(firestore.get_docs_dict('phones'))
    print('Found %s Phones.' % (len(extensions)))

    phones = {}
    for ext, record in extensions.items():
        phones.setdefault(record['username'], []).append(ext)
    return phones
def get_slack_users():
    """Return a dict of Slack users keyed by Broad username.

    Reads the 'slack_users' collection from Firestore. Only users whose
    profile email ends with '@broadinstitute.org' are included; the username
    is the part of the address before that suffix. A suffix match is used
    (rather than a substring match) so that addresses at look-alike domains
    such as 'x@broadinstitute.org.uk' are not mapped onto a username.

    Returns:
        dict: username -> {'slack_id': document key,
        'slack_name': profile 'display_name_normalized' (or None)}.

    Raises:
        KeyError: If a Slack user document has no 'profile' field.
    """
    domain = '@broadinstitute.org'
    firestore = Google().firestore(FIRESTORE_PROJECT)

    # fetch the slack users from Firestore
    print('Getting Slack Users from Firestore...')
    slack_users = _docs_to_dict(firestore.get_docs_dict('slack_users'))
    print('Found %s Slack User.' % (len(slack_users)))

    users = {}
    for uid, user in slack_users.items():
        profile = user['profile']
        email = profile.get('email')
        if not email or not email.endswith(domain):
            continue
        username = email[:-len(domain)]
        users[username] = {
            'slack_id': uid,
            'slack_name': profile.get('display_name_normalized'),
        }
    return users
def add_ccure_data(workday_people):
    """Attach CCURE card numbers to the Workday People records.

    Sets 'card_numbers' on every record whose emplid has cards; other
    records are left untouched. Mutates and returns ``workday_people``.
    """
    ccure_cards = get_ccure_cards()
    for emplid, person in workday_people.items():
        if emplid in ccure_cards:
            person['card_numbers'] = ccure_cards[emplid]
    return workday_people
def add_desks(workday_people):
    """Attach desk and address information to the Workday People records.

    For a person with desks, the first desk provides 'address' and 'desk',
    and 'desks' lists every desk name. For a person without desks an
    existing 'desk' value is reset to None. Mutates and returns
    ``workday_people``.
    """
    desks = get_desks()
    for person in workday_people.values():
        pid = person.get('person_id')
        if pid not in desks:
            # no seat assigned: blank an existing value, never add the key
            if 'desk' in person:
                person['desk'] = None
            continue

        assigned = desks[pid]
        primary = assigned[0]
        person["address"] = primary["address"]
        person['desk'] = primary["name"]
        person['desks'] = [seat["name"] for seat in assigned]
    return workday_people
def add_emails(workday_people):
    """Add an 'emails' list to every Workday People record.

    Addresses are collected in this order: 'email', 'email_username',
    'email_work', one '<nickname>@broadinstitute.org' per entry in
    'nicknames', 'email_home', 'email_work_referral'. Empty values are
    skipped and every address appears at most once, including
    nickname-derived addresses that repeat an earlier address. A missing or
    None 'nicknames' value is treated as no nicknames.

    Args:
        workday_people (dict): emplid -> person dict; updated in place.

    Returns:
        dict: The same ``workday_people`` mapping.
    """
    for person in workday_people.values():
        # candidate addresses, in priority order
        candidates = [
            person.get('email'),
            person.get('email_username'),
            person.get('email_work'),
        ]
        candidates.extend(
            u'%s@broadinstitute.org' % (nickname)
            for nickname in person.get('nicknames') or []
        )
        candidates.append(person.get('email_home'))
        candidates.append(person.get('email_work_referral'))

        emails = []
        for address in candidates:
            if address and address not in emails:
                emails.append(u'%s' % (address))
        person['emails'] = emails
    return workday_people
def add_github_info(workday_people):
    """Attach GitHub id and login to the Workday People records.

    Records are matched on 'google_id', so this has to run after the Google
    info has been added. Mutates and returns ``workday_people``.
    """
    github_users = get_github_users()
    for person in workday_people.values():
        google_id = person.get('google_id')
        if not google_id or google_id not in github_users:
            continue
        match = github_users[google_id]
        person['github_id'] = u'%s' % (match['github_id'])
        person['github_login'] = u'%s' % (match['login'])
    return workday_people
def add_google_info(workday_people):
    """Attach Google id and photo URL to the Workday People records.

    A record is matched to a Google user through its 'email_username'
    value. 'photo_url' is only set when the matched Google id also has an
    entry in the Google people data. Mutates and returns
    ``workday_people``.
    """
    google_people = get_google_people()
    google_users = get_google_users()
    for person in workday_people.values():
        email = person['email_username']
        if email not in google_users:
            continue

        # record the google id of the matching directory user
        google_id = google_users[email]['id']
        person['google_id'] = google_id

        # record the profile photo when one is known
        if google_id in google_people:
            person['photo_url'] = google_people[google_id].get('photo_url')
    return workday_people
def add_slack_info(workday_people):
    """Attach Slack id and display name to the Workday People records.

    Records are matched to Slack users on 'username'. Mutates and returns
    ``workday_people``.
    """
    slack_users = get_slack_users()
    for person in workday_people.values():
        username = person['username']
        if username not in slack_users:
            continue
        match = slack_users[username]
        person['slack_id'] = match['slack_id']
        person['slack_name'] = match['slack_name']
    return workday_people
def add_nicknames(workday_people):
    """Attach nickname lists to the Workday People records.

    Sets 'nicknames' on every record whose 'username' has an entry in the
    nicknames data. Mutates and returns ``workday_people``.
    """
    nicknames = get_nicknames()
    for person in workday_people.values():
        username = person['username']
        if username in nicknames:
            person['nicknames'] = nicknames[username]
    return workday_people
def add_people_data(workday_people):
    """Attach 'email' and 'person_id' to the Workday People records.

    A person whose emplid already exists in People keeps the stored email
    and person_id. Anyone else is treated as new: the email is derived from
    the username and the next available person_id is allocated. Mutates and
    returns ``workday_people``.
    """
    people, pids = get_people()
    for emplid, person in workday_people.items():
        if emplid in people:
            existing = people[emplid]
            person['email'] = existing['email']
            person['person_id'] = existing['person_id']
            continue

        # new person: derive the email and allocate the next person id
        person['email'] = '%s@broadinstitute.org' % (person['username'])
        person['person_id'] = _get_next_person_id(pids)
        summary = (
            emplid,
            person['full_name'],
            person['username'],
            person['person_id'],
        )
        print('New Person: %s %s [%s] -> %s' % summary)
    return workday_people
def add_phones(workday_people):
    """Attach phone data to the Workday People records.

    'primary_work_phone' is always set: to '+1 (617) 714-<ext>' built from
    the first extension of the person's username, or to None when the
    username has no phones. 'extensions' gets the full extension list; for
    a person without phones an existing 'extensions' value is reset to [].
    Mutates and returns ``workday_people``.
    """
    phones = get_phones()
    for person in workday_people.values():
        username = person['username']
        person['primary_work_phone'] = None

        if username not in phones:
            # no phones: clear a stale list, but do not add the key
            if 'extensions' in person:
                person['extensions'] = []
            continue

        extensions = [u'%s' % (ext) for ext in phones[username]]
        person['primary_work_phone'] = '+1 (617) 714-%s' % (extensions[0])
        person['extensions'] = extensions
    return workday_people
def get_workday_people(data):
    """Build the enriched People records from a Workday People feed file.

    The Pub/Sub message in ``data`` names the GCS 'bucket' and 'filename'
    of the feed. The feed is downloaded, transformed into a dict keyed by
    emplid, passed through each enrichment step, and returned as a list.

    Args:
        data: Pub/Sub event payload understood by _get_params().

    Returns:
        list: One enriched person dict per Workday person kept by the
        transform.
    """
    storage = Google().storage()

    # the pubsub message says which GCS object holds the feed
    params = _get_params(data)
    workday_people_list = storage.download_blob_as_json(
        params['bucket'],
        params['filename'],
    )
    print('Found %s records in Workday People.' % (
        len(workday_people_list)
    ))

    # re-key the feed by emplid
    workday_people = transform_workday_people(workday_people_list)

    # Enrichment steps; the order matters. Emails need the email and
    # nicknames added earlier, and GitHub matching needs the google_id
    # added by the Google step.
    enrichments = (
        add_ccure_data,   # card_numbers
        add_people_data,  # email and person_id
        add_nicknames,
        add_desks,
        add_phones,
        add_emails,
        add_google_info,
        add_github_info,
        add_slack_info,
    )
    for enrich in enrichments:
        enrich(workday_people)

    return _dict_to_list(workday_people)
def transform_workday_people(workday_people_list):
    """Transform the Workday People feed into a dict keyed by emplid.

    Records whose 'create_it_account' value is falsy are dropped; every
    other record is run through transform_workday_person().
    """
    return {
        person['emplid']: transform_workday_person(person)
        for person in workday_people_list
        if person['create_it_account']
    }
def transform_workday_person(workday_person):
    """Reshape a single Workday person record for People, in place.

    Adds name fields, 'future_hire' / 'terminated' flags, optional
    'start_date' / 'end_date', 'title' and '*_ascii' copies of selected
    fields, blanks a 'department_id' that contains non-digit characters,
    and removes the Workday source keys that were copied.

    Args:
        workday_person (dict): Record from the Workday People feed.

    Returns:
        dict: The same dict object, modified.
    """
    today = datetime.date.today().isoformat()

    # pull the source attributes
    first_name = workday_person['preferred_first_name']
    last_name = workday_person['preferred_last_name']
    start_date = workday_person['it_account_start_date']
    end_date = workday_person['it_account_end_date']

    # names, plus the default state of the two date flags
    workday_person.update(
        first_name=first_name,
        last_name=last_name,
        full_name=u'%s %s' % (first_name, last_name),
        future_hire=False,
        terminated=False,
    )

    # dates are compared as strings against today's ISO date
    if start_date:
        workday_person['start_date'] = start_date
        workday_person['future_hire'] = start_date > today
    if end_date:
        workday_person['end_date'] = end_date
        workday_person['terminated'] = end_date < today

    # title
    workday_person['title'] = workday_person['worker_job_title']

    # a department_id containing anything but digits is logged and dropped
    department_id = workday_person['department_id']
    if department_id and re.search('[^0-9]', department_id):
        logging.error('Invalid department_id: %s' % (department_id))
        workday_person['department_id'] = None

    # store an ascii copy of selected fields under '<field>_ascii'
    ascii_fields = (
        'home_institution',
        'first_name',
        'full_name',
        'last_name',
        'manager',
    )
    for field in ascii_fields:
        workday_person[field + '_ascii'] = _to_ascii(
            workday_person[field], field,
        )

    # drop the source keys that are now redundant
    redundant_fields = (
        'create_it_account',
        'it_account_start_date',
        'it_account_end_date',
        'preferred_first_name',
        'preferred_last_name',
        'worker_job_title',
    )
    for field in redundant_fields:
        del workday_person[field]

    return workday_person
# -*- coding: utf-8 -*-
"""People Cloud Functions in Python 3."""
import json
import logging
import os
from google.cloud.storage import Client
from bits.google import Google
from bits.helpers import generate_gcs_object_name
import helpers
# GCS bucket names read from the environment; each one is None when unset.
# BIGQUERY_BUCKET receives the newline-delimited JSON written by
# _save_entries_for_bigquery(); FEEDS_BUCKET receives the JSON file written
# by _save_entries_for_firestore().
BIGQUERY_BUCKET = os.environ.get("BIGQUERY_BUCKET")
FEEDS_BUCKET = os.environ.get("FEEDS_BUCKET")
def _save_entries_for_bigquery(entries):
    """Upload the entries to GCS as newline-delimited JSON.

    Each entry is serialized on its own line and the result is written to
    'peopleapi/people.json' in BIGQUERY_BUCKET.

    Returns:
        str: The object name that was written.
    """
    object_name = 'peopleapi/people.json'

    # one JSON document per line
    payload = '\n'.join(json.dumps(entry) for entry in entries)

    # resolve the destination blob
    blob = Client().bucket(BIGQUERY_BUCKET).blob(object_name)

    print('Saving entries for bigquery...')
    blob.upload_from_string(payload, content_type='application/json')
    print('Saved entries to file: gs://%s/%s' % (
        BIGQUERY_BUCKET,
        object_name,
    ))
    return object_name
def _save_entries_for_firestore(entries):
    """Upload the entries to FEEDS_BUCKET as a single JSON document.

    The object name is produced by generate_gcs_object_name() from the
    'people' directory and the 'people_people' prefix.

    Returns:
        str: The object name that was written.
    """
    storage = Google().storage()

    # work out where the file goes
    object_name = generate_gcs_object_name('people', 'people_people')

    # write it to GCS
    print('Saving entries for firestore...')
    storage.upload_blob_from_json(
        FEEDS_BUCKET,
        object_name,
        json.dumps(entries),
    )
    print('Saved entries to file: gs://%s/%s' % (
        FEEDS_BUCKET,
        object_name,
    ))
    return object_name
def workday_people_transform(data, context):
    """Background Cloud Function to be triggered by Pub/Sub.

    Transforms the Workday People feed named in the message and saves the
    result twice: as newline-delimited JSON for BigQuery and as a JSON feed
    file for Firestore. The first failing save stops the function.

    Args:
        data (dict): The dictionary with data specific to this type of event.
        context (google.cloud.functions.Context): The Cloud Functions event
            metadata. Not used.

    Returns:
        True when both files were saved, otherwise the error message (str)
        describing the save that failed.
    """
    workday_people = helpers.get_workday_people(data)
    print('Transformed %s Workday People for People.' % (len(workday_people)))

    # save file for bigquery
    # NOTE: the error text must not mention the saved file name; when the
    # save raises, no file name has been returned yet.
    try:
        filename = _save_entries_for_bigquery(workday_people)
    except Exception as e:
        error = 'ERROR saving entries to file for BigQuery: %s' % (e)
        logging.error(error)
        return error
    print('Saved GCS file for BigQuery: %s' % (filename))

    # save file for firestore
    try:
        filename = _save_entries_for_firestore(workday_people)
    except Exception as e:
        error = 'ERROR saving entries to file for Firestore: %s' % (e)
        logging.error(error)
        return error
    print('Saved GCS file for Firestore: %s' % (filename))

    return True
if __name__ == '__main__':
    # Manual run: wrap a sample bucket/filename in the base64-encoded 'data'
    # envelope of a Pub/Sub event and invoke the function directly.
    import base64

    payload = json.dumps({
        'bucket': 'broad-bitsdb-feeds',
        'filename': 'workday/workday_people_2021-02-24T19:11:04.669630.json',
    })
    event = {'data': base64.b64encode(payload.encode('utf-8'))}
    workday_people_transform(event, {})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment