Skip to content

Instantly share code, notes, and snippets.

@markgajdosik
Created December 8, 2018 01:33
Show Gist options
  • Save markgajdosik/a4504907caa1228c132aaf59dc42339c to your computer and use it in GitHub Desktop.
Save markgajdosik/a4504907caa1228c132aaf59dc42339c to your computer and use it in GitHub Desktop.
async def import_regular_records(
        self, role: VOSSRole, direction: Optional[str]) -> None:
    """Using list API, imports records for a non-relation role.

    For each hierarchy in ``self.options['hierarchy_pkids']`` (or once for
    all hierarchies when none are selected), the records are fetched page
    by page (pages are requested concurrently with ``asyncio.gather``).
    Records whose pkid is not yet stored locally are bulk-created in
    batches of ``self.MAX_RECORD_BULK_SIZE``; already-stored records are
    updated one by one.

    Afterwards, the pkids listed for every parent hierarchy seen in the
    received records (``meta.references.parent[0].pkid``) are compared
    against the received pkids, and any records missed during pagination
    are fetched individually and saved the same way.

    :param role: Selected role to import records for.
    :param direction: Hierarchy traversal direction or `None` if the
        default traversal behaviour is required. The value ``'default'``
        is treated the same as `None`.
    """
    if direction == 'default':
        direction = None
    # Install 'voss' additional fields.
    role.install_additional_fields(common=False)
    # pkids of records already stored locally: these are updated rather
    # than created.
    existing_pkids = set(role.records.values_list('uid', flat=True))
    received_pkids = set()
    received_hierarchy_pkids = set()
    for hierarchy_pkid in self.options['hierarchy_pkids'] or [None]:
        hierarchy = self.hierarchy_map.get(hierarchy_pkid)
        url_params = dict(count='true', summary='false', limit=1)
        if hierarchy_pkid:
            # We're importing from a specific hierarchy.
            url_params['hierarchy'] = hierarchy_pkid
        if direction:
            # We're overriding the default traversal.
            url_params['traversal'] = direction
        # Get first page with 1 record (0 not possible) to get the max.
        # page size limit and the total number of records.
        first_page = await self.fetch_page(role, url_params, 0)
        try:
            limit = first_page['pagination'].get('maximum_limit')
            if limit is None:
                limit = 50  # (fall back to a sane default)
                self.warning(f'Falling back to page size of {limit} '
                             f'records for <b>{role}</b>.')
            url_params['limit'] = limit
            total = first_page['pagination']['total']
        except KeyError:
            # NOTE(review): this aborts the whole role import, including
            # any remaining hierarchies and the missed-records check below;
            # confirm `return` (rather than `continue`) is intended.
            self.error(f'Cannot determine page size limit or the total '
                       f'number of records for <b>{role}</b>. Skipping.')
            return
        self.info(f'Importing {total} <b>{role}</b> record/s from '
                  f'<u>%s</u> (page size: {limit} records).',
                  f'{hierarchy[HFP]}.{hierarchy["name"]}'
                  if hierarchy_pkid else 'all hierarchies')
        import_progress = self.log.append_progress(
            f'Importing <b>{role}</b> records', total)
        save_progress = self.log.append_progress(
            f'Saving <b>{role}</b> records', total)
        pages = (self.fetch_page(role, url_params, offset, import_progress)
                 for offset in range(0, total, limit))
        rows = []
        for page in await asyncio.gather(*pages):
            for row in page.get('resources', ()):
                try:
                    pkid = row['pkid']
                    if pkid in received_pkids:
                        continue  # (already got this; next record)
                    received_pkids.add(pkid)
                    received_hierarchy_pkids.add(
                        row['meta']['references']['parent'][0]['pkid'])
                except KeyError as field:
                    # Show the offending record (not the exception object)
                    # in the detail view.
                    self.error(f'Got a record with missing {field} field.',
                               detail=json_as_html(row))
                    continue
                except IndexError:
                    # An empty parent list must not abort the whole import;
                    # skip the record like any other malformed one.
                    self.error('Got a record with no parent reference.',
                               detail=json_as_html(row))
                    continue
                if pkid not in existing_pkids:
                    rows.append(row)
                    if len(rows) >= self.MAX_RECORD_BULK_SIZE:
                        self.create_api_records(role, rows,
                                                progress=save_progress)
                        rows = []
                else:
                    self.update_api_record(role, row)
        if rows:
            # Create any remaining records.
            self.create_api_records(role, rows, progress=save_progress)
        # Updates do not advance `save_progress`, so close bars explicitly.
        import_progress.finish()
        save_progress.finish()
    # Retrieve records we missed during the pagination.
    self.log.info(f'Checking for missing <b>{role}</b> records.')
    expected_pkids = (self.fetch_pkids(role, hierarchy_pkid)
                      for hierarchy_pkid in received_hierarchy_pkids)
    expected_pkids = set(pkid for pkids in
                         await asyncio.gather(*expected_pkids)
                         for pkid in pkids)
    missed_pkids = expected_pkids - received_pkids
    if missed_pkids:
        self.warning(f'Missed {len(missed_pkids)} <b>{role}</b> record/s.')
        import_progress = self.log.append_progress(
            'Importing missed records', len(missed_pkids))
        save_progress = self.log.append_progress(
            'Saving missed records', len(missed_pkids))
        missed_records = (self.fetch_record(role, pkid, import_progress)
                          for pkid in missed_pkids)
        rows = []
        for row in await asyncio.gather(*missed_records):
            if row['pkid'] not in existing_pkids:
                rows.append(row)
                if len(rows) >= self.MAX_RECORD_BULK_SIZE:
                    self.create_api_records(role, rows,
                                            progress=save_progress)
                    rows = []
            else:
                self.update_api_record(role, row)
        if rows:
            # Create any remaining missing records.
            self.create_api_records(role, rows, progress=save_progress)
        # Close the missed-records progress bars, matching the main pass.
        import_progress.finish()
        save_progress.finish()
    else:
        self.info(f'No <b>{role}</b> records were missed.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment