Skip to content

Instantly share code, notes, and snippets.

@nightsh
Created July 23, 2020 13:59
Show Gist options
  • Save nightsh/f751cbbe5ae4ad40e82b5fcd1ca7b4c2 to your computer and use it in GitHub Desktop.
Save nightsh/f751cbbe5ae4ad40e82b5fcd1ca7b4c2 to your computer and use it in GitHub Desktop.
migrate_level_of_data.py
from __future__ import print_function
import os
import sys
from ckanapi import RemoteCKAN
# Connection settings come from the environment; each is None when the
# corresponding variable is not set.
url = os.environ.get('ED_CKAN_URL')
apiKey = os.environ.get('ED_CKAN_KEY')
def get_packages():
    """Return whatever the remote ``package_list`` action yields."""
    return remote.action.package_list()
def package_search(data_dict):
    """Run the ``package_search`` action on the remote with *data_dict*.

    The response of the remote call is returned unchanged.
    """
    response = remote.call_action(
        action='package_search',
        data_dict=data_dict,
    )
    return response
def get_all_packages():
    """Collect every dataset returned by ``package_search``, page by page.

    Requests pages of up to 1000 results (private datasets included) and
    keeps going until the number of collected packages reaches the
    ``count`` reported by the search, or a page comes back empty.

    Returns:
        A list with the ``results`` entries of every page, in the order
        they were returned. Empty when the first search yields nothing.
    """
    rows = 1000
    packages = []

    print('Getting a list of all packages... ', end='')
    while True:
        result = package_search(data_dict={
            'q': '*:*',
            'rows': rows,
            # Advance by what was actually received rather than by `rows`:
            # if the server hands back fewer items per page than requested,
            # a fixed stride would silently skip records.
            'start': len(packages),
            'type': 'dataset',
            'include_private': True,
        })
        if not result:
            break

        page = result.get('results', [])
        packages.extend(page)

        # A missing count is treated as "nothing more to fetch".
        count = result.get('count') or 0
        if not page or len(packages) >= count:
            break

    # Always terminate the progress line, whichever way the loop ended.
    print('done!')
    return packages
def get_package(id):
    """Fetch one package through the remote ``package_show`` action.

    Returns the package as given by the remote, or ``None`` when the call
    raises any ``Exception`` (the error itself is discarded).
    """
    try:
        return remote.action.package_show(id=id)
    except Exception:
        # Best effort: callers treat None as "could not fetch".
        return None
def update_package(package):
    """Send *package* to the remote ``package_update`` action.

    The response of the remote call is returned unchanged.
    """
    response = remote.call_action(
        action='package_update',
        data_dict=package,
    )
    return response
def update_one_package(id):
    """Re-submit a single dataset whose ``level_of_data`` is a string.

    The package is fetched with ``get_package`` and, when it is of type
    ``dataset`` and its ``level_of_data`` value is a string, passed as-is
    to ``update_package``. Progress is printed to stdout.

    Args:
        id: Identifier handed to ``get_package``.

    Returns:
        None. Nothing is done when the package cannot be fetched or is
        not a dataset.
    """
    package = get_package(id)
    # get_package() returns None on any lookup failure; bail out instead
    # of failing with AttributeError on `None.get`.
    if package is None:
        print('Skipping {} (cannot fetch)'.format(id))
        return
    if package.get('type') != 'dataset':
        return

    if isinstance(package.get('level_of_data'), str):
        print('Package to update: ', package.get('name'))
        print('Package level of data: ', package.get('level_of_data'))
        result = update_package(package)
        print('Update package: ', result)
        print('\n')
def filter_unwanted_chars(level_of_data):
    """Strip braces and double quotes, and turn spaces into hyphens.

    Every ``{``, ``}`` and ``"`` is removed from *level_of_data*; every
    space is then replaced by ``-``. The cleaned string is returned.
    """
    cleaned = level_of_data
    for unwanted in ('{', '}', '"'):
        cleaned = cleaned.replace(unwanted, '')
    return cleaned.replace(' ', '-')
def _plural_suffix(count):
    """Return ``'s'`` when *count* is greater than one, else ``''``."""
    return 's' if count > 1 else ''


def update_all_packages():
    """Normalise ``level_of_data`` on every dataset and report the outcome.

    For each package returned by ``get_all_packages`` the full record is
    re-fetched with ``get_package``. A package is updated when its
    top-level ``level_of_data`` is a string or when its ``extras`` list has
    a ``level_of_data`` entry. The value (top-level if non-empty, otherwise
    the one from extras) is lower-cased, passed through
    ``filter_unwanted_chars``, stored as the top-level ``level_of_data``,
    and the ``level_of_data`` entry is dropped from ``extras`` before the
    package is sent to ``update_package``.

    Progress and a final summary are printed to stdout.

    Returns:
        None.
    """
    packages = get_all_packages()
    total_packages = len(packages)
    updated = []
    errors = []

    print('Number of packages: {}'.format(total_packages))

    for current, listed in enumerate(packages, 1):
        package_id = listed.get('id')
        package = get_package(package_id)
        if package is None:
            print('[{}/{}] Skipping {} (cannot fetch)'.format(current,
                                                              total_packages,
                                                              package_id))
            continue

        # Split extras in one pass: remember the level_of_data value (if
        # any) and keep every other entry. Computing this per package
        # guarantees the extras written back always belong to *this*
        # package, never to one handled in an earlier iteration.
        extra_value = None
        remaining_extras = []
        extras = package.get('extras') or []
        for extra in extras:
            if extra['key'] == 'level_of_data':
                extra_value = extra['value']
            else:
                remaining_extras.append(extra)
        has_extra = len(remaining_extras) != len(extras)

        top_level = package.get('level_of_data')
        if not (isinstance(top_level, str) or has_extra):
            print('[{}/{}] Skipping package {} (nothing to update) '.format(
                current,
                total_packages,
                package.get('name')))
            continue

        print('[{}/{}] Updating package {}... '.format(current,
                                                       total_packages,
                                                       package.get('name')),
              end='')

        # A non-empty top-level value wins; otherwise fall back to extras.
        level_of_data = top_level if top_level else extra_value

        try:
            # Normalisation sits inside the try on purpose: a value that is
            # not a string (e.g. None or a list) is recorded as a failed
            # package instead of aborting the whole run.
            package['level_of_data'] = filter_unwanted_chars(
                level_of_data.lower())
            package['extras'] = remaining_extras
            update_package(package)
        except Exception:
            errors.append(package)
            print('failed!')
        else:
            updated.append(package)
            print('done!')

    if updated:
        print('=================================')
        print('Updated level of data for {} package{}:'.format(
            len(updated), _plural_suffix(len(updated))))
        for package in updated:
            print(package.get('name'))

    if errors:
        print('=================================')
        print('Failed updating {} package{}:'.format(
            len(errors), _plural_suffix(len(errors))))
        for package in errors:
            print(package.get('name'))

    print('Finished!')
    if updated:
        print('Updated {} package{}.'.format(
            len(updated), _plural_suffix(len(updated))))
    if errors:
        print('Failed updating {} package{}.'.format(
            len(errors), _plural_suffix(len(errors))))
    if not errors and not updated:
        print('No packages to update!')
if __name__ == '__main__':
    # update_one_package('')

    # Both connection settings are mandatory; report every missing one
    # before exiting with a non-zero status.
    errors = []
    for value, message in (
            (url, 'ED_CKAN_URL environment variable is needed.'),
            (apiKey, 'ED_CKAN_KEY environment variable is needed.')):
        if not value:
            errors.append(message)

    if errors:
        print('\n'.join(errors))
        sys.exit(1)

    # Module-level client used by every helper in this script.
    remote = RemoteCKAN(url, apiKey)
    print('CKAN URL: {}'.format(url))

    update_all_packages()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment