Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# This query runner is licensed under the BSD 2-Clause "Simplified" License
#
# Copyright (c) 2013-2021, Arik Fraimovich.
# All rights reserved.
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS
# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import
from io import StringIO
import csv
import requests
from redash.query_runner import TYPE_STRING, BaseQueryRunner, register
from redash.utils import json_dumps
class CsvUrl(BaseQueryRunner):
should_annotate_query = False
@classmethod
def name(cls):
return "CSV (from URL)"
@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"username": {"title": "Username (Basic Auth)", "type": "string"},
"password": {"title": "Password (Basic Auth)", "type": "string"},
},
"secret": ["password"],
}
def test_connection(self):
pass
def run_query(self, query, user):
try:
error = None
url = query.strip()
if "username" in self.configuration and self.configuration["username"]:
auth = (self.configuration["username"], self.configuration["password"])
else:
auth = None
response = requests.get(url, auth=auth)
response.raise_for_status()
raw_data = StringIO(response.text)
reader = csv.DictReader(raw_data)
columns = []
rows = []
for row in reader:
if not columns:
columns = [
{"name": k, "friendly_name": k, "type": TYPE_STRING}
for k in row.keys()
]
rows.append({k: v for k, v in row.items()})
json_data = json_dumps({"columns": columns, "rows": rows})
return json_data, error
except requests.RequestException as e:
return None, str(e)
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None
return json_data, error
register(CsvUrl)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment