Last active
March 9, 2023 17:22
-
-
Save saul-data/7be339ac51118bb820e719454801f880 to your computer and use it in GitHub Desktop.
Fast-load data into PostgreSQL with Dataplane
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import os | |
from io import StringIO | |
import pandas as pd | |
# Instructions for Dataplane data pipelines - https://dataplane.app

# Database credentials from Dataplane Secrets, read from environment variables.
pgUser = os.getenv("dp_secret_pg_user")
pgPassword = os.getenv("dp_secret_pg_password")
pgHost = "127.0.0.1"
pgPort = "5432"
pgDatabase = "mydb"

print('read file')

# The CSV is read without a header row, so column names are supplied here.
# https://www.gov.uk/guidance/about-the-price-paid-data#download-options
column_names = [
    'id',
    'price',
    'transfer_date',
    'postcode',
    'property_type',
    'old_new',
    'duration',
    'primary_address_obj',
    'secondary_address_obj',
    'street',
    'locality',
    'city_town',
    'district',
    'county',
    'ppd_cat',
    'record_stat',
]

df = pd.read_csv('./propdata/pp-complete.csv', header=None, names=column_names)
print(df.head())

# Must be an f-string: a plain string would send the "{pgUser}" placeholders
# to the database driver literally. The host is included so pgHost is honoured.
# NOTE(review): credentials containing URL-reserved characters (e.g. "@", "/")
# would need percent-encoding before being embedded here.
connectstring = (
    f"postgresql://{pgUser}:{pgPassword}@{pgHost}:{pgPort}/{pgDatabase}"
    "?sslmode=disable"
)
table = "mytable"
def psql_insert_copy(table, conn, keys, data_iter):
    """
    Insert rows with a single ``COPY ... FROM STDIN WITH CSV`` statement.

    Intended to be passed as the ``method`` callable of ``DataFrame.to_sql``.

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
        Target table; its ``schema`` and ``name`` attributes are read.
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
        Its ``connection`` attribute must expose a DBAPI connection whose
        cursor provides ``copy_expert`` (as psycopg2 cursors do).
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        # Serialise all rows to an in-memory CSV, then rewind so that COPY
        # reads it from the beginning.
        s_buf = StringIO()
        csv.writer(s_buf).writerows(data_iter)
        s_buf.seek(0)

        # Column names are double-quoted; the table name is used as given.
        columns = ', '.join(f'"{k}"' for k in keys)
        if table.schema:
            table_name = f'{table.schema}.{table.name}'
        else:
            table_name = table.name

        sql = f'COPY {table_name} ({columns}) FROM STDIN WITH CSV'
        cur.copy_expert(sql=sql, file=s_buf)


# The call must come after the function definition above, otherwise the name
# psql_insert_copy does not exist yet when this line runs.
# DataFrame.to_sql has no ``parallel`` keyword, so it is not passed.
df.to_sql(table, connectstring, method=psql_insert_copy, if_exists='append')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment