Last active
November 27, 2016 23:20
-
-
Save jachym/77e8eb02cea95744fcd3c4f0bef7d270 to your computer and use it in GitHub Desktop.
Update temporal data produced by LINZ in a PostgreSQL/PostGIS database using the temporal_tables extension, as described at http://clarkdave.net/2015/02/historical-records-with-postgresql-and-temporal-tables-and-sql-2011/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Script for updating data produced from LINZ | |
works on local 'linz' database | |
python3 update_temporal_table.py --csv ~/Data/linz/layer-804-changeset.csv --name nz-property-titles | |
nz-property-titles will be converted to schema and table name nz_property_titles.nz_property_titles | |
""" | |
import argparse | |
import csv | |
import psycopg2 | |
from tabulate import tabulate | |
# Name of the local PostgreSQL database this script operates on.
DATABASE='linz'
def get_data(from_file):
    """Read changeset records from an open CSV file.

    :param from_file: open file-like object with CSV data (header row
        expected; column names become the dict keys)
    :return: list of row dicts, one per CSV record
    """
    # Materialise the reader once instead of a manual append loop.
    # NOTE(review): for very large changesets, streaming the reader
    # would use less memory, but callers currently expect a list.
    return list(csv.DictReader(from_file))
def get_table_schema(cur, name):
    '''Return the column layout of the target table.

    The dashed layer name (e.g. ``nz-property-titles``) is converted to
    the underscore form, which is used as both schema and table name.

    :param cur: open database cursor
    :param name: layer name; dashes are replaced with underscores
    :return: dict mapping column_name -> data_type
    '''
    name = name.replace('-', '_')
    # Pass the name as a bound parameter instead of interpolating it
    # into the SQL text: the value comes straight from the --name
    # command-line argument, so string formatting is injectable.
    cur.execute("""
    SELECT column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS
    where table_name = %s and table_schema = %s""", (name, name))
    columns = cur.fetchall()
    return dict(columns)
def get_keys_values(row, columns):
    '''Split a CSV row into parallel lists of column names and values.

    Columns handled separately by the callers (``sys_period``, ``fid``,
    ``shape``) are skipped, and the textual ``spatial_extents_shared``
    flag is normalised from 'false'/anything-else to 'F'/'T'.
    '''
    special = ('sys_period', 'fid', 'shape')
    keys = []
    values = []
    for column in columns:
        if column in special:
            continue
        cell = row[column]
        if column == 'spatial_extents_shared':
            # stored as a single-character boolean in the database
            cell = 'F' if cell == 'false' else 'T'
        values.append(cell)
        keys.append(column)
    return (keys, values)
def __insert(row, name, columns):
    '''Build an INSERT statement for a new record.

    :param row: CSV row dict; ``FID`` is ``<layer>.<fid>`` and ``shape``
        holds WKT geometry (may be empty)
    :param name: schema/table name (underscored)
    :param columns: column layout from get_table_schema
    :return: (query, data) tuple ready for cursor.execute
    '''
    (keys, data) = get_keys_values(row, columns)
    # fid is extracted from the "<layer>.<fid>" FID value
    data = [row['FID'].split('.')[1]] + data
    placeholders = ", ".join(["%s"] * len(data))
    if row['shape']:
        # Geometry arrives as WKT and is converted server-side
        # (SRID 2193, NZ Transverse Mercator).
        keys.append('shape')
        data.append(row['shape'])
        query = 'INSERT INTO {}.{} (fid, {}) VALUES ({}, ST_GeomFromText(%s, 2193))'.format(
            name, name, ', '.join(keys), placeholders)
    else:
        # BUG FIX: the original only terminated the statement inside the
        # shape branch, so rows without geometry produced an unclosed
        # VALUES clause and listed a shape column with no value.
        query = 'INSERT INTO {}.{} (fid, {}) VALUES ({})'.format(
            name, name, ', '.join(keys), placeholders)
    return (query, data)
def __update(row, name, columns):
    '''Build an UPDATE statement for an existing record.

    (Docstring fixed: the original said "Insert new record".)

    :param row: CSV row dict; ``FID`` is ``<layer>.<fid>`` and ``shape``
        holds WKT geometry (may be empty)
    :param name: schema/table name (underscored)
    :param columns: column layout from get_table_schema
    :return: (query, data) tuple ready for cursor.execute
    '''
    (keys, data) = get_keys_values(row, columns)
    # dead "query_data = []" assignment removed; build SET list directly
    assignments = ", ".join('{} = %s'.format(k) for k in keys)
    query = 'UPDATE {}.{} SET {}'.format(name, name, assignments)
    if row['shape']:
        query += ', shape = ST_GeomFromText(%s, 2193)'
        data.append(row['shape'])
    data.append(row['FID'].split('.')[1])
    # BUG FIX: the original appended 'WHERE' with no leading space, so
    # rows without geometry produced "... = %sWHERE fid = %s".
    query += ' WHERE fid = %s'
    return (query, data)
def __delete(row, name, columns):
    '''Build a DELETE statement for an existing record.

    The fid is taken from the "<layer>.<fid>" FID column; ``columns``
    is accepted for signature parity with __insert/__update but unused.
    '''
    query = 'DELETE FROM {}.{} WHERE fid = %s'.format(name, name)
    return (query, [row['FID'].split('.')[1]])
def save_to_db(data, name):
    """Apply changeset rows to the local LINZ database.

    Each row's ``__change__`` column selects the operation (INSERT,
    UPDATE or DELETE); a summary table of per-operation counts is
    printed afterwards.

    :param data: iterable of CSV row dicts (from get_data)
    :param name: layer name; dashes are converted to underscores
    """
    inserts = []
    updates = []
    deletes = []
    name = name.replace('-', '_')
    # Consistency fix: connect using the module-level DATABASE constant
    # instead of a second hard-coded database name.
    with psycopg2.connect("dbname={}".format(DATABASE)) as conn:
        with conn.cursor() as cur:
            columns = get_table_schema(cur, name)
            for row in data:
                # The three change kinds are mutually exclusive, so use
                # an elif chain rather than three independent ifs.
                change = row['__change__']
                if change == 'INSERT':
                    statement = __insert(row, name, columns)
                    inserts.append(statement)
                elif change == 'UPDATE':
                    statement = __update(row, name, columns)
                    updates.append(statement)
                elif change == 'DELETE':
                    statement = __delete(row, name, columns)
                    deletes.append(statement)
                else:
                    continue  # unknown change kind: skip, as before
                (query, query_data) = statement
                if query and query_data:
                    cur.execute(query, query_data)
            conn.commit()
    table = [[len(inserts), len(updates), len(deletes)]]
    headers = ['INSERT', 'UPDATE', 'DELETE']
    print(tabulate(table, headers, tablefmt="grid"))
def main():
    """Parse the command line and run the update end to end."""
    parser = argparse.ArgumentParser(description='Update LINZ data')
    parser.add_argument(
        '--csv', metavar='FILE', type=argparse.FileType('r'),
        required=True,
        help='existing CSV file with update data')
    parser.add_argument(
        '--name',
        required=True,
        help='Table and schema name, which data should be updated')
    arguments = parser.parse_args()
    rows = get_data(arguments.csv)
    save_to_db(rows, arguments.name)
# Script entry point: run only when executed directly, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment