Skip to content

Instantly share code, notes, and snippets.

@arikfr arikfr/migrator.py Secret
Created Mar 9, 2018

Embed
What would you like to do?
Redash migration script
import json
import requests
import logging
import sys
# Send this script's log output (DEBUG and above) to stdout.
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
# Only let ERROR (and above) records through on the "requests" logger.
logging.getLogger("requests").setLevel(logging.ERROR)
# The Redash instance you are copying from.
ORIGIN = "https://redash.acme.com"
ORIGIN_API_KEY = "..."  # admin API key

# The Redash account you are copying into.
DESTINATION = "https://app.redash.io/acme"
DESTINATION_API_KEY = "..."  # admin API key

# Data sources must be created in the destination account in advance.
# Once they exist, map them here:
#   origin Redash data source id -> destination Redash data source id
DATA_SOURCES = {
    1: 1234,
}

# Id mappings shared by the import steps; written to meta.json by save_meta().
meta = {
    # Users you have already created in the target Redash account.
    # The key is the user's id in the origin Redash instance. Make sure to
    # include the API key, as it is used to recreate any objects this user
    # might have owned.
    "users": {
        "1": {
            "id": 1234,
            "email": "user@acme.com",
            "invite_link": "",
            "api_key": "",
        },
    },
    # origin query id -> destination query id (filled by import_queries).
    "queries": {},
    # origin visualization id -> destination visualization id
    # (filled by import_visualizations).
    "visualizations": {},
}
def api_request(api):
    """GET ``api`` from the origin Redash instance and return the decoded JSON.

    ``api`` is a path appended to ``ORIGIN`` (for example ``/api/users``).
    The request is authenticated with ``ORIGIN_API_KEY``.

    Raises whatever ``response.raise_for_status()`` raises when the origin
    answers with an HTTP error status.
    """
    response = requests.get(ORIGIN + api, headers=auth_headers(ORIGIN_API_KEY))
    response.raise_for_status()
    return response.json()
def auth_headers(api_key):
    """Return the HTTP headers that authenticate a request with ``api_key``.

    The result is a new dict holding a single ``Authorization`` header of the
    form ``Key <api_key>``.
    """
    return {"Authorization": "Key {}".format(api_key)}
def import_users():
    """Create the origin's users in the destination account.

    Users already present in ``meta['users']`` (keyed by origin user id) and
    the user whose email is ``'admin'`` are skipped. Every created user is
    recorded in ``meta['users']`` with the destination id, email and invite
    link.

    Raises whatever ``response.raise_for_status()`` raises when the
    destination rejects a user creation.
    """
    print("Importing users...")

    users = api_request('/api/users')
    for user in users:
        print(" importing: {}".format(user['id']))

        # Keys are always strings: that is what the existence check below
        # tests, and what the keys become after a JSON round-trip of meta.
        origin_id = str(user['id'])

        if origin_id in meta['users']:
            print(" ... skipping: exists.")
            continue

        if user['email'] == 'admin':
            print(" ... skipping: admin.")
            continue

        data = {
            "name": user['name'],
            "email": user['email']
        }
        response = requests.post(
            DESTINATION + '/api/users?no_invite=1',
            json=data,
            headers=auth_headers(DESTINATION_API_KEY),
        )
        response.raise_for_status()

        new_user = response.json()
        meta['users'][origin_id] = {
            'id': new_user['id'],
            'email': new_user['email'],
            'invite_link': new_user['invite_link']
        }
def get_api_key(user_id):
    """Return the API key of destination user ``user_id``.

    The user record is fetched from the destination with
    ``DESTINATION_API_KEY`` and its ``api_key`` field is returned.

    Raises whatever ``response.raise_for_status()`` raises on an HTTP error
    status.
    """
    response = requests.get(
        DESTINATION + '/api/users/{}'.format(user_id),
        headers=auth_headers(DESTINATION_API_KEY),
    )
    response.raise_for_status()
    return response.json()['api_key']
def user_with_api_key(user_id):
    """Return the ``meta['users']`` entry for origin user ``user_id``.

    The entry is looked up under ``user_id`` as given and then under
    ``str(user_id)``. If it has no usable ``api_key`` (missing or empty), the
    key is fetched with ``get_api_key`` and cached on the entry.

    Raises ``KeyError`` when no entry exists for ``user_id``.
    """
    users = meta['users']
    user = users.get(user_id) or users.get(str(user_id))
    if not user:
        raise KeyError(
            "no destination user mapped for origin user id {!r}".format(user_id)
        )

    # An empty string is not a usable key, so treat it the same as a missing one.
    if not user.get('api_key'):
        user['api_key'] = get_api_key(user['id'])

    return user
def get_queries(url, api_key):
    """Return every query from the paginated ``<url>/api/queries`` endpoint.

    Pages are requested starting at 1 until ``page * page_size`` covers the
    ``count`` reported by the response; the ``results`` of all pages are
    concatenated into one list.

    Raises whatever ``response.raise_for_status()`` raises on an HTTP error
    status, instead of failing later on a missing ``results`` key.
    """
    queries = []
    headers = auth_headers(api_key)
    path = "{}/api/queries".format(url)

    page = 1
    while True:
        response = requests.get(path, headers=headers, params={'page': page})
        response.raise_for_status()
        payload = response.json()

        queries.extend(payload['results'])

        # Stop once the pages fetched so far account for every query.
        if page * payload['page_size'] >= payload['count']:
            break
        page += 1

    return queries
def get_queries_old(url, api_key):
    """Return the decoded JSON body of a single ``<url>/api/queries`` request.

    Unlike ``get_queries`` this sends no ``page`` parameter and returns the
    response body as-is.

    Raises whatever ``response.raise_for_status()`` raises on an HTTP error
    status.
    """
    path = "{}/api/queries".format(url)
    response = requests.get(path, headers=auth_headers(api_key))
    response.raise_for_status()
    return response.json()
def import_queries():
    """Recreate the origin's queries in the destination account.

    A query whose data source has no entry in ``DATA_SOURCES`` is skipped.
    Each remaining query is created with the API key of its mapped owner, and
    the origin id -> destination id pair is recorded in ``meta['queries']``.

    Raises whatever ``response.raise_for_status()`` raises when the
    destination rejects a query.
    """
    print("Import queries...")

    # Depending on the Redash version running in origin, you might need to
    # use `get_queries_old` instead.
    queries = get_queries(ORIGIN, ORIGIN_API_KEY)
    #queries = get_queries_old(ORIGIN, ORIGIN_API_KEY)

    for query in queries:
        print(" importing: {}".format(query['id']))

        data_source_id = DATA_SOURCES.get(query['data_source_id'])
        if data_source_id is None:
            # Report the origin id that has no mapping; the mapped value is
            # always None on this path.
            print(" skipped ({})".format(query['data_source_id']))
            continue

        data = {
            "data_source_id": data_source_id,
            "query": query['query'],
            "is_archived": query['is_archived'],
            "schedule": query['schedule'],
            "description": query['description'],
            "name": query['name'],
        }

        # Create the query with its owner's key rather than the admin key.
        user = user_with_api_key(query['user']['id'])

        response = requests.post(
            DESTINATION + '/api/queries',
            json=data,
            headers=auth_headers(user['api_key']),
        )
        response.raise_for_status()

        meta['queries'][query['id']] = response.json()['id']
def import_visualizations():
    """Recreate visualizations for every query listed in ``meta['queries']``.

    Non-TABLE visualizations are created on the destination query with the
    query owner's API key. TABLE visualizations are not created; each is
    mapped to a TABLE visualization already present on the destination query.
    All origin id -> destination id pairs go into ``meta['visualizations']``.

    Raises whatever ``response.raise_for_status()`` raises on an HTTP error
    status.
    """
    print("Importing visualizations...")

    # items() works on both Python 2 and 3 (iteritems() is Python 2 only).
    for query_id, new_query_id in meta['queries'].items():
        query = api_request('/api/queries/{}'.format(query_id))
        print(" importing visualizations of: {}".format(query_id))

        for v in query['visualizations']:
            if v['type'] == 'TABLE':
                # NOTE(review): presumably the destination adds a TABLE
                # visualization to each new query on its own - confirm.
                response = requests.get(
                    DESTINATION + '/api/queries/{}'.format(new_query_id),
                    headers=auth_headers(DESTINATION_API_KEY),
                )
                response.raise_for_status()

                new_vis = response.json()['visualizations']
                for new_v in new_vis:
                    # If several TABLE visualizations exist, the last wins.
                    if new_v['type'] == 'TABLE':
                        meta['visualizations'][v['id']] = new_v['id']
            else:
                user = user_with_api_key(query['user']['id'])
                data = {
                    "name": v['name'],
                    "description": v['description'],
                    "options": v['options'],
                    "type": v['type'],
                    "query_id": new_query_id
                }
                response = requests.post(
                    DESTINATION + '/api/visualizations',
                    json=data,
                    headers=auth_headers(user['api_key']),
                )
                response.raise_for_status()

                meta['visualizations'][v['id']] = response.json()['id']
def import_dashboards():
    """Recreate the origin's dashboards and their widgets in the destination.

    Each dashboard is created with the API key of its mapped owner, then its
    widgets are posted one by one. A widget that refers to a visualization
    with no entry in ``meta['visualizations']`` is skipped.

    Raises whatever ``response.raise_for_status()`` raises on an HTTP error
    status.
    """
    print("Importing dashboards...")

    dashboards = api_request('/api/dashboards')
    for dashboard in dashboards:
        print(" importing: {}".format(dashboard['slug']))

        # Load the full dashboard; only the slug is taken from the listing.
        d = api_request('/api/dashboards/{}'.format(dashboard['slug']))

        # Create the dashboard as its owner.
        user = user_with_api_key(d['user_id'])
        headers = auth_headers(user['api_key'])

        response = requests.post(
            DESTINATION + '/api/dashboards',
            json={'name': d['name']},
            headers=headers,
        )
        response.raise_for_status()
        new_dashboard = response.json()

        # Recreate the widgets; d['widgets'] is walked as rows of widgets.
        for row in d['widgets']:
            for widget in row:
                data = {
                    'dashboard_id': new_dashboard['id'],
                    'options': widget['options'],
                    'width': widget['width'],
                    'text': widget['text'],
                    'visualization_id': None
                }

                # .get() also covers a widget whose 'visualization' is None.
                visualization = widget.get('visualization')
                if visualization:
                    origin_id = visualization['id']
                    # Keys are ints within one run but strings once meta has
                    # been reloaded from JSON, so try both forms.
                    mapping = meta['visualizations']
                    data['visualization_id'] = (
                        mapping.get(origin_id) or mapping.get(str(origin_id))
                    )
                    if not data['visualization_id']:
                        print('skipping for missing viz')
                        continue

                response = requests.post(
                    DESTINATION + '/api/widgets',
                    json=data,
                    headers=headers,
                )
                response.raise_for_status()
def save_meta():
    """Write the ``meta`` mapping as JSON to ``meta.json``.

    The path is relative, so the file lands in the current working directory
    and replaces any existing ``meta.json`` there.
    """
    print("Saving meta...")
    with open('meta.json', 'w') as f:
        json.dump(meta, f)
def import_all():
    """Run every import step in order, then save ``meta``.

    Order matters: queries need the user mapping, visualizations need the
    query mapping, and dashboards need the visualization mapping. Any
    ``Exception`` raised by a step is logged with its traceback and stops the
    remaining steps. ``meta`` is saved in all cases, including an interrupt,
    so the progress made so far is not lost.
    """
    try:
        # If you hit an issue on the first run, fixed it and are running
        # again, uncomment the following to skip content already imported.
        #with open('meta.json') as f:
        #    meta.update(json.load(f))
        import_users()
        import_queries()
        import_visualizations()
        import_dashboards()
    except Exception as ex:
        logging.exception(ex)
    finally:
        save_meta()
# Run the migration only when executed as a script, not when imported.
if __name__ == '__main__':
    import_all()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.