Skip to content

Instantly share code, notes, and snippets.

@xflr6
Last active December 22, 2021 23:42
Show Gist options
  • Save xflr6/33db779acd7fe79faadeb0f28fc053bd to your computer and use it in GitHub Desktop.
Save xflr6/33db779acd7fe79faadeb0f28fc053bd to your computer and use it in GitHub Desktop.
SQL injection safe dynamic query execution via PL/pgSQL quote_ident() and format('%I')
"""SQL-injection safe dynamic query with pl/pgsql."""
import sqlalchemy as sa
# One entry per table: (table name, columns of the unique key, subset of those
# key columns that is left out of the NULL check further down).
UNIQUE_NULL = [
    ('contributioncontributor', ['contribution_pk', 'contributor_pk'], []),
    ('contributionreference', ['contribution_pk', 'source_pk', 'description'], []),
    ('editor', ['dataset_pk', 'contributor_pk'], []),
    ('languageidentifier', ['language_pk', 'identifier_pk'], []),
    ('languagesource', ['language_pk', 'source_pk'], []),
    ('sentencereference', ['sentence_pk', 'source_pk', 'description'], []),
    ('unitvalue', ['unit_pk', 'unitparameter_pk', 'unitdomainelement_pk'], ['unitdomainelement_pk']),
    ('value', ['valueset_pk', 'domainelement_pk'], ['domainelement_pk']),
    ('valuesentence', ['value_pk', 'sentence_pk'], []),
    ('valueset', ['language_pk', 'parameter_pk', 'contribution_pk'], []),
    ('valuesetreference', ['valueset_pk', 'source_pk', 'description'], []),
]

# (table, key columns) pairs: the first two fields of every entry.
UNIQUE = [entry[:2] for entry in UNIQUE_NULL]

# (table, key columns without the excluded ones) pairs.
NOTNULL = [
    (table, [col for col in key_cols if col not in excluded])
    for table, key_cols, excluded in UNIQUE_NULL
]
engine = sa.create_engine('postgresql://postgres@/glottolog2.7')

# Functions created in pg_temp live in the temporary schema of the database
# session that created them. Create and call them on ONE explicitly held
# connection instead of implicit engine-level execution, which checks a
# connection out of the pool for every call and therefore only works as long
# as the pool happens to hand back the same session.
with engine.connect() as conn:
    # Two helper functions, both taking a table (regclass) and a list of
    # column names (name[]) and returning a count via the OUT parameter n:
    #   count_null:       rows where at least one of the columns IS NULL
    #   count_duplicates: groups (by the columns) containing more than one row
    #
    # Identifier handling:
    #   - column names are untrusted text and are quoted with format's I
    #     conversion / quote_ident();
    #   - the table is a regclass, whose text form is already quoted and
    #     schema-qualified where needed, so it is interpolated with the plain
    #     s conversion; quoting it again would break mixed-case or
    #     schema-qualified names.
    #
    # NOTE(review): '%%' is presumably doubled because the DBAPI driver treats
    # a single '%' as a parameter marker, so the server sees a single '%'.
    # Keep any literal percent sign out of the SQL text for the same reason.
    conn.execute('''
CREATE OR REPLACE FUNCTION
pg_temp.count_null("table" regclass, columns name[], OUT n int) AS $$
DECLARE
  where_clause text;
BEGIN
  SELECT string_agg(format('%%I IS NULL', c), ' OR ') FROM unnest(columns) AS c INTO where_clause;
  EXECUTE format(
    'SELECT count(*) FROM %%s WHERE %%s',
    "table", where_clause) INTO n;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION
pg_temp.count_duplicates("table" regclass, columns name[], OUT n int) AS $$
DECLARE
  group_by text;
BEGIN
  SELECT string_agg(quote_ident(c), ', ') FROM unnest(columns) AS c INTO group_by;
  EXECUTE format(
    'SELECT count(*) FROM (SELECT 1 FROM %%s GROUP BY %%s HAVING count(*) > 1) AS sq',
    "table", group_by) INTO n;
END;
$$ LANGUAGE plpgsql''')

    # Report, per table, the number of rows with a NULL in a checked column.
    for tab, cols in NOTNULL:
        n = conn.scalar(sa.func.pg_temp.count_null(tab, cols))
        print(tab, n, sep='\t')

    # Report, per table, the number of key combinations occurring more than once.
    for tab, cols in UNIQUE:
        n = conn.scalar(sa.func.pg_temp.count_duplicates(tab, cols))
        print(tab, n, sep='\t')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment