Last active
February 7, 2020 00:58
-
-
Save D2theR/63c0c6906203a36e67bc95e51658714f to your computer and use it in GitHub Desktop.
Dump Access database to PostgreSQL.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# A simple script to dump the contents of | |
# Microsoft Access Database tables to CSV & then to a PostgreSQL Database | |
# It depends upon the mdbtools suite: | |
# http://sourceforge.net/projects/mdbtools/ | |
# Can be installed in most Debian based Linux distros with: | |
# | |
# sudo apt install mdbtools | |
# | |
# Script use is like: | |
# python CSV_ACCESS_DUMP.py /path/to/MyAccessDB.accdb | |
""" | |
1. Export user-defined list of "tables" to CSV using Mdb-tools external Python command with an .accdb file as argument. | |
2. Open file in Pandas and parse Nulls, Dates, Omit Columns and resave as new CSV (timestamped file) | |
3. Parse SQL statements: | |
   a. SET buffers to increase efficiency for large tables, and create an index
b. Create a temporary table from the table that data is being "COPY" commanded from. | |
   c. Create an index on the temp table and ANALYZE it to speed up inserts
d. COPY data from the CSV file to the temp(tmp) table | |
e. Issue an "UPSERT" command or in Postgres' case it will try to INSERT, SELECTed data FROM temp table, | |
and ON CONFLICT of (user defined constraint fields) it will UPDATE the fields instead. | |
-- PostgreSQL info for this script --
NOTE: All the following database transactions have to be completed in ONE cursor session or they will fail! | |
### RECOMMENDATIONS ### | |
If the import table is big, it may pay to increase temp_buffers temporarily for the session (first thing in the session):
SET temp_buffers = '500MB'; -- example value | |
Add an index to the temporary table: | |
CREATE INDEX tmp_x_id_idx ON tmp_x(id); | |
Run ANALYZE manually, since temporary tables are not covered by autovacuum / auto-analyze. | |
ANALYZE tmp_x; | |
If the imported table matches the table to be updated exactly, this may be convenient as it | |
creates an empty temporary table matching the structure of the existing table, without constraints. | |
CREATE TEMP TABLE tmp_tblCustomerOrderSUB AS SELECT * FROM tblCustomerOrderSUB LIMIT 0; | |
COPY tmp_tblCustomerOrderSUB FROM '/absolute/path/to/file' (DELIMITER "|", HEADER TRUE, FORMAT CSV); | |
INSERT INTO "tblname" (fields) | |
SELECT {fields} | |
FROM "tmp_tblname" | |
ON CONFLICT (conflict_fields) DO UPDATE | |
SET field_name = excluded.field_name; | |
DROP TABLE tmp_tblCustomerOrderSUB; -- else it is dropped at end of session automatically | |
""" | |
import sys, subprocess, os, re | |
import psycopg2 | |
import pandas as pd | |
from datetime import datetime | |
# Database file specified from command line argument.
# NOTE(review): raises IndexError if the script is run with no argument —
# presumably acceptable for a single-purpose CLI script; confirm before hardening.
DATABASE = sys.argv[1]
def pg_connect(csv_file, tblname, headers):
    """Upsert rows from *csv_file* into PostgreSQL table *tblname*.

    Creates a temp table cloned from ``tblname``, COPYs the CSV into it,
    then INSERT ... ON CONFLICT DO UPDATEs into the real table.

    Parameters:
        csv_file -- absolute path of the CSV (server-side, readable by the
                    PostgreSQL server process), or the sentinel string
                    "CLOSE_CONNECTION" to just open-and-close a connection.
        tblname  -- target table name (None when using the sentinel).
        headers  -- iterable of column names, e.g. a DataFrame's .columns
                    (None when using the sentinel).
    """
    connection = psycopg2.connect(user="postgres",
                                  password="postgres",
                                  host="127.0.0.1",
                                  port="5432",
                                  database="postgres")
    c = connection.cursor()
    if csv_file != "CLOSE_CONNECTION":
        try:
            print(tblname)
            # Increase buffer size to speed up transactions on big imports.
            c.execute("SET temp_buffers = '50MB';")
            # NOTE: table/column names are interpolated via f-strings. This is
            # safe only because they come from the hard-coded list in main();
            # never feed user input through these queries.
            # Create a temporary table matching the target's structure.
            c.execute(f"""CREATE TEMP TABLE "tmp_{tblname}" AS SELECT * FROM "{tblname}" LIMIT 0;""")
            # Create an index on the temp table.
            c.execute(f"""CREATE INDEX "tmp_{tblname}_id_idx" ON "tmp_{tblname}"(id);""")
            # BUG FIX: the original built this ANALYZE statement but never
            # executed it, so temp-table statistics were never collected.
            c.execute(f"""ANALYZE "tmp_{tblname}";""")
            # Quote every column name and COPY the CSV into the temp table.
            copy_fields = ','.join(f'"{col}"' for col in headers)
            # BUG FIX: COPY string options must use single quotes ('|'); the
            # original's double-quoted "|" is parsed as an identifier and is a
            # syntax error in PostgreSQL.
            copy_query = (f"""COPY "tmp_{tblname}" ({copy_fields}) """
                          f"""FROM '{csv_file}' (DELIMITER '|', HEADER TRUE, FORMAT CSV);""")
            c.execute(copy_query)
            # Columns forming the ON CONFLICT target (must match a unique
            # constraint/index on the real table).
            criteria_fields = ['FOCNo', 'UCNo', 'LotNoAssigned', 'ReleaseNo']
            field_names = ', '.join(f'"{field}" = excluded."{field}"' for field in headers)
            conflict_fields = ', '.join(f'"{field}"' for field in criteria_fields)
            # (The original also built an unused "value_fields" join clause;
            # it has been removed.)
            update_query = f"""INSERT INTO "{tblname}" ({copy_fields})
                SELECT {copy_fields}
                FROM "tmp_{tblname}"
                ON CONFLICT ({conflict_fields}) DO UPDATE
                SET {field_names};
                """
            c.execute(update_query)
            # Drop temp table from database before committing and exiting
            # (it would otherwise be dropped at end of session automatically).
            c.execute(f"""DROP TABLE "tmp_{tblname}";""")
            print(c.statusmessage, c.description)
            connection.commit()
            print("d00ts!")
        except (Exception, psycopg2.DatabaseError) as error:
            # Roll back the failed transaction so the connection ends clean.
            connection.rollback()
            print("Error while connecting to PostgreSQL:", error)
        finally:
            # BUG FIX: the original only closed the cursor/connection on the
            # success path, leaking both whenever any query raised.
            c.close()
            connection.close()
    else:
        # Sentinel call: just close what we opened.
        if connection:
            c.close()
            connection.close()
            print("PostgreSQL connection is closed")
def main(DATABASE):
    """Dump each Access table to CSV, clean it with pandas, and upsert it.

    DATABASE -- path to the .accdb/.mdb file (taken from the command line).
    Requires the mdbtools `mdb-export` binary on PATH.
    """
    # Predefined list of tables that records need inserted from.
    # table_names = ["RecipeByUniqueCode","tblCustomerOrder", "tblCustomerOrderSUB", "tblBillOfMaterials", "Ingredients", "tblPurchaseOrder"]
    table_names = ["tblCustomerOrderSUB"]
    csv_dir = '/home/path/to/csv/'
    for table in table_names:
        # Export data from Access to CSV via the mdb-export external command.
        # BUG FIX: the original built a shell string for os.system(), which is
        # quoting/injection prone; subprocess.run() (already imported) with an
        # argument list and an explicit stdout file avoids the shell entirely.
        # Flags: -R row sep, -d field delim, -D date format (%F = ISO date),
        # -q quote char.
        csv_file = os.path.join(csv_dir, '{0}.csv'.format(table))
        with open(csv_file, 'w') as outfile:
            subprocess.run(['mdb-export', '-R', '\n', '-d', '|', '-D', '%F',
                            '-q', '"', DATABASE, table],
                           stdout=outfile)
        # Parse dates properly.
        # BUG FIX: pd.datetime was removed from pandas; use the datetime class
        # already imported at the top of the file.
        dateparse = lambda x: datetime.strptime(x, "%Y-%m-%d")
        # Read the CSV file into a pandas DataFrame with proper params.
        # NOTE(review): date_parser is deprecated in pandas 2.x; with
        # parse_dates=True only the index is date-parsed — confirm intent.
        dataframe = pd.read_csv(csv_file,
                                delimiter="|",
                                lineterminator='\n',
                                quotechar='"',
                                low_memory=False,
                                memory_map=True,
                                parse_dates=True,
                                date_parser=dateparse,
                                keep_default_na=False
                                )
        # You can control the number of records exported with .head(#).
        df = dataframe.sort_values(['FOCNo'], ascending=False).replace({'\'': ''}, regex=True)  # .head(10)
        now = str(datetime.now().isoformat())
        df_csv_file = "{}{}_{}.csv".format(csv_dir, now, table)
        # BUG FIX: to_csv's line_terminator keyword was removed in pandas 2.0;
        # the current spelling is lineterminator.
        df.to_csv(df_csv_file, sep="|", index=False, quotechar='"',
                  na_rep='', lineterminator='\n')
        pg_connect(df_csv_file, table, df.columns)
    # Sentinel call just closes any open connection; it returns None, so the
    # original's pointless `sql = ...` assignment was dropped.
    pg_connect('CLOSE_CONNECTION', None, None)
if __name__ == "__main__":
    main(DATABASE)
    # BUG FIX: the bare exit() builtin is injected by the site module for
    # interactive use and is not guaranteed to exist (e.g. under `python -S`);
    # sys.exit() is the canonical way to terminate a script.
    sys.exit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment