Last active
October 25, 2019 18:23
-
-
Save andy-slac/7e1e473dd49ced02dd844fda7c835e8e to your computer and use it in GitHub Desktop.
Script to test how to retrieve AUTOINCREMENT values
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
from sqlalchemy import create_engine, event, MetaData, Table, Column, Sequence, Integer, String | |
# Short backend aliases accepted on the command line, mapped to the
# SQLAlchemy connection URL each one stands for.
shortcuts = dict(
    oracle="oracle+cx_oracle://@gen3_cred_salnikov",
    sqlite="sqlite:///test.db",
    postgres="postgresql://",
)
def event_setinputsizes(inputsizes, cursor, statement, parameters, context):
    """Override the input size of the ``ret_0`` bind parameter and dump state.

    ``main`` registers this function as the engine's ``"do_setinputsizes"``
    listener when the dialect is Oracle.

    Parameters
    ----------
    inputsizes : dict
        Mapping whose keys expose a ``key`` attribute.  It is modified in
        place: the entry whose key is named ``"ret_0"`` (the last one, if
        there are several) is replaced with ``cursor.var(int, arraysize=100)``.
    cursor
        Object providing ``var()`` and a ``bindvars`` attribute.
    statement, parameters, context
        Accepted to match the listener signature; not used here.
    """
    ret_params = [bind for bind in inputsizes if bind.key == "ret_0"]
    if ret_params:
        # The last match wins, exactly as a plain scan over the keys would.
        inputsizes[ret_params[-1]] = cursor.var(int, arraysize=100)

    for bind, dbtype in inputsizes.items():
        print(f"param: {bind!r} {dbtype} @{id(dbtype)}")

    print("event_setinputsizes cursor.bindvars = ", cursor.bindvars)
def _dump_cursor_bindvars(cursor): | |
print(f" cursor = {cursor} @{id(cursor)}") | |
if cursor: | |
bindvars = cursor.bindvars | |
print(" cursor.bindvars = ", bindvars) | |
if bindvars: | |
var = bindvars.get('ret_0') | |
print(f" values = {var} @{id(var)}") | |
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Print a header line, then dump the bind variables of ``cursor``.

    ``main`` registers this function as an ``"after_cursor_execute"``
    listener.  Only ``cursor`` is used; the remaining arguments are accepted
    to match the listener signature.
    """
    print("after_cursor_execute:")
    _dump_cursor_bindvars(cursor)
def main():
    """Insert rows into a scratch table and print what the result exposes.

    Command line:

    * ``backend`` -- SQLAlchemy URL or one of the keys of ``shortcuts``;
      optional, defaults to ``oracle``.
    * ``-n`` -- number of rows to insert (default 1).
    * ``-r`` -- add a RETURNING clause for the ``id`` column.
    * ``--pg-mode`` / ``--pg-page`` -- executemany tuning forwarded to
      ``create_engine``; only forwarded when given.

    The table ``test_table_ret`` is created if needed, the rows are inserted
    inside a transaction, and the outcome is printed.  The connection is
    closed before returning.
    """
    parser = argparse.ArgumentParser(description="Testing sqlalchemy RETURNING support")
    # nargs="?" makes the positional optional; without it argparse still
    # requires the argument and default="oracle" can never take effect.
    parser.add_argument("backend", nargs="?", default="oracle",
                        help="SQLAlchemy connection URL, or one of oracle|sqlite|postgres")
    parser.add_argument("-n", dest="count", default=1, type=int,
                        help="Number of rows to insert.")
    parser.add_argument("-r", dest="returning", default=False, action="store_true",
                        help="Use RETURNING clause.")
    parser.add_argument("--pg-mode", default=None, choices=["batch", "values"],
                        help="Postgres executemany mode.")
    parser.add_argument("--pg-page", default=None, type=int,
                        help="Postgres executemany page size.")
    args = parser.parse_args()

    # Forward only the options that were actually supplied: create_engine()
    # raises TypeError for keyword arguments the selected dialect does not
    # consume, so unconditional (None) values would break non-Postgres runs.
    pg_options = {
        "executemany_mode": args.pg_mode,
        "executemany_batch_page_size": args.pg_page,
        "executemany_values_page_size": args.pg_page,
    }
    kw = {name: value for name, value in pg_options.items() if value is not None}
    engine = create_engine(shortcuts.get(args.backend, args.backend), echo=1, **kw)

    is_oracle = engine.dialect.name == "oracle"
    if is_oracle:
        # Both listeners inspect cursor.bindvars, so they are only attached
        # for the Oracle dialect.
        event.listen(engine, "do_setinputsizes", event_setinputsizes)
        event.listen(engine, "after_cursor_execute", after_cursor_execute)

    # The context manager guarantees the connection is released even when
    # table creation or the insert fails.
    with engine.connect() as conn:
        metadata = MetaData()
        table = Table("test_table_ret", metadata,
                      Column("id", Integer,
                             Sequence('test_table_ret_id_seq', metadata=metadata),
                             primary_key=True),
                      Column("val1", Integer),
                      Column("val2", Integer))
        metadata.create_all(engine)

        data = [{'val1': i+1, 'val2': (i+1)*10} for i in range(args.count)]
        if len(data) == 1:
            # A single row is passed as a plain dict rather than a
            # one-element list.
            data = data[0]

        query = table.insert()
        if args.returning:
            query = query.returning(table.c.id)

        with conn.begin():
            res = conn.execute(query, data)
            _report_insert_result(res, conn.dialect.name == "oracle")


def _report_insert_result(res, is_oracle):
    """Print the bind variables, row count, keys and rows of an INSERT result.

    Attributes that the result cannot provide are reported as ``N/A`` (or
    skipped, for the row listing) instead of aborting the script.
    """
    if is_oracle:
        print("after conn.execute:")
        _dump_cursor_bindvars(res.cursor)

    print("rowcount = ", getattr(res, "rowcount", "N/A"))

    # `except Exception` rather than a bare `except`, so Ctrl-C and
    # SystemExit still propagate.
    try:
        inserted_primary_key = res.inserted_primary_key
    except Exception:
        inserted_primary_key = "N/A"
    print("inserted_primary_key = ", inserted_primary_key)

    try:
        rows = res.fetchall()
    except Exception:
        # Best effort: presumably the result returns no rows when the
        # statement had no RETURNING clause; nothing to list in that case.
        rows = []
    for row in rows:
        print("row = ", row)

    if not is_oracle:
        print("lastrowid = ", res.lastrowid)
# Run the test only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment