Created
January 20, 2019 19:21
-
-
Save ryanlovett/3d5a7e2dadd2f02353ed38a9411a368d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# vim: set et sw=4 ts=4: | |
# Given a list of courses, where each has a term, department, & course number:
# - fetch the course roster from sis | |
# - replace members of the calgroup roster with those from the sis | |
# Requires SIS and CalGroups API secrets. | |
# CalGroups API | |
# https://calnetweb.berkeley.edu/calnet-technologists/calgroups-integration/calgroups-api-information | |
import argparse | |
import json | |
import os | |
import sys | |
import requests | |
# SIS API endpoints. The '{}' placeholders are filled in with str.format():
# descriptors_uri takes the term id; sections_uri takes the term id and a
# section code.
classes_sections_uri = "https://apis.berkeley.edu/sis/v1/classes/sections"
enrollments_uri = "https://apis.berkeley.edu/sis/v2/enrollments"
sections_uri = enrollments_uri + "/terms/{}/classes/sections/{}"
descriptors_uri = enrollments_uri + "/terms/{}/classes/sections/descriptors"
def sis_filter_lectures(sections):
    '''Return the 'code' of every lecture-type entry in `sections`.

    An entry counts as a lecture when its 'description' contains one of the
    markers ' LEC ', ' SES ' or ' WBL ' (surrounding spaces included).
    The codes are returned as a list, in input order.
    '''
    lecture_markers = (' LEC ', ' SES ', ' WBL ')
    codes = []
    for section in sections:
        if any(marker in section['description'] for marker in lecture_markers):
            codes.append(section['code'])
    return codes
def sis_get_items(uri, params, headers, item_type):
    '''Get a list of items (enrollments, sections, ...) from the SIS.

    Requests `uri` page after page, starting at params['page-number']
    (page 1 if the key is absent), and concatenates the `item_type` lists
    found under data['apiResponse']['response'] of each JSON reply.

    Paging stops at the first reply that has HTTP status 404, has no
    'response' member, or has no `item_type` member in its response.

    `params` is not modified: paging happens on a private copy, so a caller
    may safely reuse the same dict for several calls.

    Returns the (possibly empty) list of all items collected.
    '''
    # Work on a copy so the caller's starting page is left untouched.
    page_params = dict(params)
    page_params.setdefault('page-number', 1)

    items = []
    while True:
        if args.debug:
            print("sis_get_items: {}".format(uri))
            print("  params: {}".format(page_params))
            print("  headers: {}".format(headers))
        r = requests.get(uri, params=page_params, headers=headers)
        if r.status_code == 404:
            if args.debug: print('NO MORE {}'.format(item_type))
            break
        if args.debug: print('FOUND {}'.format(item_type))
        data = r.json()
        # Stop if there is no response (e.g. 404)
        if 'response' not in data['apiResponse']:
            if args.debug: print('404 No response')
            break
        # Stop if the response carries no items of the requested type
        if item_type not in data['apiResponse']['response']:
            if args.debug: print('No {}'.format(item_type))
            break
        # Collect this page's items, then move on to the next page
        items += data['apiResponse']['response'][item_type]
        page_params['page-number'] += 1

    if args.debug:
        print('num {}: {}'.format(item_type, len(items)))
        print(items)
    return items
def sis_get_lecture_section_ids(e_id, e_key, term_id, subject_area, catalog_number):
    '''
    Look up the lecture section ids of one course in one term.

    The section descriptors of the course (lectures and other section types
    alike) are fetched from the SIS and narrowed down to lectures. Only the
    lectures matter here because their enrollments are a superset of the
    enrollments of all other section types (lab, dis).
    '''
    query = {
        'page-number': 1,
        "subject-area-code": subject_area,
        "catalog-number": catalog_number,
    }
    credentials = {
        "Accept": "application/json",
        "app_id": e_id,
        "app_key": e_key,
    }
    descriptors = sis_get_items(
        descriptors_uri.format(term_id), query, credentials, 'fieldValues'
    )
    return sis_filter_lectures(descriptors)
def sis_get_enrollments(e_id, e_key, term_id, subject_area, catalog_number):
    '''Get a course's enrollments from the SIS.

    Finds the lecture sections of the course for `term_id` and returns the
    concatenated list of 'classSectionEnrollments' items of all of them.
    `e_id` and `e_key` are sent as the app_id / app_key request headers.
    '''
    if args.debug: print("sis_get_enrollments: {}".format(catalog_number))

    # get the lectures
    lecture_codes = sis_get_lecture_section_ids(e_id, e_key, term_id,
        subject_area, catalog_number)

    headers = { "Accept": "application/json", "app_id": e_id, "app_key": e_key }

    # get the enrollments in each lecture
    enrollments = []
    for lecture_code in lecture_codes:
        # Build the paging parameters anew for every lecture: a paginating
        # helper may advance 'page-number' in the dict it is handed, and a
        # shared dict would make later lectures start beyond page 1.
        params = {
            "page-number": 1,
            "page-size": 100,
        }
        uri = sections_uri.format(term_id, lecture_code)
        enrollments += sis_get_items(uri, params, headers,
            'classSectionEnrollments')
    if args.verbose: print('{} {}'.format(catalog_number, len(enrollments)))
    return enrollments
def sis_filter_section_instructors(section):
    '''Extract the campus-uid of instructors from a section.

    Walks section['meetings'][*]['assignedInstructors'][*]['instructor']
    ['identifiers'] and collects the 'id' of every identifier whose 'type'
    is 'campus-uid' and whose 'disclose' flag is present and true.

    Returns a set of ids; the set is empty when the section has no
    'meetings' member.
    '''
    uids = set()
    # A section without meetings has no instructors to report.
    if 'meetings' not in section:
        return uids
    for meeting in section['meetings']:
        if 'assignedInstructors' not in meeting: continue
        for instructor in meeting['assignedInstructors']:
            if 'identifiers' not in instructor['instructor']: continue
            for identifier in instructor['instructor']['identifiers']:
                # {'disclose': True, 'id': '1234', 'type': 'campus-uid'}
                if 'disclose' not in identifier: continue
                if not identifier['disclose']: continue
                if identifier['type'] != 'campus-uid': continue
                uids.add(identifier['id'])
    return uids
def sis_get_instructors_gsis(c_id, c_key, term_id, subject_area, catalog_number):
    '''Return (instructors, gsis) for a term, subject, and SIS catalog number.

    All class sections of the course are fetched. The instructor uids of
    sections flagged as primary go into the first list, those of every
    other section into the second. Each list is free of duplicates.
    '''
    request_headers = {
        "Accept": "application/json",
        "app_id": c_id,
        "app_key": c_key
    }
    query = {
        "subject-area-code": subject_area,
        "catalog-number": catalog_number,
        "term-id": term_id,
        "page-size": 400,
        "page-number": 1
    }

    # The class sections of the course cover both lectures and sections.
    class_sections = sis_get_items(classes_sections_uri, query,
        request_headers, 'classSections')

    primary_uids = set()
    secondary_uids = set()
    for class_section in class_sections:
        section_uids = sis_filter_section_instructors(class_section)
        if class_section['association']['primary']:
            bucket = primary_uids
        else:
            bucket = secondary_uids
        bucket.update(section_uids)

    return (list(primary_uids), list(secondary_uids))
def grouper_replace_users(auth, group, users):
    '''Make {users} the complete membership of the grouper group {group}.

    Sends one addMember request with replaceAllExisting set, authenticated
    with `auth`. Raises Exception with the server's result message when the
    reply contains a WsRestResultProblem.
    '''
    if args.verbose: print('transferring to {}'.format(group))

    # https://github.com/Internet2/grouper/blob/master/grouper-ws/grouper-ws/doc/samples/addMember/WsSampleAddMemberRest_json.txt
    payload = {
        "WsRestAddMemberRequest": {
            "replaceAllExisting":"T",
            "subjectLookups":[{"subjectId":member} for member in users]
        }
    }
    endpoint = grouper_base_uri + '/groups/{group}/members'.format(group=group)
    response = requests.put(
        endpoint,
        data=json.dumps(payload),
        auth=auth,
        headers={'Content-type':'text/x-json'},
    )
    result = response.json()
    if 'WsRestResultProblem' in result:
        problem = result['WsRestResultProblem']
        raise Exception(problem['resultMetadata']['resultMessage'])
def valid_class(c):
    '''Tell whether course entry `c` has a term id, subject area and catalog number.'''
    required_fields = ('term_id', 'subject_area', 'catalog_number')
    return all(field in c for field in required_fields)
def get_enrollment_uids(enrollments):
    '''Map each SIS enrollment to its student's campus UID.

    The UID is the 'id' of the first student identifier of type
    'campus-uid'; an enrollment without such an identifier maps to None.
    '''
    def first_campus_uid(record):
        candidates = (
            ident['id']
            for ident in record['student']['identifiers']
            if ident['type'] == 'campus-uid'
        )
        return next(candidates, None)

    return [first_campus_uid(record) for record in enrollments]
def get_enrollment_emails(enrollments):
    '''Given a list of SIS enrollments, return each student's campus email.

    The result has one entry per enrollment, in input order: the
    'emailAddress' of the student's first email whose type code is 'CAMP',
    or None when the student has no such email (or no 'emails' at all).
    '''
    def campus_email(enrollment):
        for email in enrollment['student'].get('emails', []):
            if email['type']['code'] == 'CAMP':
                return email['emailAddress']
        return None

    return [campus_email(enrollment) for enrollment in enrollments]
def enrollment_status(enrollment):
    '''Return the enrollment's status code as a string: 'E', 'W', or 'D'.'''
    status_code = enrollment['enrollmentStatus']['status']['code']
    return str(status_code)
def filter_enrollment_status(enrollments, status):
    '''Return the list of enrollments whose status code equals `status`.'''
    return [
        enrollment
        for enrollment in enrollments
        if enrollment_status(enrollment) == status
    ]
def calgroup_child_id(calgroup, child):
    '''Build the id of a child group: a:base:group -> a:base:group:group-child.

    The child's own name is the last ':'-separated element of `calgroup`
    followed by '-' and `child`.
    '''
    leaf = calgroup.rsplit(':', 1)[-1]
    child_name = leaf + '-' + str(child)
    return ':'.join((calgroup, child_name))
def has_all_keys(d, keys):
    '''Return True when every element of `keys` is contained in `d`.'''
    for key in keys:
        if key not in d:
            return False
    return True
def read_json_data(filename, required_keys):
    '''Read and validate data from a json file.

    Parses `filename` as JSON and checks that every entry of
    `required_keys` is present in the parsed data.

    Returns the parsed data.
    Raises Exception when the file does not exist or when required keys
    are missing; the message names the file and, for missing keys, the
    keys themselves.
    '''
    if not os.path.exists(filename):
        raise Exception("No such file: {}".format(filename))

    # The context manager closes the file even if parsing fails.
    with open(filename) as f:
        data = json.load(f)

    # check that we've got all of our required keys
    missing = set(required_keys) - set(data)
    if missing:
        raise Exception(
            "Missing parameters in {}: {}".format(filename, missing)
        )
    return data
def read_secrets(filename):
    '''Load the secrets file, which must hold the SIS and grouper credentials.'''
    return read_json_data(filename, [
        'sis_enrollments_id', 'sis_enrollments_key',
        'sis_classes_id', 'sis_classes_key',
        'grouper_user', 'grouper_pass',
    ])
def read_config(filename):
    '''Load the config file, which must define the base group and the courses.'''
    return read_json_data(filename, ['base_group', 'courses'])
## main
# Script body: for every configured course, fetch its roster from the SIS
# and either print it (dry run) or replace the members of the matching
# CalGroups groups with it. `args` and `grouper_base_uri` are read as
# module-level names by the functions of this file.

grouper_base_uri = 'https://calgroups.berkeley.edu/gws/servicesRest/json/v2_2_100'

parser = argparse.ArgumentParser(description="Sync SIS classes to CalGroups.")
parser.add_argument('-c', dest='config',
    default='/etc/sis-to-calgroups.json', help='Configuration file.')
parser.add_argument('-s', dest='secrets',
    default='/root/.sis-to-calgroups.json', help='Secrets file.')
parser.add_argument('-v', dest='verbose', action='store_true',
    help='Be verbose.')
parser.add_argument('-d', dest='debug', action='store_true',
    help='Debug.')
parser.add_argument('-n', dest='dryrun', action='store_true',
    help='Dry run. Print enrollments without updating CalGroups.')
args = parser.parse_args()

# read secrets from secrets file
secrets = read_secrets(args.secrets)

# read config from config file
config = read_config(args.config)

for c in config['courses']:
    if not valid_class(c):
        print('Incomplete course parameters: {}'.format(c))
        continue

    # store the uids for our various groups
    uids = {}

    # fetch student enrollments
    enrollments = sis_get_enrollments(
        secrets['sis_enrollments_id'], secrets['sis_enrollments_key'],
        c['term_id'], c['subject_area'], c['catalog_number']
    )

    # Split the enrollment records by status. The records themselves are
    # kept because the dry run prints emails, which are read from them.
    enrolled = filter_enrollment_status(enrollments, 'E')
    waitlisted = filter_enrollment_status(enrollments, 'W')
    dropped = filter_enrollment_status(enrollments, 'D')

    # filter student uids
    uids['enrolled'] = get_enrollment_uids(enrolled)
    uids['waitlisted'] = get_enrollment_uids(waitlisted)
    uids['dropped'] = get_enrollment_uids(dropped)

    # fetch instructor and gsi uids
    uids['instructors'], uids['gsis'] = sis_get_instructors_gsis(
        secrets['sis_classes_id'], secrets['sis_classes_key'],
        c['term_id'], c['subject_area'], c['catalog_number'])

    # term ~ edu:berkeley:org:stat:stat-classes:stat-classes-2188
    term_group = calgroup_child_id(config['base_group'], c['term_id'])
    # course ~ {term}:stat-classes-2188-stat-243
    course_group = calgroup_child_id(term_group,
        c['subject_area'].lower() + '-' + c['catalog_number'])

    if args.dryrun:
        print(course_group)
        print('_instructors')
        for uid in uids['instructors']: print(uid)
        print('_gsis')
        for uid in uids['gsis']: print(uid)
        print('_enrolled')
        for email in get_enrollment_emails(enrolled): print(email)
        print('_waitlisted')
        for email in get_enrollment_emails(waitlisted): print(email)
        print()
    else:
        if args.verbose: print(course_group)
        grouper_auth = requests.auth.HTTPBasicAuth(
            secrets['grouper_user'], secrets['grouper_pass']
        )
        # the 'dropped' uids are collected above but not synced to a group
        for suffix in ['enrolled', 'waitlisted', 'instructors', 'gsis']:
            grouper_replace_users(
                grouper_auth,
                calgroup_child_id(course_group, suffix),
                uids[suffix]
            )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Some excerpted docs:
CalGroups from SIS Rosters
Manually create the following structure in Berkeley:Organizations:Stat:Classes. Eventually this will be done by sis-to-calgroups.py.
Name: TERM ; ID: stat-classes-TERM ; E.g.: TERM=2188
Name: COURSE ; ID: stat-classes-TERM-COURSE ; E.g.: COURSE=stat-243
Name: enrolled ; ID: stat-classes-TERM-COURSE-enrolled
Name: waitlisted ; ID: stat-classes-TERM-COURSE-waitlisted
Name: gsis ; ID: stat-classes-TERM-COURSE-gsis
Name: instructors ; ID: stat-classes-TERM-COURSE-instructors
Name: non-enrolled ; ID: stat-classes-TERM-COURSE-non-enrolled
Members are manually added.
This group is intended to hold auditors, guests, observers, etc.
Name: admins ; ID: stat-classes-TERM-COURSE-admins
Members are manually added.
This group is intended to hold group technical contacts, faculty assistants, departmental administrators, etc.
Name: FRIENDLY_NAME ; ID: stat-classes-TERM-COURSE-all ; E.g.: FRIENDLY_NAME="Stat 243 Fall 2018"
Members are enrolled, gsis, instructors, admins, and non-enrolled.
This group is provisioned to Google Groups. This group's name is a friendly name, unlike the others, because Google Groups uses it within the web UI. Something terse like "all-members" would be ambiguous because one student may be in more than one of these courses.
The direct members "instructors" and "admins" should be given admin privileges of this group. This is currently done manually by clicking on Actions next to "instructors" and "admins" when viewing the direct members of this group.