Last active
April 24, 2024 14:18
-
-
Save b40yd/238b4a3e471070d8a93a216a775f37b1 to your computer and use it in GitHub Desktop.
sqlclient.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
from __future__ import absolute_import, division, print_function, unicode_literals | |
import argparse | |
import json | |
import logging | |
import os | |
import sys | |
from datetime import datetime | |
import psycopg2 | |
from prettytable import ORGMODE, PrettyTable | |
from psycopg2.extras import RealDictCursor | |
__version__ = "0.0.1" | |
def print_query_results(args, results): | |
table = PrettyTable( | |
encoding=sys.stdout.encoding, | |
padding_width=1, | |
set_style=ORGMODE, | |
junction_char="|", | |
) | |
data = "" | |
if results and args.align: | |
table.field_names = results[0].keys() | |
for row in results: | |
table.add_row(row.values()) | |
data = table.get_string() | |
elif results: | |
separator = "{}".format(args.field_separator) | |
field_names = separator.join(results[0].keys()) | |
data = "{}\n".format(field_names) | |
for row in results: | |
values = [] | |
for value in row.values(): | |
if type(value) == datetime: | |
values.append(value.strftime("%Y-%m-%d %H:%M:%S")) | |
elif type(value) == bool: | |
values.append(str(value)) | |
elif type(value) in (list, tuple, dict): | |
values.append(json.dumps(value)) | |
else: | |
values.append(value) | |
data += "{}\n".format(separator.join(values)) | |
if data and args.output: | |
with open(args.output, "w", encoding="utf-8") as f: | |
f.write(data) | |
else: | |
print(data) | |
def parser_config(args): | |
if args.debug: | |
logging.basicConfig( | |
format="[%(asctime)s - %(name)s - %(module)s - %(funcName)s] [%(levelname)s] %(message)s", | |
level=logging.DEBUG, | |
stream=sys.stdout, | |
) | |
else: | |
logging.basicConfig( | |
format="[%(asctime)s - %(name)s - %(module)s - %(funcName)s] [%(levelname)s] %(message)s", | |
level=logging.INFO, | |
stream=sys.stdout, | |
) | |
if args.nopassword: | |
password = args.password | |
else: | |
password = os.getenv("PGPASSWORD") | |
params = { | |
"dbname": args.dbname, | |
"user": args.username, | |
"password": password, | |
"host": args.host, | |
"port": args.port, | |
} | |
return params | |
def pg_query(args): | |
sql_query = args.command | |
db_params = parser_config(args) | |
logging.debug(" %s ", db_params) | |
if args.filename: | |
with open(args.filename, "r") as f: | |
sql_query = f.read() | |
try: | |
with psycopg2.connect(**db_params) as conn: | |
with conn.cursor(cursor_factory=RealDictCursor) as cur: | |
cur.execute(sql_query) | |
rows = cur.fetchall() | |
print_query_results(args, rows) | |
except Exception as e: | |
raise SystemError(e) | |
def console(): | |
parser = argparse.ArgumentParser( | |
description="backup/restore PostgreSQL databases", add_help=False | |
) | |
parser.add_argument( | |
"--version", action="version", version="%(prog)s {}".format(__version__) | |
) | |
parent_parser = argparse.ArgumentParser(add_help=False) | |
parent_parser.add_argument( | |
"-h", | |
"--host", | |
"--hostname", | |
dest="host", | |
default="localhost", | |
help="database server host or socket directory", | |
) | |
parent_parser.add_argument( | |
"--debug", dest="debug", action="store_true", help="Turn on debugging output" | |
) | |
subparsers = parser.add_subparsers(dest="command", help="a sub command") | |
subparsers.required = True # https://bugs.python.org/issue9253#msg186387 | |
pg_parser = subparsers.add_parser( | |
"postgresql", | |
parents=[parent_parser], | |
help="PostgreSQL database", | |
add_help=False, | |
) | |
pg_parser.add_argument( | |
"-d", "--dbname", dest="dbname", help="database name to connect to" | |
) | |
pg_parser.add_argument( | |
"-U", "--username", "--user", dest="username", help="database user name" | |
) | |
pg_parser.add_argument( | |
"-W", "--password", dest="password", help="database password" | |
) | |
pg_parser.add_argument( | |
"-A", | |
"--no-align", | |
dest="align", | |
action="store_true", | |
help="unaligned table output mode", | |
) | |
pg_parser.add_argument( | |
"-P", | |
dest="pset", | |
help="set printing option VAR to ARG (see \pset command)", | |
) | |
pg_parser.add_argument( | |
"-f", | |
"--file", | |
dest="filename", | |
help="execute commands from file, then exit", | |
) | |
pg_parser.add_argument( | |
"-o", | |
"--output", | |
dest="output", | |
help="send query results to file (or |pipe)", | |
) | |
pg_parser.add_argument( | |
"-v", | |
"--set", | |
"--variable", | |
dest="variable", | |
help="set psql variable NAME to VALUE(e.g., -v ON_ERROR_STOP=1)", | |
) | |
pg_parser.add_argument( | |
"-F", | |
"--field-separator", | |
dest="field_separator", | |
default="|", | |
help="field separator for unaligned output (default: ' | ')", | |
) | |
pg_parser.add_argument( | |
"-w", | |
"--no-password", | |
dest="nopassword", | |
action="store_true", | |
help="no password", | |
) | |
pg_parser.add_argument( | |
"-p", "--port", type=int, default=5432, dest="port", help="database server post" | |
) | |
pg_parser.add_argument( | |
"--help", | |
action="help", | |
default=argparse.SUPPRESS, | |
help="show this help message and exit", | |
) | |
pg_parser.add_argument( | |
"-c", | |
"--command", | |
type=str, | |
dest="command", | |
help="run only single command (SQL or internal) and exit", | |
) | |
pg_parser.set_defaults(func=pg_query) | |
args = parser.parse_args() | |
return args.func(args) | |
if __name__ == "__main__": | |
console() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment