Skip to content

Instantly share code, notes, and snippets.

@p5k6
Created May 2, 2019 18:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save p5k6/f2181270a3a16fa46097f76686cd6cfa to your computer and use it in GitHub Desktop.
Save p5k6/f2181270a3a16fa46097f76686cd6cfa to your computer and use it in GitHub Desktop.
Amundsen Postgres version of a metadata extractor. Only tested on Postgres 9.6, on a local install. No tests are provided at this point.
import logging
from collections import namedtuple
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby
# Grouping key for one table: its schema name followed by its table name.
TableKey = namedtuple('TableKey', 'schema_name table_name')

# Logger for this module, named after the module itself.
LOGGER = logging.getLogger(__name__)
class PostgresTableMetadataExtractor(Extractor):
    """
    Extracts Postgres table and column metadata from underlying meta store database using SQLAlchemyExtractor.

    Rows are read from INFORMATION_SCHEMA.COLUMNS through a wrapped SQLAlchemyExtractor, grouped by
    (schema name, table name), and emitted as one TableMetadata per group. The query selects NULL for
    both the table description and the column description.
    """
    # SELECT statement from postgres information_schema to extract table and column metadata.
    # {where_clause_suffix} is filled in from configuration in init(). The literal is kept flush-left
    # so that its content stays exactly as it was.
    SQL_STATEMENT = """
SELECT
c.table_catalog as cluster, c.table_schema as schema_name, c.table_name as name, null as description, c.column_name as col_name, c.data_type as col_type, null as col_description, ordinal_position as col_sort_order
FROM INFORMATION_SCHEMA.COLUMNS c
{where_clause_suffix}
ORDER by cluster, schema_name, name, col_sort_order ;
"""

    # CONFIG KEYS
    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
    # NOTE(review): the key name is literally 'master', the same as its default value below. That looks
    # unintended, but it is left unchanged so existing configurations keep working.
    CLUSTER_KEY = 'master'

    # Fallback values applied in init() for keys the supplied configuration does not define.
    DEFAULT_CONFIG = ConfigFactory.from_dict({WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: 'master'})

    def init(self, conf):
        # type: (ConfigTree) -> None
        """
        Builds the SQL statement from configuration and initializes the wrapped SQLAlchemyExtractor.

        :param conf: configuration tree; keys missing from it fall back to DEFAULT_CONFIG
        :return:
        """
        conf = conf.with_fallback(PostgresTableMetadataExtractor.DEFAULT_CONFIG)

        # NOTE(review): _cluster is stored here but not read anywhere else in this class; the records
        # emitted below take their cluster from the query's table_catalog column.
        self._cluster = '{}'.format(conf.get_string(PostgresTableMetadataExtractor.CLUSTER_KEY))

        self.sql_stmt = PostgresTableMetadataExtractor.SQL_STATEMENT.format(
            where_clause_suffix=conf.get_string(PostgresTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY))

        # Lazy %-style arguments: the message is only rendered if INFO is enabled.
        LOGGER.info('SQL for postgres metadata: %s', self.sql_stmt)

        self._alchemy_extractor = SQLAlchemyExtractor()
        # The generated statement is supplied as the fallback for SQLAlchemyExtractor.EXTRACT_SQL
        # within the wrapped extractor's own configuration scope.
        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
        self._alchemy_extractor.init(sql_alch_conf)

        # Created lazily on the first extract() call.
        self._extract_iter = None  # type: Union[None, Iterator]

    def extract(self):
        # type: () -> Union[TableMetadata, None]
        """
        Returns the next table's metadata.

        :return: the next TableMetadata, or None once every table has been emitted
        """
        if self._extract_iter is None:
            self._extract_iter = self._get_extract_iter()
        return next(self._extract_iter, None)

    def get_scope(self):
        # type: () -> str
        """
        :return: the configuration scope string of this extractor
        """
        return 'extractor.postgres_table_metadata'

    def _get_extract_iter(self):
        # type: () -> Iterator[TableMetadata]
        """
        Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata

        groupby only merges adjacent rows, so this relies on the ORDER BY in SQL_STATEMENT keeping the
        rows of one table together.
        :return:
        """
        for _, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
            rows = list(group)
            columns = [ColumnMetadata(row['col_name'], row['col_description'],
                                      row['col_type'], row['col_sort_order'])
                       for row in rows]

            # Table-level fields are read from the final row of the group (groups are never empty).
            last_row = rows[-1]
            yield TableMetadata('postgres', last_row['cluster'],
                                last_row['schema_name'],
                                last_row['name'],
                                last_row['description'],
                                columns)

    def _get_raw_extract_iter(self):
        # type: () -> Iterator[Dict[str, Any]]
        """
        Provides iterator of result row from SQLAlchemy extractor

        Iteration stops at the first falsy value returned by the wrapped extractor.
        :return:
        """
        row = self._alchemy_extractor.extract()
        while row:
            yield row
            row = self._alchemy_extractor.extract()

    def _get_table_key(self, row):
        # type: (Dict[str, Any]) -> Union[TableKey, None]
        """
        Table key consists of schema and table name

        :param row: a result row exposing 'schema_name' and 'name'
        :return: the TableKey for the row, or None when the row is falsy
        """
        if row:
            return TableKey(schema_name=row['schema_name'], table_name=row['name'])

        return None
@feng-tao
Copy link

feng-tao commented May 7, 2019

hey @p5k6, saw you created this gist for a Postgres extractor. I think it would be a good addition to Amundsen databuilder. Do you think you have time to create an Amundsen databuilder PR with unit tests for this extractor?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment