@mangecoeur
Last active March 30, 2021 21:34
Pandas PostgreSQL support for loading to a DB using the fast COPY FROM method

This small subclass of the pandas SQLAlchemy-based SQL support for reading/storing tables uses the Postgres-specific "COPY FROM" method to insert large amounts of data into the database. It is much faster than using INSERT. To achieve this, the table is created in the normal way using SQLAlchemy but no data is inserted. Instead, the data is saved to a temporary CSV file (using pandas' mature CSV support), then read back into Postgres using psycopg2's support for COPY FROM STDIN.

import tempfile
import warnings

import pandas.io.sql


class PgSQLDatabase(pandas.io.sql.SQLDatabase):
    # FIXME Schema is pulled from Meta object, shouldn't actually be part of signature!
    def to_sql(self, frame, name, if_exists='fail', index=True,
               index_label=None, schema=None, chunksize=None, dtype=None, pk=None):
        """
        Write records stored in a DataFrame to a SQL database.

        Parameters
        ----------
        frame : DataFrame
        name : string
            Name of SQL table
        if_exists : {'fail', 'replace', 'append'}, default 'fail'
            - fail: If table exists, do nothing.
            - replace: If table exists, drop it, recreate it, and insert data.
            - append: If table exists, insert data. Create if it does not exist.
        index : boolean, default True
            Write DataFrame index as a column
        index_label : string or sequence, default None
            Column label for index column(s). If None is given (default) and
            `index` is True, then the index names are used.
            A sequence should be given if the DataFrame uses MultiIndex.
        schema : string, default None
            Name of SQL schema in database to write to (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQLDatabase object.
        chunksize : int, default None
            If not None, then rows will be written in batches of this size at a
            time. If None, all rows will be written at once.
        dtype : dict of column name to SQL type, default None
            Optionally specify the datatype for columns. The SQL type should
            be a SQLAlchemy type.
        pk : string or sequence of strings, default None
            Name of column(s) to set as primary key(s)
        """
        if dtype is not None:
            import sqlalchemy.sql.type_api as type_api

            for col, my_type in dtype.items():
                if not issubclass(my_type, type_api.TypeEngine):
                    raise ValueError('The type of %s is not a SQLAlchemy '
                                     'type' % col)

        table = pandas.io.sql.SQLTable(name, self, frame=frame, index=index,
                                       if_exists=if_exists, index_label=index_label,
                                       schema=self.meta.schema, dtype=dtype)
        table.create()

        if pk is not None:
            if isinstance(pk, str):
                pks = pk
            else:
                pks = ", ".join(pk)
            sql = "ALTER TABLE {schema_name}.{table_name} ADD PRIMARY KEY ({pks})".format(
                schema_name=self.meta.schema, table_name=name, pks=pks)
            self.execute(sql)

        # Some tricks needed here:
        # - need to explicitly keep a reference to the raw connection
        # - need to "open" the temp file separately in write and read mode,
        #   otherwise the data does not get loaded
        conn = self.engine.raw_connection()
        with conn.cursor() as cur, tempfile.NamedTemporaryFile(mode='w') as temp_file:
            frame.to_csv(temp_file, index=index)
            temp_file.flush()  # make sure all buffered rows hit the disk before re-opening

            with open(temp_file.name, 'r') as f:
                sql = "COPY {schema_name}.{table_name} FROM STDIN WITH (FORMAT CSV, HEADER TRUE)".format(
                    schema_name=self.meta.schema, table_name=name)
                cur.copy_expert(sql, f)
        conn.commit()

        # check for potential case sensitivity issues (GH7815)
        self.meta.reflect()
        if name not in self.engine.table_names(schema=schema or self.meta.schema):
            warnings.warn("The provided table name '{0}' is not found exactly "
                          "as such in the database after writing the table, "
                          "possibly due to case sensitivity issues. Consider "
                          "using lower case table names.".format(name), UserWarning)
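
For reference, a minimal usage sketch. The connection URL, table name, and DataFrame are made up for illustration, and it assumes pandas' SQLDatabase accepts a schema argument (as it did in the pandas versions this gist targets); passing an explicit schema avoids self.meta.schema being None.

    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://user:password@localhost/mydb')  # hypothetical URL
    df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})

    db = PgSQLDatabase(engine, schema='public')
    db.to_sql(df, 'my_table', if_exists='replace', index=False)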
@johnistan

With 9.4, you can try the FREEZE option for COPY; it bypasses some of the transaction/visibility bookkeeping.
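
A hedged sketch of what that could look like in the gist's COPY statement (untested; note that Postgres only permits FREEZE when the table was created or truncated in the same transaction as the COPY):

    # Variant of the COPY statement with the FREEZE option.
    # Assumption: the table was created or truncated in the current
    # transaction on this connection, as Postgres requires for FREEZE.
    sql = ("COPY {schema_name}.{table_name} FROM STDIN "
           "WITH (FORMAT CSV, HEADER TRUE, FREEZE TRUE)").format(
               schema_name=self.meta.schema, table_name=name)
    cur.copy_expert(sql, f)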

@vamshing

Hi,
I get the error:
ProgrammingError: schema "none" does not exist
Any thoughts ?
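
This usually means self.meta.schema is None: str(None) is interpolated into the generated SQL, and Postgres folds the unquoted identifier to lowercase. A quick illustration:

    >>> "COPY {schema_name}.{table_name} FROM STDIN".format(schema_name=None, table_name='test')
    'COPY None.test FROM STDIN'

Constructing the wrapper with an explicit schema, e.g. PgSQLDatabase(engine, schema='public'), avoids this (see also the variant below that passes the schema argument through).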

@kojima-rhd

I got this error when I ran the script:

PgSQLDatabase(engine).to_sql(data_frame, 'test', schema='source', if_exists='append', index=False)
=> IOError: [Errno 13] Permission denied: 'c:\users\kojima~1\appdata\local\temp\tmpyzzsyj'

It seems that this block is not working well on Windows (there, the file created by NamedTemporaryFile cannot be opened a second time by name while it is still open, so the later open(temp_file.name) fails):

    with conn.cursor() as cur, tempfile.NamedTemporaryFile(mode='w') as temp_file:
        frame.to_csv(temp_file, index=index)

@kojima-rhd

I found that we can rewrite this script using StringIO instead of tempfile.
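
A minimal sketch of that idea, dropped into the same cursor/COPY context as the gist (assumes Python 3's io.StringIO; psycopg2's copy_expert accepts any file-like object):

    from io import StringIO

    # Build the CSV in memory instead of in a named temp file,
    # which also avoids the Windows permission issue above.
    buf = StringIO()
    frame.to_csv(buf, index=index)
    buf.seek(0)  # rewind so COPY reads from the beginning
    cur.copy_expert(sql, buf)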

@mritchie712

mritchie712 commented Jul 13, 2016

I had some schema and engine errors; this fixed it:

import tempfile
import warnings

import pandas.io.sql

class PgSQLDatabase(pandas.io.sql.SQLDatabase):
    def to_sql(self, frame, name, if_exists='fail', index=True,
               index_label=None, schema=None, chunksize=None, dtype=None, pk=None):
        if dtype is not None:
            import sqlalchemy.sql.type_api as type_api

            for col, my_type in dtype.items():
                if not issubclass(my_type, type_api.TypeEngine):
                    raise ValueError('The type of %s is not a SQLAlchemy '
                                     'type ' % col)

        table = pandas.io.sql.SQLTable(name, self, frame=frame, index=index,
                                       if_exists=if_exists, index_label=index_label,
                                       schema=self.meta.schema, dtype=dtype)
        table.create()
        if pk is not None:
            if isinstance(pk, str):
                pks = pk
            else:
                pks = ", ".join(pk)
            sql = "ALTER TABLE {schema_name}.{table_name} ADD PRIMARY KEY ({pks})".format(schema_name=schema, table_name=name, pks=pks)
            self.execute(sql)

        # Some tricks needed here:
        # Need to explicitly keep reference to connection
        # Need to "open" temp file seperately in write and read mode
        # Otherwise data does not get loaded
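        # NOTE: `engine` below is assumed to be a module-level SQLAlchemy engine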
        conn = engine.raw_connection()
        with conn.cursor() as cur, tempfile.NamedTemporaryFile(mode='w') as temp_file:
            frame.to_csv(temp_file, index=index)

            with open(temp_file.name, 'r') as f:
                sql = "COPY {schema_name}.{table_name} FROM STDIN WITH (FORMAT CSV, HEADER TRUE)".format(
                    schema_name=schema, table_name=name)
                cur.copy_expert(sql, f)
        conn.commit()

        # check for potentially case sensitivity issues (GH7815)
        self.meta.reflect()
        if name not in engine.table_names(schema=schema or self.meta.schema):
            warnings.warn("The provided table name '{0}' is not found exactly "
                          "as such in the database after writing the table, "
                          "possibly due to case sensitivity issues. Consider "
                          "using lower case table names.".format(name), UserWarning)

@jspeis

jspeis commented Feb 15, 2017

This was a helpful gist, thanks! One thing I found is that instead of re-opening the file the following approach worked for me:

        with conn.cursor() as cur, tempfile.NamedTemporaryFile(mode='w+') as temp_file:
            df.to_csv(temp_file, index=index)
            temp_file.seek(0)
...
            cur.copy_expert(sql, temp_file)
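
This works because mode='w+' opens the temp file for both writing and reading, and temp_file.seek(0) flushes the buffered CSV output before rewinding, so there is no need to re-open the file by name (which is also the step that fails on Windows).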

@pulkitpahwa

What should be the type of meta.schema?

@pulkitpahwa

Can you please share a sample schema?

@ldacey

ldacey commented May 5, 2017

I've been using this custom code:

from io import StringIO

import sqlalchemy


def create_file_object(df, file_path=None, string_io=True):
    """Creates a csv file or writes to memory"""
    if string_io:
        s_buf = StringIO()
        df.to_csv(s_buf, index=False)
        s_buf.seek(0)
        file_object = s_buf
    else:
        df.to_csv(file_path, index=False)
        df = open(file_path)
        file_object = df
    return file_object


def load_to_database(table, unique_columns, file_object, header=True):
    fake_conn = sqlalchemy.create_engine(engine).raw_connection()
    fake_cur = fake_conn.cursor()
    if header:
        columns = ', '.join([f'{col}' for col in unique_columns])
        sql = f'COPY {table} ({columns}) FROM STDIN WITH CSV HEADER'
        fake_cur.copy_expert(sql=sql, file=file_object)
    else:
        columns = ', '.join([f'{col}' for col in unique_columns])
        sql = f'COPY {table} ({columns}) FROM STDIN WITH CSV'
        fake_cur.copy_expert(sql=sql, file=file_object)
    fake_conn.commit()
    fake_cur.close()
    del fake_cur
    fake_conn.close()

The reason I have the list comprehension is because I am often loading a set of columns to a table which has a postgres SERIAL column as the primary key. This has been working, but I would rather import something that someone else has made better. The StringIO method works perfectly fine though and I have been using it consistently.
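
For example, usage might look like this (the table and column names are hypothetical, the target table must already exist, and `engine` must be a database URL string since it is passed to create_engine):

    import pandas as pd

    engine = 'postgresql://user:password@localhost/mydb'  # hypothetical URL

    df = pd.DataFrame({'name': ['a', 'b'], 'value': [1, 2]})
    # The table's SERIAL primary key column is simply omitted from unique_columns.
    load_to_database('events', ['name', 'value'], create_file_object(df))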

@ljstrnadiii

@jspeis,

Is there any reason why you chose to use tempfile instead of StringIO?
