-
-
Save arikfr/d29db3617df5a8d547b4709a631f5b6a to your computer and use it in GitHub Desktop.
Redash migration script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
import logging | |
import sys | |
# Route log output (DEBUG and above) to stdout, and only let the "requests"
# logger emit errors.
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
logging.getLogger("requests").setLevel(logging.ERROR)
# Source: the Redash instance you are copying from, with an admin API key.
ORIGIN = "https://redash.acme.com"
ORIGIN_API_KEY = "..."  # admin API key

# Target: the Redash account you are copying into, with an admin API key.
DESTINATION = "https://app.redash.io/acme"
DESTINATION_API_KEY = "..."  # admin API key

# Data sources are not migrated: create them in the destination account first,
# then list them here as
#   origin Redash data source id -> destination Redash data source id
DATA_SOURCES = {1: 1234}
# Bookkeeping shared by every import step: maps origin ids to what was created
# in the destination.
#
# "users": pre-fill with any users that already exist in the target Redash
# account. The key is the user id in the origin Redash instance. Include the
# user's API key, because it is used to recreate the objects that user owned.
meta = {
    "users": {
        "1": {"id": 1234, "email": "user@acme.com", "invite_link": "", "api_key": ""},
    },
    "queries": {},
    "visualizations": {},
}
def api_request(api):
    """GET `api` (a path appended to ORIGIN) from the origin instance.

    The request is authenticated with ORIGIN_API_KEY. An HTTP error status
    raises via `raise_for_status()`; otherwise the decoded JSON body is returned.
    """
    url = ORIGIN + api
    reply = requests.get(url, headers=auth_headers(ORIGIN_API_KEY))
    reply.raise_for_status()
    return reply.json()
def auth_headers(api_key):
    """Return the request headers that authenticate a call with `api_key`."""
    header_value = "Key {}".format(api_key)
    return dict(Authorization=header_value)
def import_users():
    """Create destination accounts for origin users not yet present in meta['users'].

    Lists users from the origin via `api_request('/api/users')`. A user is
    skipped when its origin id (as a string) is already a key of
    meta['users'], or when its email is 'admin'. Every other user is POSTed to
    the destination's '/api/users?no_invite=1' with the admin key, and the
    returned id, email and invite_link are recorded in meta['users'].

    Raises whatever `raise_for_status()` raises when a request fails.
    """
    print("Importing users...")
    for user in api_request('/api/users'):
        print(" importing: {}".format(user['id']))

        # Keys of meta['users'] are strings: the pre-seeded entries use string
        # keys and a meta.json round trip turns every key into a string.
        origin_id = str(user['id'])
        if origin_id in meta['users']:
            print(" ... skipping: exists.")
            continue
        if user['email'] == 'admin':
            print(" ... skipping: admin.")
            continue

        data = {
            "name": user['name'],
            "email": user['email'],
        }
        response = requests.post(
            DESTINATION + '/api/users?no_invite=1',
            json=data,
            headers=auth_headers(DESTINATION_API_KEY),
        )
        response.raise_for_status()
        new_user = response.json()

        # Store under the same string form the existence check above uses, so
        # the map never ends up with both 1 and "1" for a single user.
        meta['users'][origin_id] = {
            'id': new_user['id'],
            'email': new_user['email'],
            'invite_link': new_user['invite_link'],
        }
def get_api_key(user_id):
    """Return the 'api_key' field of destination user `user_id`.

    Reads '/api/users/<user_id>' on the destination with the admin key and
    raises via `raise_for_status()` on an HTTP error status.
    """
    url = DESTINATION + '/api/users/{}'.format(user_id)
    reply = requests.get(url, headers=auth_headers(DESTINATION_API_KEY))
    reply.raise_for_status()
    profile = reply.json()
    return profile['api_key']
def user_with_api_key(user_id):
    """Return the meta['users'] entry for origin user `user_id`, with an API key.

    The entry is looked up by `user_id` as given and then by `str(user_id)`,
    because keys may be ints or strings. When the entry has no usable
    'api_key' (missing or empty, as in the seeded template entry), the key is
    fetched with `get_api_key` for the entry's destination id and cached on
    the entry.

    Raises:
        KeyError: when no entry exists for `user_id`.
    """
    users = meta['users']
    user = users.get(user_id) or users.get(str(user_id))
    if not user:
        raise KeyError(
            "no destination user is mapped for origin user id {!r}; "
            "add it to meta['users'] or run import_users() first".format(user_id)
        )
    # An empty string can never authenticate, so treat it like a missing key.
    if not user.get('api_key'):
        user['api_key'] = get_api_key(user['id'])
    return user
def get_queries(url, api_key):
    """Return every query from the paginated '<url>/api/queries' endpoint.

    Pages are requested starting at page 1. Each response is expected to carry
    'results', 'page_size' and 'count'; fetching stops once
    page * page_size covers count.

    Raises whatever `raise_for_status()` raises on an HTTP error status, so an
    error response is reported as such rather than as a missing 'results' key.
    """
    queries = []
    path = "{}/api/queries".format(url)
    headers = auth_headers(api_key)
    page = 1
    while True:
        response = requests.get(path, headers=headers, params={'page': page})
        response.raise_for_status()
        payload = response.json()
        queries.extend(payload['results'])
        # Same stop condition as `page * page_size + 1 <= count` being false.
        if page * payload['page_size'] >= payload['count']:
            break
        page += 1
    return queries
def get_queries_old(url, api_key):
    """Return the decoded JSON of a single GET to '<url>/api/queries'.

    Unlike `get_queries`, no 'page' parameter is sent and the response body is
    returned as-is. Raises via `raise_for_status()` on an HTTP error status.
    """
    path = "{}/api/queries".format(url)
    response = requests.get(path, headers=auth_headers(api_key))
    response.raise_for_status()
    return response.json()
def import_queries():
    """Recreate the origin's queries in the destination and record the id mapping.

    A query is skipped when it is already present in meta['queries'] (a
    resumed run) or when its data source has no entry in DATA_SOURCES. Every
    other query is POSTed to the destination's '/api/queries' with the API key
    of the mapped owner (see `user_with_api_key`), and
    meta['queries'][origin id] is set to the new query's id.

    Raises whatever `raise_for_status()` or `user_with_api_key` raise.
    """
    print("Import queries...")
    # Depending on the Redash version running in origin, you might need to call
    # `get_queries_old(ORIGIN, ORIGIN_API_KEY)` here instead.
    queries = get_queries(ORIGIN, ORIGIN_API_KEY)

    imported = meta['queries']
    for query in queries:
        print(" importing: {}".format(query['id']))

        # Keys are ints during a run but strings after a meta.json reload, so
        # check both forms to avoid re-creating a query on a resumed run.
        if query['id'] in imported or str(query['id']) in imported:
            print(" skipped (already imported)")
            continue

        data_source_id = DATA_SOURCES.get(query['data_source_id'])
        if data_source_id is None:
            # Report the origin id that has no mapping; the mapped value is
            # always None on this path.
            print(" skipped ({})".format(query['data_source_id']))
            continue

        data = {
            "data_source_id": data_source_id,
            "query": query['query'],
            "is_archived": query['is_archived'],
            "schedule": query['schedule'],
            "description": query['description'],
            "name": query['name'],
        }
        user = user_with_api_key(query['user']['id'])
        response = requests.post(
            DESTINATION + '/api/queries',
            json=data,
            headers=auth_headers(user['api_key']),
        )
        response.raise_for_status()
        imported[query['id']] = response.json()['id']
def import_visualizations():
    """Recreate the visualizations of every imported query in the destination.

    For each (origin id, destination id) pair in meta['queries'], the origin
    query is fetched and its visualizations are walked:

    * A 'TABLE' visualization is not created. The destination query is
      fetched and the origin visualization is mapped to a 'TABLE'
      visualization found on it (the last one, if there are several).
    * Any other type is POSTed to '/api/visualizations' with the query
      owner's API key.

    The mapping is stored in meta['visualizations'][origin id]. Visualizations
    already in that mapping are skipped, so a resumed run does not duplicate
    them.

    Raises whatever `raise_for_status()` or `user_with_api_key` raise.
    """
    print("Importing visualizations...")
    viz_map = meta['visualizations']
    # list(...items()) behaves the same on Python 2 and 3 and iterates a snapshot.
    for query_id, new_query_id in list(meta['queries'].items()):
        query = api_request('/api/queries/{}'.format(query_id))
        print(" importing visualizations of: {}".format(query_id))
        for v in query['visualizations']:
            # Keys are ints during a run but strings after a meta.json reload.
            if v['id'] in viz_map or str(v['id']) in viz_map:
                continue

            if v['type'] == 'TABLE':
                response = requests.get(
                    DESTINATION + '/api/queries/{}'.format(new_query_id),
                    headers=auth_headers(DESTINATION_API_KEY),
                )
                response.raise_for_status()
                for new_v in response.json()['visualizations']:
                    if new_v['type'] == 'TABLE':
                        viz_map[v['id']] = new_v['id']
            else:
                user = user_with_api_key(query['user']['id'])
                data = {
                    "name": v['name'],
                    "description": v['description'],
                    "options": v['options'],
                    "type": v['type'],
                    "query_id": new_query_id,
                }
                response = requests.post(
                    DESTINATION + '/api/visualizations',
                    json=data,
                    headers=auth_headers(user['api_key']),
                )
                response.raise_for_status()
                viz_map[v['id']] = response.json()['id']
def import_dashboards():
    """Recreate the origin's dashboards and their widgets in the destination.

    Each dashboard listed by '/api/dashboards' is loaded in full by slug, then
    created in the destination (name only) with its owner's API key. Its
    widgets, read from d['widgets'] as rows of widgets, are POSTed to
    '/api/widgets'. A widget that has a visualization is re-pointed through
    meta['visualizations']; it is skipped when that mapping has no entry.

    Raises whatever `raise_for_status()` or `user_with_api_key` raise.
    """
    print("Importing dashboards...")
    viz_map = meta['visualizations']
    dashboards = api_request('/api/dashboards')
    for dashboard in dashboards:
        print(" importing: {}".format(dashboard['slug']))

        # load full dashboard
        d = api_request('/api/dashboards/{}'.format(dashboard['slug']))

        # create dashboard
        data = {'name': d['name']}
        user = user_with_api_key(d['user_id'])
        response = requests.post(
            DESTINATION + '/api/dashboards',
            json=data,
            headers=auth_headers(user['api_key']),
        )
        response.raise_for_status()
        new_dashboard = response.json()

        # recreate widgets
        for row in d['widgets']:
            for widget in row:
                data = {
                    'dashboard_id': new_dashboard['id'],
                    'options': widget['options'],
                    'width': widget['width'],
                    'text': widget['text'],
                    'visualization_id': None,
                }
                if 'visualization' in widget:
                    origin_viz_id = widget['visualization']['id']
                    # Keys are ints during a run but strings after a meta.json
                    # reload; an int-only lookup would skip every widget on a
                    # resumed run.
                    new_viz_id = viz_map.get(origin_viz_id) or viz_map.get(str(origin_viz_id))
                    if not new_viz_id:
                        print('skipping for missing viz')
                        continue
                    data['visualization_id'] = new_viz_id

                response = requests.post(
                    DESTINATION + '/api/widgets',
                    json=data,
                    headers=auth_headers(user['api_key']),
                )
                response.raise_for_status()
def save_meta():
    """Serialize the module-level `meta` dict as JSON into 'meta.json'."""
    print("Saving meta...")
    with open('meta.json', 'w') as meta_file:
        json.dump(meta, meta_file)
def import_all():
    """Run the whole migration: users, queries, visualizations, then dashboards.

    Any Exception raised by a step is logged with its traceback and stops the
    remaining steps. `save_meta()` runs in a `finally` clause, so the id
    mappings collected so far are written to meta.json on success, on failure
    and on interruption (e.g. Ctrl-C).
    """
    try:
        # If you had an issue while running this the first time, fixed it and
        # are running again, uncomment the following to skip content that was
        # already imported.
        #with open('meta.json') as f:
        #    meta.update(json.load(f))
        import_users()
        import_queries()
        import_visualizations()
        import_dashboards()
    except Exception as ex:
        logging.exception(ex)
    finally:
        # Persist progress whatever happened; resuming depends on this file.
        save_meta()
# Start the migration only when this file is executed as a script.
if __name__ == "__main__":
    import_all()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi, thanks for the script! I updated it to work with Python 3 and with the new Redash API; see https://gist.github.com/jeanmarcosdarosa/4334b293a353b91b88868308238030fd
Feel free to update your script.