Skip to content

Instantly share code, notes, and snippets.

@mrbungie
Last active January 19, 2017 12:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrbungie/5fcc076e67e047c8a61de706403b1138 to your computer and use it in GitHub Desktop.
Save mrbungie/5fcc076e67e047c8a61de706403b1138 to your computer and use it in GitHub Desktop.
import sys
import json
import logging
from redash.query_runner import *
from redash.utils import JSONEncoder
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Lookup from the ``__name__`` of a column's DB-API type code to the Redash
# column type constant. Names missing from this table resolve to ``None``
# at the lookup site.
types_map = {
    # textual
    'str': TYPE_STRING,
    # numeric
    'int': TYPE_INTEGER,
    'float': TYPE_FLOAT,
    'Decimal': TYPE_FLOAT,
    # temporal
    'date': TYPE_DATE,
    'datetime': TYPE_DATETIME,
}
class CustomJSONEncoder(JSONEncoder):
    """JSON encoder that additionally copes with ``decimal.Decimal`` values.

    Finite decimals are emitted as floats; NaN and infinite decimals, which
    have no JSON representation, are emitted as JSON ``null``. Every other
    object is delegated to the parent ``JSONEncoder``.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for *o*.

        Args:
            o: An object the standard encoder could not serialize natively.

        Returns:
            ``float(o)`` for a finite ``Decimal``, ``None`` for a NaN or
            infinite ``Decimal``, otherwise whatever the parent encoder's
            ``default`` returns (or raises) for *o*.
        """
        import decimal

        if isinstance(o, decimal.Decimal):
            # Returning None makes the encoder write a real JSON ``null``;
            # returning the string "null" would be written as the JSON
            # string "null" instead.
            return float(o) if o.is_finite() else None

        # The parent's result has to be returned: without the ``return`` this
        # method would yield None for every object the parent knows how to
        # convert, and those values would all be serialized as ``null``.
        return super(CustomJSONEncoder, self).default(o)
class Teradata(BaseQueryRunner):
    """Redash query runner that talks to Teradata through the ``teradata`` package.

    Connection settings (``host``, ``user``, ``passwd``) are read from
    ``self.configuration``. The ``teradata`` module is imported lazily so the
    module can be loaded even when the driver is not installed.
    """

    noop_query = "SELECT 1"

    @classmethod
    def configuration_schema(cls):
        """Return the JSON-schema dict describing this runner's settings."""
        return {
            'type': 'object',
            'properties': {
                'host': {
                    'type': 'string',
                    'default': '127.0.0.1'
                },
                'user': {
                    'type': 'string'
                },
                'passwd': {
                    'type': 'string',
                    'title': 'Password'
                }
            },
            'required': ['host', 'user', 'passwd'],
            'secret': ['passwd']
        }

    @classmethod
    def name(cls):
        """Return the display name of this runner."""
        return "Teradata"

    @classmethod
    def enabled(cls):
        """Return True when the ``teradata`` package can be imported."""
        try:
            import teradata  # noqa: F401 -- imported only to probe availability
        except ImportError:
            return False
        return True

    def _get_tables(self, schema):
        """Fill *schema* with table/column metadata read from ``DBC.Columns``.

        Args:
            schema: Dict keyed by table name; it is updated in place with
                entries of the form ``{'name': ..., 'columns': [...]}``.

        Returns:
            A list of the entries stored in *schema*.

        Raises:
            Exception: If the metadata query reports an error.
        """
        query = """
        SELECT DatabaseName as table_schema, TableName as table_name, ColumnName as column_name
        FROM DBC.Columns
        WHERE DatabaseName NOT IN ('SYSUDTLIB','DBC')
        """
        results, error = self.run_query(query, None)
        if error is not None:
            raise Exception("Failed getting schema.")

        results = json.loads(results)
        for row in results['rows']:
            # NOTE(review): the 'public' special case looks carried over from
            # a PostgreSQL-style runner -- confirm it is wanted for Teradata.
            if row['table_schema'] != 'public':
                table_name = '{}.{}'.format(row['table_schema'], row['table_name'])
            else:
                table_name = row['table_name']

            if table_name not in schema:
                schema[table_name] = {'name': table_name, 'columns': []}
            schema[table_name]['columns'].append(row['column_name'])

        # list() so callers get a real list on Python 3 as well, where
        # dict.values() is only a view.
        return list(schema.values())

    def run_query(self, query, user):
        """Execute *query* and return its result serialized as JSON.

        Args:
            query: SQL text to execute.
            user: Not used by this runner.

        Returns:
            A ``(json_data, error)`` tuple. On success ``json_data`` is a JSON
            string holding ``columns`` and ``rows`` and ``error`` is ``None``;
            on failure ``json_data`` is ``None`` and ``error`` is a message.

        Raises:
            Exception: Anything other than ``teradata.Error`` or
                ``KeyboardInterrupt`` propagates to the caller unchanged,
                after the connection has been closed.
        """
        import teradata

        connection = None
        try:
            uda_exec = teradata.UdaExec(appName="redash", version="1.0", logConsole=False)
            connection = uda_exec.connect(method="odbc",
                                          system=self.configuration.get('host', ''),
                                          username=self.configuration.get('user', ''),
                                          password=self.configuration.get('passwd', ''),
                                          charset="UTF16")
            cursor = connection.cursor()
            logger.debug("Teradata running query: %s", query)
            cursor.execute(query)

            # TODO - very similar to pg.py
            if cursor.description is not None:
                # getattr keeps a type code without ``__name__`` from aborting
                # the whole query; such columns simply get no mapped type.
                columns = self.fetch_columns(
                    [(col[0], types_map.get(getattr(col[1], '__name__', None), None))
                     for col in cursor.description])
                column_names = [c['name'] for c in columns]
                # Rows are fetched only once a result set is known to exist;
                # DB-API drivers may raise when fetching without one.
                rows = [dict(zip(column_names, row)) for row in cursor.fetchall()]
                data = {'columns': columns, 'rows': rows}
                json_data = json.dumps(data, cls=CustomJSONEncoder)
                error = None
            else:
                json_data = None
                error = "No data was returned."

            cursor.close()
        except teradata.Error as e:
            json_data = None
            # The message is read from the second exception argument; fall
            # back to str(e) so a shorter args tuple cannot raise IndexError
            # from inside this handler.
            error = e.args[1] if len(e.args) > 1 else str(e)
        except KeyboardInterrupt:
            error = "Query cancelled by user."
            json_data = None
        finally:
            # Any other exception propagates with its original traceback;
            # the connection is closed on every path.
            if connection is not None:
                connection.close()

        return json_data, error
# Hand the Teradata runner class to `register` at import time.
# NOTE(review): `register` presumably comes from the `redash.query_runner`
# star import and adds the class to Redash's runner registry -- confirm.
register(Teradata)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment