Skip to content

Instantly share code, notes, and snippets.

@SamuelMarks
Created February 21, 2023 04:04
Show Gist options
  • Save SamuelMarks/fec744a620e2abd0257671aa6f2a96b4 to your computer and use it in GitHub Desktop.
Save SamuelMarks/fec744a620e2abd0257671aa6f2a96b4 to your computer and use it in GitHub Desktop.
from io import StringIO
from json import dumps
from itertools import repeat
from collections import namedtuple
from pprint import PrettyPrinter
import psycopg2
import psycopg2.sql
import numpy as np
pp = PrettyPrinter(indent=4).pprint
def psql_insert_copy(table, conn, keys, data_iter):
with conn.cursor() as cur:
s_buf = StringIO()
s_buf.write(
"\n".join(
map(lambda line: "|".join(map(str, map(parse_col, line))), data_iter)
)
)
s_buf.seek(0)
sql = "COPY {} ({}) FROM STDIN WITH null as 'null' DELIMITER '|'".format(
psycopg2.sql.Identifier(
*(table.schema, table.name) if table.schema else (table.name,)
).as_string(cur),
psycopg2.sql.SQL(", ")
.join(
map(
psycopg2.sql.Identifier,
keys[1:] if keys and keys[0] == "index" else keys,
)
)
.as_string(cur),
)
try:
cur.copy_expert(sql=sql, file=s_buf)
except:
print(sql)
s_buf.seek(0)
pp(
dict(
zip(
keys[1:] if keys and keys[0] == "index" else keys,
next(s_buf).split("|"),
)
)
)
raise
def parse_col(col):
if isinstance(col, np.ndarray):
return parse_col(col.tolist()) if col.size > 0 else "null"
elif isinstance(col, bool):
return int(col)
elif isinstance(col, bytes):
try:
return parse_col(col.decode("utf8"))
except UnicodeError:
print("unable to decode: {!r} ;".format(col))
raise
elif isinstance(col, (complex, int)):
return col
elif isinstance(col, float):
return int(col) if col.is_integer() else col
elif col in (None, "{}", "[]") or not col:
return "null"
elif isinstance(col, str):
return {"True": 1, "False": 0}.get(col, col)
elif isinstance(col, (list, tuple, set, frozenset)):
return "{{{0}{1}}}".format(
",".join(map(partial(dumps, separators=(",", ":")), map(parse_col, col))),
"," if len(col) == 1 else "",
)
elif isinstance(col, dict):
return dumps(col, separators=(",", ":"))
elif isinstance(col, datetime):
return col.isoformat()
else:
raise NotImplementedError(type(col))
from os import environ
conn = psycopg2.connect(
environ["RDBMS_URI"]
# "dbname=test_user_db user=test_user"
)
conn.cursor().execute(
"CREATE TABLE my_table ("
" json_arr_col json[],"
" id integer generated by default as identity primary key"
");"
)
psql_insert_copy(
conn=conn,
keys=("json_arr_col", "id"),
data_iter=repeat(({"jj": None, "text": "bop"},), 5),
table=namedtuple("_", ("name", "schema"))("my_table", None),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment