Skip to content

Instantly share code, notes, and snippets.

@arikfr
Last active September 8, 2022 07:58
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save arikfr/e1c01f6c04d8348da52f33393b5bf65d to your computer and use it in GitHub Desktop.
Save arikfr/e1c01f6c04d8348da52f33393b5bf65d to your computer and use it in GitHub Desktop.
Migration script for V4
import json
import requests
import logging
import sys
# Log everything to stdout; silence the requests library's own debug noise.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logging.getLogger("requests").setLevel("ERROR")

# The Redash instance you're copying from:
ORIGIN = "https://redash.acme.com"
ORIGIN_API_KEY = ""  # admin API key
# The Redash account you're copying into:
DESTINATION = 'https://app.redash.io/acme'
DESTINATION_API_KEY = ""  # admin API key
# You need to create the data sources in advance in the destination account. Once created, update the map here:
# (origin Redash data source id -> destination Redash data source id)
DATA_SOURCES = {
    1: 1234
}
# Mutable migration state, persisted to meta.json between runs so the script
# can resume after a partial failure.  Each sub-map is keyed by the ORIGIN
# object's id and maps to the DESTINATION object's id.
meta = {
    # include here any users you already created in the target Redash account.
    # the key is the user id in the origin Redash instance. make sure to include the API key, as it used to recreate any objects
    # this user might have owned.
    "users": {
        "1": {
            "id": 1234,               # destination user id
            "email": "user@acme.com",
            "invite_link": "",
            "api_key": ""             # destination user's API key
        },
    },
    "queries": {},         # origin query id -> destination query id
    "visualizations": {},  # origin visualization id -> destination visualization id
    "dashboards": {}       # origin dashboard id -> destination dashboard id
}
def auth_headers(api_key):
    """Build the Authorization header dict for a Redash API key."""
    return {"Authorization": "Key %s" % api_key}
def api_request(api):
    """GET the given API path from the origin Redash instance.

    Raises requests.HTTPError on a non-2xx response; otherwise returns
    the parsed JSON body.
    """
    url = ORIGIN + api
    resp = requests.get(url, headers=auth_headers(ORIGIN_API_KEY))
    resp.raise_for_status()
    return resp.json()
def import_users():
    """Create each origin user in the destination account (no invite email).

    Skips users already present in meta['users'] and the origin 'admin'
    account.  Newly created users are recorded in meta['users'] keyed by
    the ORIGIN id as a string -- json.load() produces string keys, so this
    keeps lookups consistent on resumed runs.
    """
    print("Importing users...")
    users = api_request('/api/users')
    for user in users:
        print(" importing: {}".format(user['id']))
        if str(user['id']) in meta['users']:
            print(" ... skipping: exists.")
            continue
        if user['email'] == 'admin':
            print(" ... skipping: admin.")
            continue
        data = {
            "name": user['name'],
            "email": user['email']
        }
        response = requests.post(DESTINATION + '/api/users?no_invite=1',
                                 json=data, headers=auth_headers(DESTINATION_API_KEY))
        response.raise_for_status()
        new_user = response.json()
        # Key by str(id) so the skip check above finds it (the original
        # stored an int key, which str(user['id']) never matched).
        meta['users'][str(user['id'])] = {
            'id': new_user['id'],
            'email': new_user['email'],
            'invite_link': ""  # new_user['invite_link']
        }
def get_api_key(user_id):
    """Fetch the API key of a destination user, given the destination id."""
    url = '{}/api/users/{}'.format(DESTINATION, user_id)
    resp = requests.get(url, headers=auth_headers(DESTINATION_API_KEY))
    resp.raise_for_status()
    return resp.json()['api_key']
def user_with_api_key(user_id):
    """Return the destination user record for an origin user id.

    Accepts int or str ids (meta.json round-trips keys as strings).
    Lazily fetches and caches the user's destination API key, which is
    needed to recreate objects under the right owner.

    Raises:
        KeyError: if the user was never imported or pre-seeded in meta.
    """
    user = meta['users'].get(user_id) or meta['users'].get(str(user_id))
    if user is None:
        # The original crashed here with a confusing TypeError.
        raise KeyError(
            "origin user {} not in meta['users'] - seed it manually or "
            "run import_users() first".format(user_id))
    # Also treat a seeded-but-blank api_key as missing, otherwise the
    # placeholder "" from the meta template is used for auth and fails.
    if not user.get('api_key'):
        user['api_key'] = get_api_key(user['id'])
    return user
def get_queries(url, api_key):
    """Fetch all queries from a paginated /api/queries endpoint.

    Args:
        url: base URL of the Redash instance.
        api_key: admin API key for that instance.

    Returns:
        List of query dicts accumulated across all result pages.
    """
    queries = []
    headers = {'Authorization': 'Key {}'.format(api_key)}
    path = "{}/api/queries".format(url)
    has_more = True
    page = 1
    while has_more:
        response = requests.get(path, headers=headers, params={'page': page})
        # Fail fast on auth/availability errors instead of a cryptic
        # KeyError below (matches get_queries_old's behavior).
        response.raise_for_status()
        body = response.json()
        queries.extend(body['results'])
        # More pages remain while the count of items seen so far is below
        # the reported total.
        has_more = page * body['page_size'] + 1 <= body['count']
        page += 1
    return queries
def get_queries_old(url, api_key):
    """Fetch all queries in one request (pre-pagination Redash versions).

    Drop-in alternative to get_queries() for old origin instances whose
    /api/queries returns a plain list instead of a paginated envelope.
    """
    path = "{}/api/queries".format(url)
    # The original built a local `headers` dict and never used it;
    # auth_headers() is the single way headers are built in this script.
    response = requests.get(path, headers=auth_headers(api_key))
    response.raise_for_status()
    return response.json()
def convert_schedule(schedule):
    """Convert a legacy Redash schedule string to the V4 schedule dict.

    A value containing ':' is a daily time-of-day (interval 86400 seconds
    with `time` set); any other non-None value is kept as the interval.
    None passes through unchanged.
    """
    if schedule is None:
        return None
    converted = {
        'interval': None,
        'until': None,
        'day_of_week': None,
        'time': None,
    }
    if ':' in schedule:
        # Daily at a fixed time, e.g. "00:15".
        converted['interval'] = 86400
        converted['time'] = schedule
    else:
        converted['interval'] = schedule
    return converted
def import_queries():
    """Recreate origin queries in the destination account.

    Skips queries whose data source has no mapping in DATA_SOURCES and
    queries already recorded in meta['queries'].  Each query is created
    as its original owning user so ownership is preserved.
    """
    print("Import queries...")
    # Depending on the Redash version running in origin, you might need to
    # use `get_queries_old`.
    queries = get_queries(ORIGIN, ORIGIN_API_KEY)
    # queries = get_queries_old(ORIGIN, ORIGIN_API_KEY)
    for query in queries:
        print(" importing: {}".format(query['id']))
        data_source_id = DATA_SOURCES.get(query['data_source_id'])
        if data_source_id is None:
            # The original printed the (None) destination id, which was
            # useless for diagnosing a missing mapping.
            print(" skipped (no mapping for data source {})".format(
                query['data_source_id']))
            continue
        if str(query['id']) in meta['queries']:
            print(" skipped - was already imported")
            continue
        data = {
            "data_source_id": data_source_id,
            "query": query['query'],
            "is_archived": query['is_archived'],
            "schedule": convert_schedule(query['schedule']),
            "description": query['description'],
            "name": query['name'],
        }
        user = user_with_api_key(query['user']['id'])
        response = requests.post(
            DESTINATION + '/api/queries', json=data,
            headers=auth_headers(user['api_key']))
        response.raise_for_status()
        # Key by str(id) to match the skip check above and the string keys
        # produced when meta.json is re-loaded.
        meta['queries'][str(query['id'])] = response.json()['id']
def import_visualizations():
    """Recreate each imported query's visualizations in the destination.

    TABLE visualizations are created automatically with a new query, so
    for those we only record the id mapping from the origin table viz to
    the auto-created one; every other type is POSTed as the owning user.
    """
    print("Importing visualizations...")
    # items() instead of the py2-only iteritems() -- works on both.
    for query_id, new_query_id in meta['queries'].items():
        query = api_request('/api/queries/{}'.format(query_id))
        print(" importing visualizations of: {}".format(query_id))
        for v in query['visualizations']:
            if v['type'] == 'TABLE':
                # The default table viz already exists on the new query;
                # look it up and record the mapping.
                response = requests.get(
                    DESTINATION + '/api/queries/{}'.format(new_query_id),
                    headers=auth_headers(DESTINATION_API_KEY))
                response.raise_for_status()
                new_vis = response.json()['visualizations']
                for new_v in new_vis:
                    if new_v['type'] == 'TABLE':
                        # str key: every lookup of this map uses str(id).
                        meta['visualizations'][str(v['id'])] = new_v['id']
            else:
                if str(v['id']) in meta['visualizations']:
                    continue
                user = user_with_api_key(query['user']['id'])
                data = {
                    "name": v['name'],
                    "description": v['description'],
                    "options": v['options'],
                    "type": v['type'],
                    "query_id": new_query_id
                }
                response = requests.post(
                    DESTINATION + '/api/visualizations', json=data,
                    headers=auth_headers(user['api_key']))
                response.raise_for_status()
                meta['visualizations'][str(v['id'])] = response.json()['id']
def import_dashboards():
    """Recreate origin dashboards and their widgets in the destination.

    Each dashboard is created as its original owning user, published
    (is_draft=False), and its widgets are recreated row by row.  Widgets
    that reference a visualization we could not import are skipped.
    """
    print("Importing dashboards...")
    dashboards = api_request('/api/dashboards')
    for dashboard in dashboards:
        print(" importing: {}".format(dashboard['slug']))
        # Load the full dashboard (the list endpoint omits widgets).
        d = api_request('/api/dashboards/{}'.format(dashboard['slug']))
        # Create the dashboard as its original owner.
        data = {'name': d['name']}
        user = user_with_api_key(d['user_id'])
        response = requests.post(
            DESTINATION + '/api/dashboards', json=data,
            headers=auth_headers(user['api_key']))
        response.raise_for_status()
        new_dashboard = response.json()
        # Publish it (new dashboards start as drafts).
        requests.post(
            DESTINATION + '/api/dashboards/{}'.format(new_dashboard['id']),
            json={'is_draft': False}, headers=auth_headers(user['api_key']))
        # Recreate widgets.
        for row in d['widgets']:
            row_index = 0
            col_index = 0
            for widget in row:
                # Normalize options BEFORE building the payload.  The
                # original replaced a non-dict widget['options'] after
                # `data` was built, so the synthesized position never made
                # it into the POST body.
                if not isinstance(widget['options'], dict):
                    widget['options'] = {}
                if 'position' not in widget['options']:
                    # Origin widget carries no layout info; synthesize one.
                    widget['options']['position'] = {
                        'sizeX': 3,
                        'sizeY': 8,
                        'row': row_index,
                        'col': col_index
                    }
                row_index += 8
                col_index += 4
                data = {
                    'dashboard_id': new_dashboard['id'],
                    'options': widget['options'],
                    'width': widget['width'],
                    'text': widget['text'],
                    'visualization_id': None
                }
                if 'visualization' in widget:
                    data['visualization_id'] = meta['visualizations'].get(
                        str(widget['visualization']['id']))
                # To remain backward compatible table visualizations need to set autoHeight...
                # if data['visualization_id'] in non_tables:
                #     widget['options']['position']['autoHeight'] = True
                if 'visualization' in widget and not data['visualization_id']:
                    print('skipping for missing viz')
                    continue
                response = requests.post(
                    DESTINATION + '/api/widgets', json=data,
                    headers=auth_headers(user['api_key']))
                response.raise_for_status()
def save_meta():
    """Persist the migration state to meta.json so later runs can resume."""
    print("Saving meta...")
    with open('meta.json', 'w') as f:
        json.dump(meta, f)
def import_all():
    """Run the whole migration, always persisting progress to meta.json.

    Any previously saved state is merged first so already-imported
    objects are skipped on re-runs.  A failure anywhere still saves the
    state accumulated so far.
    """
    try:
        # Merge state from a previous (possibly partial) run, if any.
        # The original unconditionally opened meta.json, so a first run
        # with no file raised IOError and the outer handler aborted the
        # entire import before anything was copied.
        try:
            with open('meta.json') as f:
                meta.update(json.load(f))
        except (IOError, OSError):
            pass  # first run: no saved state yet
        import_users()
        import_queries()
        import_visualizations()
        import_dashboards()
    except Exception as ex:
        logging.exception(ex)
    save_meta()
# Script entry point: run the full migration when executed directly.
if __name__ == '__main__':
    import_all()
@egold
Copy link

egold commented Sep 21, 2020

I believe that if you change all of the `print "foo"` statements to `print("foo")`, this script becomes Python 3 compatible.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment