Last active
December 22, 2021 23:42
-
-
Save xflr6/33db779acd7fe79faadeb0f28fc053bd to your computer and use it in GitHub Desktop.
SQL-injection-safe dynamic query execution via PL/pgSQL quote_ident() and format('%I')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""SQL-injection safe dynamic query with pl/pgsql.""" | |
import sqlalchemy as sa | |
# Each entry is ``(table, unique_columns, nullable_columns)``:
# ``unique_columns`` is the column combination checked for duplicate rows and
# ``nullable_columns`` is the subset of it that is left out of the NULL check.
UNIQUE_NULL = [
    ('contributioncontributor', ['contribution_pk', 'contributor_pk'], []),
    ('contributionreference',
     ['contribution_pk', 'source_pk', 'description'], []),
    ('editor', ['dataset_pk', 'contributor_pk'], []),
    ('languageidentifier', ['language_pk', 'identifier_pk'], []),
    ('languagesource', ['language_pk', 'source_pk'], []),
    ('sentencereference', ['sentence_pk', 'source_pk', 'description'], []),
    ('unitvalue',
     ['unit_pk', 'unitparameter_pk', 'unitdomainelement_pk'],
     ['unitdomainelement_pk']),
    ('value', ['valueset_pk', 'domainelement_pk'], ['domainelement_pk']),
    ('valuesentence', ['value_pk', 'sentence_pk'], []),
    ('valueset', ['language_pk', 'parameter_pk', 'contribution_pk'], []),
    ('valuesetreference', ['valueset_pk', 'source_pk', 'description'], []),
]

# ``(table, columns)`` pairs whose column combination is checked for duplicates.
UNIQUE = [(table, columns) for table, columns, _ in UNIQUE_NULL]

# ``(table, columns)`` pairs checked for NULLs: the unique columns minus the
# ones explicitly listed as nullable (original column order is preserved).
NOTNULL = [
    (table, [column for column in columns if column not in nullable])
    for table, columns, nullable in UNIQUE_NULL
]
engine = sa.create_engine('postgresql://postgres@/glottolog2.7')

# The helper functions are created in ``pg_temp`` and therefore exist only in
# the database session that created them.  Run the whole script on a single
# explicit connection instead of relying on the pool to hand back the same
# connection for every connectionless ``engine.execute()``/``engine.scalar()``.
with engine.connect() as conn:
    # Notes on the dynamic SQL below:
    # * "table" is a regclass: its text form is already quoted (and
    #   schema-qualified where needed) by PostgreSQL, so it is interpolated
    #   with %s.  Passing it through %I would quote it a second time and break
    #   for mixed-case names or tables outside the search_path.
    # * Column names arrive as plain name[] values and are still quoted with
    #   %I / quote_ident().
    # * The percent signs are doubled exactly as in the original statement;
    #   presumably the DBAPI driver collapses them to a single % -- verify
    #   before changing how the statement is submitted.
    conn.execute('''
CREATE OR REPLACE FUNCTION
pg_temp.count_null("table" regclass, columns name[], OUT n int) AS $$
DECLARE
    where_clause text;
BEGIN
    SELECT string_agg(format('%%I IS NULL', c), ' OR ')
    FROM unnest(columns) AS c INTO where_clause;
    EXECUTE format(
        'SELECT count(*) FROM %%s WHERE %%s',
        "table", where_clause) INTO n;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION
pg_temp.count_duplicates("table" regclass, columns name[], OUT n int) AS $$
DECLARE
    group_by text;
BEGIN
    SELECT string_agg(quote_ident(c), ', ')
    FROM unnest(columns) AS c INTO group_by;
    EXECUTE format(
        'SELECT count(*) FROM (SELECT 1 FROM %%s GROUP BY %%s HAVING count(*) > 1) AS sq',
        "table", group_by) INTO n;
END;
$$ LANGUAGE plpgsql''')

    # Per table: number of rows with a NULL in any of the non-nullable columns.
    for tab, cols in NOTNULL:
        n = conn.scalar(sa.func.pg_temp.count_null(tab, cols))
        print(tab, n, sep='\t')

    # Per table: number of column-value combinations occurring more than once.
    for tab, cols in UNIQUE:
        n = conn.scalar(sa.func.pg_temp.count_duplicates(tab, cols))
        print(tab, n, sep='\t')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment