
@linuxluigi
Last active April 21, 2020 16:12
"""Copy OHDM tables in batches from a source PostgreSQL database into a destination database."""
import datetime
import sys
from typing import List, Optional

import psycopg2
from psycopg2.extensions import connection, cursor

class DestDB:
    """Connection settings for the destination (gis) database."""
    dbname = "gis"
    user = "docker"
    host = "localhost"
    password = "Dyx8lXMKIGggiQXTzSrAuZ3UsDt8YmLy53WEIAga6EkkVc2GK9lmiRfJxzx7Oahw"


class SrcDB:
    """Connection settings for the source (ohdm) database."""
    dbname = "ohdm"
    user = "user"
    host = "localhost"
    password = "pass"

class CopyOhdmDB:
    # Tables to copy and their destination column order; commented-out tables are skipped.
    tables = {
        "classification": ["id", "class", "subclassname"],
        # "content": ["id", "name", "value", "mimetype", "source_user_id"],
        # "external_systems": ["id", "name", "description"],
        # "external_users": ["id", "userid", "username", "external_system_id"],
        "geoobject": ["id", "name", "source_user_id"],
        # "geoobject_content": ["id", "valid_since", "valid_until", "valid_since_offset", "valid_until_offset", "geoobject_id", "content_id"],
        "geoobject_geometry": ["id", "id_target", "type_target", "id_geoobject_source", "role", "classification_id", "tags", "valid_since", "valid_until", "valid_since_offset", "valid_until_offset", "source_user_id"],
        # "geoobject_url": ["id", "geoobject_id", "url_id", "valid_since", "valid_until", "valid_since_offset", "valid_until_offset"],
        # "import_updates": ["id", "externalsystemid", "initial", "lastupdate"],
        "lines": ["id", "line", "source_user_id"],
        "points": ["id", "point", "source_user_id"],
        "polygons": ["id", "polygon", "source_user_id"],
        # "subsequent_geom_user": ["id", "target_id", "point_id", "line_id", "polygon_id"],
        # "url": ["id", "url", "source_user_id"]
    }

    def __init__(self):
        print("Connect to database...")
        try:
            # open connections to the source (ohdm) and destination (gis) databases
            self.con_source: connection = psycopg2.connect("dbname='{0}' user='{1}' host='{2}' password='{3}'".format(
                SrcDB.dbname,
                SrcDB.user,
                SrcDB.host,
                SrcDB.password,
            ))
            self.con_dest: connection = psycopg2.connect("dbname='{0}' user='{1}' host='{2}' password='{3}'".format(
                DestDB.dbname,
                DestDB.user,
                DestDB.host,
                DestDB.password,
            ))
        except psycopg2.OperationalError:
            print("Unable to connect to the database")
            exit(1)

        self.cur_source: cursor = self.con_source.cursor()
        self.cur_dest: cursor = self.con_dest.cursor()
        self.schema_source: str = "ohdm"
        self.schema_dest: str = "public"
        # number of rows copied per batch
        self.cache_size: int = 100000

    def count_rows(self, schema: str, table: str, cur: cursor) -> int:
        print("Count rows for {}".format(table))
        cur.execute("SELECT COUNT(*) FROM {0}.{1};".format(schema, table))
        rows = cur.fetchall()
        for row in rows:
            return int(row[0])
        return 0

    def column_2_str(self, column: List[str]) -> str:
        return "{} ".format(", ".join(column))

    def row_2_insert(self, row) -> str:
        # build the VALUES part of an INSERT statement; non-integer values are quoted as strings
        value_query: str = ""
        for value in row:
            if isinstance(value, int):
                value_query += "{}, ".format(value)
            else:
                value_query += "'{}', ".format(value)
        return "{} ".format(value_query[:-2])

    def copy_table(self, table: str, offset_value: Optional[int], target_rows_amount: int) -> Optional[int]:
        insert_query: str = ""

        # resume from the number of rows already present in the destination table
        if not offset_value:
            offset_value = self.count_rows(schema=self.schema_dest, table=table, cur=self.cur_dest)

        # format the progress counters in millions or thousands
        if offset_value / 1000000 >= 1:
            offset_str: str = "{0:.2f}m".format(offset_value / 1000000)
        else:
            offset_str: str = "{0:.2f}k".format(offset_value / 1000)
        if target_rows_amount / 1000000 >= 1:
            target_rows: str = "{0:.2f}m".format(target_rows_amount / 1000000)
        else:
            target_rows: str = "{0:.2f}k".format(target_rows_amount / 1000)
        done_percent: float = offset_value / target_rows_amount * 100
        print("Import {0} from {1} of {2} ({3:.2f}%) @ {4}".format(table, offset_str, target_rows, done_percent, datetime.datetime.now()))

        # fetch the next batch from the source table
        self.cur_source.execute("SELECT * FROM {0}.{1} ORDER BY id ASC LIMIT {2} OFFSET {3};".format(
            self.schema_source,
            table,
            self.cache_size,
            offset_value
        ))
        rows = self.cur_source.fetchall()
        for row in rows:
            insert_query += "INSERT INTO {2}.{3}({0}) VALUES ({1}); \n".format(
                self.column_2_str(CopyOhdmDB.tables[table]),
                self.row_2_insert(row=row),
                self.schema_dest,
                table
            )

        # no rows left to copy -> table is done
        if insert_query == "":
            return None

        self.cur_dest.execute(insert_query)
        self.con_dest.commit()
        return offset_value + self.cache_size

    def copy_all(self):
        # copy every table listed in CopyOhdmDB.tables
        for table in CopyOhdmDB.tables:
            self.copy_table_loop(table=table)

    def copy_table_loop(self, table: str):
        # copy a single table batch by batch until copy_table() reports that it is done
        offset_value: Optional[int] = None
        print("start copy {}".format(table))
        target_rows_amount: int = self.count_rows(schema=self.schema_source, table=table, cur=self.cur_source)
        while True:
            offset_value = self.copy_table(table=table, offset_value=offset_value, target_rows_amount=target_rows_amount)
            if not offset_value:
                break
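

# A minimal sketch (not part of the original gist) of how a batch could instead be written
# with psycopg2.extras.execute_values: the rows are passed as query parameters, which avoids
# the manual quoting done in row_2_insert() and is usually faster than concatenating one
# INSERT statement per row. The helper name and the page_size value are assumptions for
# illustration only.
def copy_batch_with_execute_values(copy_db: CopyOhdmDB, table: str, rows: list) -> None:
    from psycopg2.extras import execute_values

    columns = ", ".join(CopyOhdmDB.tables[table])
    query = "INSERT INTO {0}.{1} ({2}) VALUES %s".format(copy_db.schema_dest, table, columns)
    execute_values(copy_db.cur_dest, query, rows, page_size=1000)
    copy_db.con_dest.commit()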

# start copy
copy_ohdm_db: CopyOhdmDB = CopyOhdmDB()

if len(sys.argv) > 1:
    # copy only the table given as the first command line argument
    while True:
        try:
            copy_ohdm_db.copy_table_loop(table=str(sys.argv[1]))
        except psycopg2.errors.InFailedSqlTransaction:
            copy_ohdm_db.con_dest.commit()
        except psycopg2.errors.UniqueViolation as e:
            print(e)
        except psycopg2.DatabaseError:
            print("Database connection error!")
            exit(1)
else:
    # copy all tables
    while True:
        try:
            copy_ohdm_db.copy_all()
        except psycopg2.errors.InFailedSqlTransaction:
            copy_ohdm_db.con_dest.commit()
        except psycopg2.errors.UniqueViolation as e:
            print(e)
        except psycopg2.DatabaseError:
            print("Database connection error!")
            exit(1)
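
# Usage (the file name copy_ohdm.py is an assumption; the gist does not name the script):
#   python copy_ohdm.py          -> copy every table listed in CopyOhdmDB.tables
#   python copy_ohdm.py points   -> copy only the "points" table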