Skip to content

Instantly share code, notes, and snippets.

@saul-data
Last active March 9, 2023 17:22
Show Gist options
  • Save saul-data/7be339ac51118bb820e719454801f880 to your computer and use it in GitHub Desktop.
Save saul-data/7be339ac51118bb820e719454801f880 to your computer and use it in GitHub Desktop.
Fast load data into Postgresql with Dataplane
import csv
import os
from io import StringIO
import pandas as pd
# Instructions for Dataplane data pipelines - https://dataplane.app
# Database credentials from Dataplane Secrets
pgUser = os.getenv("dp_secret_pg_user")
pgPassword = os.getenv("dp_secret_pg_password")
pgHost = "127.0.0.1"
pgPort = "5432"
pgDatabase = "mydb"
print('read file')
# https://www.gov.uk/guidance/about-the-price-paid-data#download-options
column_names = [
'id',
'price',
'transfer_date',
'postcode',
'property_type',
'old_new',
'duration',
'primary_address_obj',
'secondary_address_obj',
'street',
'locality',
'city_town',
'district',
'county',
'ppd_cat',
'record_stat'
]
df = pd.read_csv('./propdata/pp-complete.csv', header=None, names=column_names)
print(df.head())
connectstring = "postgresql://{pgUser}:{pgPassword}@:{pgPort}/{pgDatabase}?sslmode=disable"
table = "mytable"
df.to_sql(table, connectstring, method=psql_insert_copy, parallel=True, if_exists='append')
def psql_insert_copy(table, conn, keys, data_iter):
"""
Execute SQL statement inserting data
Parameters
----------
table : pandas.io.sql.SQLTable
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
keys : list of str
Column names
data_iter : Iterable that iterates the values to be inserted
"""
# gets a DBAPI connection that can provide a cursor
dbapi_conn = conn.connection
with dbapi_conn.cursor() as cur:
s_buf = StringIO()
writer = csv.writer(s_buf)
writer.writerows(data_iter)
s_buf.seek(0)
columns = ', '.join('"{}"'.format(k) for k in keys)
if table.schema:
table_name = '{}.{}'.format(table.schema, table.name)
else:
table_name = table.name
sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
table_name, columns)
cur.copy_expert(sql=sql, file=s_buf)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment