-
-
Save susodapop/393cc3001519326f987f386da305ae7f to your computer and use it in GitHub Desktop.
Redash GDPR Lookup Script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import click | |
from redash import Redash | |
class Lookup(object): | |
def __init__(self, redash, email): | |
self.email = email.lower() | |
self.redash = redash | |
def check_query_result(self, query_result_id): | |
if not query_result_id: | |
return False | |
result = self.redash._get('api/query_results/{}'.format(query_result_id)) | |
return self.email in result.text.lower() | |
def check_query(self, query): | |
found_in_query = False | |
for field in ('query', 'name', 'description'): | |
if self.email in (query[field] or '').lower(): | |
found_in_query = True | |
for tag in query['tags']: | |
if self.email in (tag or '').lower(): | |
found_in_query = True | |
found_in_result = self.check_query_result(query['latest_query_data_id']) | |
return found_in_query or found_in_result | |
def check_dashboard(self, dashboard): | |
found_in_dashboard = False | |
found_in_widget = False | |
for field in ('slug', 'name'): | |
if self.email in (dashboard[field] or '').lower(): | |
found_in_dashboard = True | |
for tag in dashboard['tags']: | |
if self.email in (tag or '').lower(): | |
found_in_dashboard = True | |
if not found_in_dashboard: | |
dash_widgets = self.redash._get('api/dashboards/{}'.format(dashboard['slug'])).json()['widgets'] | |
# Check text widgets | |
if not isinstance(dash_widgets, type(None)): | |
for widget in dash_widgets: | |
if 'visualization' not in widget and self.email in widget['text']: | |
found_in_widget = True | |
return found_in_dashboard or found_in_widget | |
def lookup(self): | |
queries = self.redash.paginate(self.redash.queries) | |
with click.progressbar(queries, label="Queries") as bar: | |
found_q = [query for query in bar if self.check_query(query)] | |
for query in found_q: | |
query_url = '{}/queries/{}'.format(self.redash.redash_url, query['id']) | |
print(query_url) | |
dashboards = self.redash.paginate(self.redash.dashboards) | |
with click.progressbar(dashboards, label="Dashboards") as bar: | |
found_d = [dash for dash in bar if self.check_dashboard(dash)] | |
for dash in found_d: | |
dash_url = '{}/dashboards/{}'.format(self.redash.redash_url, dash['slug']) | |
print(dash_url) | |
@click.command() | |
@click.argument("redash_host") | |
@click.argument("email") | |
@click.option( | |
"--api-key", | |
"api_key", | |
envvar="REDASH_API_KEY", | |
show_envvar=True, | |
prompt="API Key", | |
help="User API Key", | |
) | |
def lookup(redash_host, email, api_key): | |
"""Search for EMAIL in queries and query results, output query URL if found.""" | |
redash = Redash(redash_host, api_key) | |
lookup = Lookup(redash, email) | |
lookup.lookup() | |
if __name__ == "__main__": | |
lookup() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import os | |
class Redash(object): | |
def __init__(self, redash_url, api_key): | |
self.redash_url = redash_url | |
self.session = requests.Session() | |
self.session.headers.update({'Authorization': 'Key {}'.format(api_key)}) | |
def test_credentials(self): | |
try: | |
response = self._get('api/session') | |
return True | |
except requests.exceptions.HTTPError: | |
return False | |
def queries(self, page=1, page_size=25): | |
"""GET api/queries""" | |
return self._get('api/queries', params=dict(page=page, page_size=page_size)).json() | |
def dashboards(self, page=1, page_size=25): | |
"""GET api/dashboards""" | |
return self._get('api/dashboards', params=dict(page=page, page_size=page_size)).json() | |
def dashboard(self, slug): | |
"""GET api/dashboards/{slug}""" | |
return self._get('api/dashboards/{}'.format(slug)).json() | |
def create_dashboard(self, name): | |
return self._post('api/dashboards', json={'name': name}).json() | |
def update_dashboard(self, dashboard_id, properties): | |
return self._post('api/dashboards/{}'.format(dashboard_id), json=properties).json() | |
def create_widget(self, dashboard_id, visualization_id, text, options): | |
data = { | |
'dashboard_id': dashboard_id, | |
'visualization_id': visualization_id, | |
'text': text, | |
'options': options, | |
'width': 1, | |
} | |
return self._post('api/widgets', json=data) | |
def duplicate_dashboard(self, slug, new_name=None): | |
current_dashboard = self.dashboard(slug) | |
if new_name is None: | |
new_name = u'Copy of: {}'.format(current_dashboard['name']) | |
new_dashboard = self.create_dashboard(new_name) | |
if current_dashboard['tags']: | |
self.update_dashboard(new_dashboard['id'], {'tags': current_dashboard['tags']}) | |
for widget in current_dashboard['widgets']: | |
visualization_id = None | |
if 'visualization' in widget: | |
visualization_id = widget['visualization']['id'] | |
self.create_widget(new_dashboard['id'], visualization_id, widget['text'], widget['options']) | |
return new_dashboard | |
def scheduled_queries(self): | |
"""Loads all queries and returns only the scheduled ones.""" | |
queries = self.paginate(self.queries) | |
return filter(lambda query: query['schedule'] is not None, queries) | |
def update_query(self, query_id, data): | |
"""POST /api/queries/{query_id} with the provided data object.""" | |
path = 'api/queries/{}'.format(query_id) | |
return self._post(path, json=data) | |
def paginate(self, resource): | |
"""Load all items of a paginated resource""" | |
stop_loading = False | |
page = 1 | |
page_size = 100 | |
items = [] | |
while not stop_loading: | |
response = resource(page=page, page_size=page_size) | |
items += response['results'] | |
page += 1 | |
stop_loading = response['page'] * response['page_size'] >= response['count'] | |
return items | |
def _get(self, path, **kwargs): | |
return self._request('GET', path, **kwargs) | |
def _post(self, path, **kwargs): | |
return self._request('POST', path, **kwargs) | |
def _request(self, method, path, **kwargs): | |
url = '{}/{}'.format(self.redash_url, path) | |
response = self.session.request(method, url, **kwargs) | |
response.raise_for_status() | |
return response | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment