Skip to content

Instantly share code, notes, and snippets.

@ryanlovett
Created January 20, 2019 19:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ryanlovett/3d5a7e2dadd2f02353ed38a9411a368d to your computer and use it in GitHub Desktop.
Save ryanlovett/3d5a7e2dadd2f02353ed38a9411a368d to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
# vim: set et sw=4 ts=4:
# Given a list of courses, where each has a term, department, & course number:
# - fetch the course roster from sis
# - replace members of the calgroup roster with those from the sis
# Requires SIS and CalGroups API secrets.
# CalGroups API
# https://calnetweb.berkeley.edu/calnet-technologists/calgroups-integration/calgroups-api-information
import argparse
import json
import os
import sys
import requests
# Various SIS endpoints. Each "{}" placeholder is filled in later with
# str.format(): a term id for the descriptors endpoint, and a term id
# followed by a section id for the per-section enrollments endpoint.
enrollments_uri = 'https://apis.berkeley.edu/sis/v2/enrollments'
_term_sections_uri = enrollments_uri + '/terms/{}/classes/sections'
descriptors_uri = _term_sections_uri + '/descriptors'
sections_uri = _term_sections_uri + '/{}'
classes_sections_uri = 'https://apis.berkeley.edu/sis/v1/classes/sections'
def sis_filter_lectures(sections):
    '''Return the 'code' of each SIS section descriptor that is a lecture.

    A descriptor is treated as a lecture when its 'description' contains
    one of the space-delimited markers ' LEC ', ' SES ' or ' WBL '.
    The result preserves the order of {sections}.
    '''
    lecture_markers = (' LEC ', ' SES ', ' WBL ')
    codes = []
    for section in sections:
        description = section['description']
        if any(marker in description for marker in lecture_markers):
            codes.append(section['code'])
    return codes
def sis_get_items(uri, params, headers, item_type):
    '''Get a list of all {item_type} items (e.g. enrollments) from the SIS.

    Requests {uri} one page at a time, starting at params['page-number']
    (page 1 if that key is absent). Paging stops at the first page that
    returns HTTP 404, has no 'response' in its 'apiResponse', or has no
    {item_type} in that response.

    The caller's {params} dict is never modified, so the same dict can be
    reused for several calls and each call starts at the page it asks for.

    Returns the items of all pages, in page order (possibly an empty list).
    '''
    # Work on a private copy: bumping 'page-number' in the caller's dict
    # would make a later call that reuses it start past the first page.
    query = dict(params)
    query['page-number'] = query.get('page-number', 1)

    items = []
    # Iterate rather than recurse so the number of pages is not limited by
    # the interpreter's recursion depth.
    while True:
        if args.debug:
            # NOTE: the headers passed by callers carry the API key, so
            # debug output contains secrets.
            print("sis_get_items: {}".format(uri))
            print(" params: {}".format(query))
            print(" headers: {}".format(headers))
        r = requests.get(uri, params=query, headers=headers)
        if r.status_code == 404:
            if args.debug: print('NO MORE {}'.format(item_type))
            break
        if args.debug: print('FOUND {}'.format(item_type))
        data = r.json()
        # Stop if there is no response (e.g. 404)
        if 'response' not in data['apiResponse']:
            if args.debug: print('404 No response')
            break
        # Stop if this page has no items
        if item_type not in data['apiResponse']['response']:
            if args.debug: print('No {}'.format(item_type))
            break
        # Collect this page's items, then move on to the next page
        items.extend(data['apiResponse']['response'][item_type])
        query['page-number'] += 1

    if args.debug:
        print('num {}: {}'.format(item_type, len(items)))
        print(items)
    return items
def sis_get_lecture_section_ids(e_id, e_key, term_id, subject_area, catalog_number):
    '''
    Return the lecture section ids for a term, subject, and course number.

    Only the lecture sections are of interest because their enrollments
    contain a superset of the enrollments of all other section types
    (lab, dis).
    '''
    request_headers = {
        "Accept": "application/json",
        "app_id": e_id,
        "app_key": e_key,
    }
    query = {
        'page-number': 1,
        "subject-area-code": subject_area,
        "catalog-number": catalog_number,
    }
    # The descriptors endpoint lists every section of the course, lectures
    # and non-lectures alike; the non-lectures are filtered out below.
    descriptors = sis_get_items(
        descriptors_uri.format(term_id), query, request_headers, 'fieldValues'
    )
    return sis_filter_lectures(descriptors)
def sis_get_enrollments(e_id, e_key, term_id, subject_area, catalog_number):
    '''Gets a course's enrollments from the SIS.

    Looks up the course's lecture sections, then fetches the
    'classSectionEnrollments' of each one, 100 per page.

    Returns a single list holding the enrollments of all lecture sections.
    '''
    if args.debug: print("sis_get_enrollments: {}".format(catalog_number))

    # get the lectures
    lecture_codes = sis_get_lecture_section_ids(e_id, e_key, term_id,
        subject_area, catalog_number)

    headers = { "Accept": "application/json", "app_id": e_id, "app_key": e_key }

    # get the enrollments in each lecture
    enrollments = []
    for lecture_code in lecture_codes:
        # Build fresh paging parameters for every lecture so each fetch
        # starts at page 1, regardless of what a previous fetch did to
        # the dict it was given.
        params = {
            "page-number": 1,
            "page-size": 100,
        }
        uri = sections_uri.format(term_id, lecture_code)
        enrollments += sis_get_items(uri, params, headers,
            'classSectionEnrollments')
    if args.verbose: print('{} {}'.format(catalog_number, len(enrollments)))
    return enrollments
def sis_filter_section_instructors(section):
    '''Extract the campus-uid of instructors from a section.

    Walks section['meetings'] -> 'assignedInstructors' -> 'instructor' ->
    'identifiers' and collects the 'id' of every identifier whose type is
    'campus-uid' and whose 'disclose' flag is present and true. Missing
    'meetings', 'assignedInstructors' or 'identifiers' keys are skipped.

    Returns a set of uids; empty when the section has no meetings.
    '''
    uids = set()
    # A section without meetings has no instructors: .get() yields an
    # empty list, so the loop is skipped and the empty set is returned.
    for meeting in section.get('meetings', []):
        for instructor in meeting.get('assignedInstructors', []):
            identifiers = instructor['instructor'].get('identifiers', [])
            for identifier in identifiers:
                # {'disclose': True, 'id': '1234', 'type': 'campus-uid'}
                # Only use identifiers explicitly marked as disclosable.
                if not identifier.get('disclose'):
                    continue
                if identifier['type'] != 'campus-uid':
                    continue
                uids.add(identifier['id'])
    return uids
def sis_get_instructors_gsis(c_id, c_key, term_id, subject_area, catalog_number):
    '''Return (instructors, gsis) for a term, subject, and catalog number.

    Both elements are lists of campus uids. Uids found in sections whose
    association is flagged 'primary' are reported as instructors; uids
    found in all other sections are reported as GSIs.
    '''
    request_headers = {
        "Accept": "application/json",
        "app_id": c_id,
        "app_key": c_key
    }
    query = {
        "subject-area-code": subject_area,
        "catalog-number": catalog_number,
        "term-id": term_id,
        "page-size": 400,
        "page-number": 1
    }

    # Fetch every section of the course, primary and secondary alike.
    class_sections = sis_get_items(classes_sections_uri, query,
        request_headers, 'classSections')

    primary_uids, secondary_uids = set(), set()
    for class_section in class_sections:
        section_uids = sis_filter_section_instructors(class_section)
        if class_section['association']['primary']:
            bucket = primary_uids
        else:
            bucket = secondary_uids
        bucket.update(section_uids)

    return (list(primary_uids), list(secondary_uids))
def grouper_replace_users(auth, group, users):
    '''Replace the members of the grouper group {group} with {users}.

    Sends one PUT carrying every user as a subject lookup with
    "replaceAllExisting" set to "T". Raises Exception with the service's
    result message if the reply contains a 'WsRestResultProblem'.
    '''
    if args.verbose: print('transferring to {}'.format(group))

    # Request format:
    # https://github.com/Internet2/grouper/blob/master/grouper-ws/grouper-ws/doc/samples/addMember/WsSampleAddMemberRest_json.txt
    payload = {
        "WsRestAddMemberRequest": {
            "replaceAllExisting": "T",
            "subjectLookups": [{"subjectId": member} for member in users]
        }
    }
    endpoint = grouper_base_uri + '/groups/{group}/members'.format(group=group)
    reply = requests.put(
        endpoint,
        data=json.dumps(payload),
        auth=auth,
        headers={'Content-type': 'text/x-json'}
    ).json()

    if 'WsRestResultProblem' in reply:
        problem = reply['WsRestResultProblem']
        raise Exception(problem['resultMetadata']['resultMessage'])
def valid_class(c):
    '''Return True if course {c} has a term_id, subject_area and catalog_number.'''
    required = ('term_id', 'subject_area', 'catalog_number')
    return all(key in c for key in required)
def get_enrollment_uids(enrollments):
    '''Given a list of SIS enrollments, return each student's campus UID.

    The result has one entry per enrollment, in the same order. An
    enrollment whose student has no 'campus-uid' identifier yields None.
    '''
    def campus_uid(enrollment):
        matches = (
            ident['id']
            for ident in enrollment['student']['identifiers']
            if ident['type'] == 'campus-uid'
        )
        # First matching identifier wins; None when there is no match.
        return next(matches, None)

    return [campus_uid(enrollment) for enrollment in enrollments]
def get_enrollment_emails(enrollments):
    '''Given a list of SIS enrollments, return each student's campus email.

    The result has one entry per enrollment, in the same order. An entry
    is the 'emailAddress' of the student's first email whose type code is
    'CAMP', or None if the student has no such email (or no 'emails').
    '''
    def campus_email(enrollment):
        for email in enrollment['student'].get('emails', []):
            if email['type']['code'] == 'CAMP':
                return email['emailAddress']
        return None

    return [campus_email(enrollment) for enrollment in enrollments]
def enrollment_status(enrollment):
    '''Return the enrollment's status code as a string: 'E', 'W', or 'D'.'''
    status = enrollment['enrollmentStatus']['status']
    return str(status['code'])
def filter_enrollment_status(enrollments, status):
    '''Return the enrollments whose status code equals {status}, in order.'''
    return [
        enrollment for enrollment in enrollments
        if enrollment_status(enrollment) == status
    ]
def calgroup_child_id(calgroup, child):
    '''Given a:base:group for a course, return a:base:group:group-child.

    The child's id is the last ':'-separated element of {calgroup}
    followed by '-' and {child}, appended to {calgroup} as a new element.
    '''
    leaf = calgroup.rsplit(':', 1)[-1]
    return calgroup + ':' + leaf + '-' + str(child)
def has_all_keys(d, keys):
    '''Return True if every element of {keys} is in {d}, else False.'''
    for key in keys:
        if key not in d:
            return False
    return True
def read_json_data(filename, required_keys):
    '''Read and validate data from a json file.

    Returns the parsed contents of {filename}.

    Raises Exception if {filename} does not exist, or if any of
    {required_keys} is missing from the parsed data; the message names
    the file and, in the latter case, the missing keys.
    '''
    if not os.path.exists(filename):
        s = "No such file: {}".format(filename)
        raise Exception(s)
    # Use a context manager so the file is closed even if parsing fails.
    with open(filename) as f:
        data = json.load(f)
    # check that we've got all of our required keys
    missing = set(required_keys) - set(data)
    if missing:
        s = "Missing parameters in {}: {}".format(filename, missing)
        raise Exception(s)
    return data
def read_secrets(filename):
    '''Read the secrets file, requiring the SIS and grouper credentials.'''
    return read_json_data(filename, [
        'sis_enrollments_id', 'sis_enrollments_key',
        'sis_classes_id', 'sis_classes_key',
        'grouper_user', 'grouper_pass',
    ])
def read_config(filename):
    '''Read the config file, requiring 'base_group' and 'courses'.'''
    return read_json_data(filename, ['base_group', 'courses'])
## main
# For every configured course: fetch its roster, instructors and GSIs from
# the SIS, then either print them (dry run) or replace the members of the
# course's CalGroups with them.

grouper_base_uri = 'https://calgroups.berkeley.edu/gws/servicesRest/json/v2_2_100'

parser = argparse.ArgumentParser(description="Sync SIS classes to CalGroups.")
parser.add_argument('-c', dest='config',
    default='/etc/sis-to-calgroups.json', help='Configuration file.')
parser.add_argument('-s', dest='secrets',
    default='/root/.sis-to-calgroups.json', help='Secrets file.')
parser.add_argument('-v', dest='verbose', action='store_true',
    help='Be verbose.')
parser.add_argument('-d', dest='debug', action='store_true',
    help='Debug.')
parser.add_argument('-n', dest='dryrun', action='store_true',
    help='Dry run. Print enrollments without updating CalGroups.')
# args is read as a module-level global by the functions above.
args = parser.parse_args()

# read secrets from secrets file
secrets = read_secrets(args.secrets)

# read config from config file
config = read_config(args.config)

# The same CalGroups credentials apply to every course, so build them once.
grouper_auth = requests.auth.HTTPBasicAuth(
    secrets['grouper_user'], secrets['grouper_pass']
)

for c in config['courses']:
    if not valid_class(c):
        print('Incomplete course parameters: {}'.format(c))
        continue

    # store the uids for our various groups
    uids = {}

    # fetch student enrollments
    enrollments = sis_get_enrollments(
        secrets['sis_enrollments_id'], secrets['sis_enrollments_key'],
        c['term_id'], c['subject_area'], c['catalog_number']
    )

    # Split the enrollments by status. The filtered enrollments are kept
    # (not just their uids) because the dry run prints emails from them.
    enrolled = filter_enrollment_status(enrollments, 'E')
    waitlisted = filter_enrollment_status(enrollments, 'W')
    dropped = filter_enrollment_status(enrollments, 'D')

    # filter student uids
    uids['enrolled'] = get_enrollment_uids(enrolled)
    uids['waitlisted'] = get_enrollment_uids(waitlisted)
    uids['dropped'] = get_enrollment_uids(dropped)

    # fetch instructor and gsi uids
    uids['instructors'], uids['gsis'] = sis_get_instructors_gsis(
        secrets['sis_classes_id'], secrets['sis_classes_key'],
        c['term_id'], c['subject_area'], c['catalog_number'])

    # term ~ edu:berkeley:org:stat:stat-classes:stat-classes-2188
    term_group = calgroup_child_id(config['base_group'], c['term_id'])
    # course ~ {term}:stat-classes-2188-stat-243
    course_group = calgroup_child_id(term_group,
        c['subject_area'].lower() + '-' + c['catalog_number'])

    if args.dryrun:
        print(course_group)
        print('_instructors')
        for uid in uids['instructors']: print(uid)
        print('_gsis')
        for uid in uids['gsis']: print(uid)
        print('_enrolled')
        for email in get_enrollment_emails(enrolled): print(email)
        print('_waitlisted')
        for email in get_enrollment_emails(waitlisted): print(email)
        print()
    else:
        if args.verbose: print(course_group)
        # 'dropped' is deliberately not in this list, so it is not synced.
        for suffix in ['enrolled', 'waitlisted', 'instructors', 'gsis']:
            grouper_replace_users(
                grouper_auth,
                calgroup_child_id(course_group, suffix),
                uids[suffix]
            )
@ryanlovett
Copy link
Author

Some excerpted docs:

CalGroups from SIS Rosters

Manually create the following structure in Berkeley:Organizations:Stat:Classes. Eventually this will be done by sis-to-calgroups.py.

  • Name: TERM ; ID: stat-classes-TERM ; E.g.: TERM=2188

    • Name: COURSE ; ID: stat-classes-TERM-COURSE ; E.g.: COURSE=stat-243

      • Name: enrolled ; ID: stat-classes-TERM-COURSE-enrolled

        • Members are pulled from SIS and populated by sis-to-calgroups.py.
      • Name: waitlisted ; ID: stat-classes-TERM-COURSE-waitlisted

        • Members are pulled from SIS and populated by sis-to-calgroups.py.
      • Name: gsis ; ID: stat-classes-TERM-COURSE-gsis

        • Members are pulled from SIS and populated by sis-to-calgroups.py.
      • Name: instructors ; ID: stat-classes-TERM-COURSE-instructors

        • Members are pulled from SIS and populated by sis-to-calgroups.py.
      • Name: non-enrolled ; ID: stat-classes-TERM-COURSE-non-enrolled

        • Members are manually added.

        • This group is intended to hold auditors, guests, observers, etc.

      • Name: admins ; ID: stat-classes-TERM-COURSE-admins

        • Members are manually added.

        • This group is intended to hold group technical contacts, faculty assistants, departmental administrators, etc.

      • Name: FRIENDLY_NAME ; ID: stat-classes-TERM-COURSE-all ; E.g.: FRIENDLY_NAME="Stat 243 Fall 2018"

        • Members are enrolled, gsis, instructors, admins, and non-enrolled.

        • This group is provisioned to Google Groups. This group's name is a friendly name, unlike the others, because Google Groups uses it within the web UI. Something terse like "all-members" would be ambiguous because one student may be in more than one of these courses.

        • The direct members "instructors" and "admins" should be given admin privileges of this group. This is currently done manually by clicking on Actions next to "instructors" and "admins" when viewing the direct members of this group.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment