Dump Access database to PostgreSQL.
#!/usr/bin/env python
#
# A simple script to dump the contents of Microsoft Access database
# tables to CSV and then load them into a PostgreSQL database.
# It depends upon the mdbtools suite:
# http://sourceforge.net/projects/mdbtools/
# It can be installed on most Debian-based Linux distros with:
#
# sudo apt install mdbtools
#
# Usage:
# python CSV_ACCESS_DUMP.py /path/to/MyAccessDB.accdb
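#
# To see which tables the .accdb actually contains before editing the
# table_names list in main() below, mdbtools also provides mdb-tables
# (a quick sketch, assuming mdbtools is installed as above):
#
# mdb-tables -1 /path/to/MyAccessDB.accdb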
"""
1. Export a user-defined list of tables to CSV by calling the mdb-export command
   from mdbtools, with the .accdb file passed as a command-line argument.
2. Open each file in pandas, parse nulls and dates, omit columns, and re-save as a new CSV (timestamped file).
3. Parse SQL statements:
    a. SET temp_buffers to increase efficiency for large tables.
    b. Create a temporary table modeled on the target table that the data will ultimately be upserted into.
    c. Create an index on the temp table and ANALYZE it to speed up the inserts.
    d. COPY data from the CSV file into the temp (tmp) table.
    e. Issue an "UPSERT": in Postgres' case it will try to INSERT data SELECTed FROM the temp table,
       and ON CONFLICT of (user-defined constraint fields) it will UPDATE the fields instead.
-- PostgreSQL info for this script --
NOTE: All the following database transactions have to be completed in ONE cursor session or they will fail!
### RECOMMENDATIONS ###
If the import table is big, it may pay to increase temp_buffers temporarily for the session (do this first thing in the session):
SET temp_buffers = '500MB'; -- example value
Add an index to the temporary table:
CREATE INDEX tmp_x_id_idx ON tmp_x(id);
Run ANALYZE manually, since temporary tables are not covered by autovacuum / auto-analyze.
ANALYZE tmp_x;
If the imported table matches the table to be updated exactly, this may be convenient as it
creates an empty temporary table matching the structure of the existing table, without constraints.
CREATE TEMP TABLE tmp_tblCustomerOrderSUB AS SELECT * FROM tblCustomerOrderSUB LIMIT 0;
COPY tmp_tblCustomerOrderSUB FROM '/absolute/path/to/file' (FORMAT csv, DELIMITER '|', HEADER true);
INSERT INTO "tblname" ({fields})
SELECT {fields}
FROM "tmp_tblname"
ON CONFLICT (conflict_fields) DO UPDATE
SET field_name = excluded.field_name;
DROP TABLE tmp_tblCustomerOrderSUB; -- else it is dropped at end of session automatically
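NOTE: ON CONFLICT (...) only works if the target table already has a UNIQUE constraint
or unique index covering exactly the conflict fields. If one does not exist yet, it would
have to be created first; a sketch using this script's criteria fields (the index name
is only an example):
CREATE UNIQUE INDEX "tblCustomerOrderSUB_upsert_idx"
    ON "tblCustomerOrderSUB" ("FOCNo", "UCNo", "LotNoAssigned", "ReleaseNo");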
"""
import sys, subprocess, os, re
import psycopg2
import pandas as pd
from datetime import datetime
# Database file specified from command line argument
DATABASE = sys.argv[1]
def pg_connect(csv_file, tblname, headers):
    connection = psycopg2.connect(user="postgres",
                                  password="postgres",
                                  host="127.0.0.1",
                                  port="5432",
                                  database="postgres")
    c = connection.cursor()
    if csv_file != "CLOSE_CONNECTION":
        try:
            print(tblname)
            # Increase buffer size to speed up transactions
            buffer_query = """SET temp_buffers = '50MB';"""
            c.execute(buffer_query)
            # Create a temporary table matching the structure of the target table
            temp_tbl_query = f"""CREATE TEMP TABLE "tmp_{tblname}" AS SELECT * FROM "{tblname}" LIMIT 0;"""
            c.execute(temp_tbl_query)
            # Create an index on the temp table
            index_query = f"""CREATE INDEX "tmp_{tblname}_id_idx" ON "tmp_{tblname}"(id);"""
            c.execute(index_query)
            # Analyze the temp table to speed up inserts (temp tables are not covered by autovacuum)
            analyze_query = f"""ANALYZE "tmp_{tblname}";"""
            c.execute(analyze_query)
            # Build the column list from the pandas headers and COPY data from the CSV file into the temp table
            copy_fields = ','.join([f'"{col}"' for col in headers])
            copy_query = f"""COPY "tmp_{tblname}" ({copy_fields}) FROM '{csv_file}' (FORMAT csv, DELIMITER '|', HEADER true);"""
            c.execute(copy_query)
            # Fields that form the unique constraint used for "UPSERTING".
            criteria_fields = ['FOCNo', 'UCNo', 'LotNoAssigned', 'ReleaseNo']
            field_names = ', '.join([f'"{field}" = excluded."{field}"' for field in headers])
            # (value_fields is built here but never used in the final query)
            value_fields = ' '.join([f'"{tblname}"."{field}" = "tmp_{tblname}"."{field}" AND' for field in criteria_fields])
            conflict_fields = ', '.join([f'"{field}"' for field in criteria_fields])
            update_query = f"""INSERT INTO "{tblname}" ({copy_fields})
                SELECT {copy_fields}
                FROM "tmp_{tblname}"
                ON CONFLICT ({conflict_fields}) DO UPDATE
                SET {field_names};
            """
            # print(update_query)
            c.execute(update_query)
            # Drop temp table from database before committing and exiting.
            drop_query = f"""DROP TABLE "tmp_{tblname}";"""
            c.execute(drop_query)
            print(c.statusmessage, c.description)
            connection.commit()
            c.close()
            connection.close()
            print("d00ts!")
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error while connecting to PostgreSQL:", error)
    else:
        # Closes open database connections.
        if connection:
            c.close()
            connection.close()
            print("PostgreSQL connection is closed")
def main(DATABASE):
    # Predefined list of tables that records need to be inserted from.
    # table_names = ["RecipeByUniqueCode", "tblCustomerOrder", "tblCustomerOrderSUB", "tblBillOfMaterials", "Ingredients", "tblPurchaseOrder"]
    table_names = ["tblCustomerOrderSUB"]
    for table in table_names:
        # Export data from Access to CSV files via the mdb-export command from mdbtools
        # (-D %F exports dates as YYYY-MM-DD so they can be parsed below)
        csv_dir = '/home/path/to/csv/'
        csv_file = '{0}.csv'.format(table)
        os.system('''mdb-export -R "\n" -d "|" -D %{0} -q '"' {1} {2} > {3}'''.format('F', DATABASE, table, csv_dir + csv_file))
        # Parse dates properly
        dateparse = lambda x: datetime.strptime(x, "%Y-%m-%d")
        # Read the CSV file into a pandas DataFrame with the proper params
        dataframe = pd.read_csv(csv_dir + csv_file,
                                delimiter="|",
                                lineterminator='\n',
                                quotechar='"',
                                low_memory=False,
                                memory_map=True,
                                parse_dates=True,
                                date_parser=dateparse,
                                keep_default_na=False
                                )
        ### You can control the number of records exported to the pandas DataFrame with the '.head(#)' attribute! Very useful!
        df = dataframe.sort_values(['FOCNo'], ascending=False).replace({'\'': ''}, regex=True)  # .head(10)
        now = str(datetime.now().isoformat())
        df_csv_file = "{}{}_{}.csv".format(csv_dir, now, table)
        df.to_csv(df_csv_file, sep="|", index=False, quotechar='"', na_rep='', line_terminator='\n')
        pg_connect(df_csv_file, table, df.columns)
    # Close any remaining open connection.
    pg_connect('CLOSE_CONNECTION', None, None)
if __name__ == "__main__":
    main(DATABASE)
    exit()
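# To spot-check the upsert after a run (a sketch; the connection settings mirror
# the pg_connect() defaults above):
#
# psql -h 127.0.0.1 -U postgres -d postgres -c 'SELECT count(*) FROM "tblCustomerOrderSUB";'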