Skip to content

Instantly share code, notes, and snippets.

@b40yd
Last active April 24, 2024 14:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save b40yd/238b4a3e471070d8a93a216a775f37b1 to your computer and use it in GitHub Desktop.
Save b40yd/238b4a3e471070d8a93a216a775f37b1 to your computer and use it in GitHub Desktop.
sqlclient.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import argparse
import json
import logging
import os
import sys
from datetime import datetime
import psycopg2
from prettytable import ORGMODE, PrettyTable
from psycopg2.extras import RealDictCursor
__version__ = "0.0.1"
def print_query_results(args, results):
table = PrettyTable(
encoding=sys.stdout.encoding,
padding_width=1,
set_style=ORGMODE,
junction_char="|",
)
data = ""
if results and args.align:
table.field_names = results[0].keys()
for row in results:
table.add_row(row.values())
data = table.get_string()
elif results:
separator = "{}".format(args.field_separator)
field_names = separator.join(results[0].keys())
data = "{}\n".format(field_names)
for row in results:
values = []
for value in row.values():
if type(value) == datetime:
values.append(value.strftime("%Y-%m-%d %H:%M:%S"))
elif type(value) == bool:
values.append(str(value))
elif type(value) in (list, tuple, dict):
values.append(json.dumps(value))
else:
values.append(value)
data += "{}\n".format(separator.join(values))
if data and args.output:
with open(args.output, "w", encoding="utf-8") as f:
f.write(data)
else:
print(data)
def parser_config(args):
if args.debug:
logging.basicConfig(
format="[%(asctime)s - %(name)s - %(module)s - %(funcName)s] [%(levelname)s] %(message)s",
level=logging.DEBUG,
stream=sys.stdout,
)
else:
logging.basicConfig(
format="[%(asctime)s - %(name)s - %(module)s - %(funcName)s] [%(levelname)s] %(message)s",
level=logging.INFO,
stream=sys.stdout,
)
if args.nopassword:
password = args.password
else:
password = os.getenv("PGPASSWORD")
params = {
"dbname": args.dbname,
"user": args.username,
"password": password,
"host": args.host,
"port": args.port,
}
return params
def pg_query(args):
sql_query = args.command
db_params = parser_config(args)
logging.debug(" %s ", db_params)
if args.filename:
with open(args.filename, "r") as f:
sql_query = f.read()
try:
with psycopg2.connect(**db_params) as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(sql_query)
rows = cur.fetchall()
print_query_results(args, rows)
except Exception as e:
raise SystemError(e)
def console():
parser = argparse.ArgumentParser(
description="backup/restore PostgreSQL databases", add_help=False
)
parser.add_argument(
"--version", action="version", version="%(prog)s {}".format(__version__)
)
parent_parser = argparse.ArgumentParser(add_help=False)
parent_parser.add_argument(
"-h",
"--host",
"--hostname",
dest="host",
default="localhost",
help="database server host or socket directory",
)
parent_parser.add_argument(
"--debug", dest="debug", action="store_true", help="Turn on debugging output"
)
subparsers = parser.add_subparsers(dest="command", help="a sub command")
subparsers.required = True # https://bugs.python.org/issue9253#msg186387
pg_parser = subparsers.add_parser(
"postgresql",
parents=[parent_parser],
help="PostgreSQL database",
add_help=False,
)
pg_parser.add_argument(
"-d", "--dbname", dest="dbname", help="database name to connect to"
)
pg_parser.add_argument(
"-U", "--username", "--user", dest="username", help="database user name"
)
pg_parser.add_argument(
"-W", "--password", dest="password", help="database password"
)
pg_parser.add_argument(
"-A",
"--no-align",
dest="align",
action="store_true",
help="unaligned table output mode",
)
pg_parser.add_argument(
"-P",
dest="pset",
help="set printing option VAR to ARG (see \pset command)",
)
pg_parser.add_argument(
"-f",
"--file",
dest="filename",
help="execute commands from file, then exit",
)
pg_parser.add_argument(
"-o",
"--output",
dest="output",
help="send query results to file (or |pipe)",
)
pg_parser.add_argument(
"-v",
"--set",
"--variable",
dest="variable",
help="set psql variable NAME to VALUE(e.g., -v ON_ERROR_STOP=1)",
)
pg_parser.add_argument(
"-F",
"--field-separator",
dest="field_separator",
default="|",
help="field separator for unaligned output (default: ' | ')",
)
pg_parser.add_argument(
"-w",
"--no-password",
dest="nopassword",
action="store_true",
help="no password",
)
pg_parser.add_argument(
"-p", "--port", type=int, default=5432, dest="port", help="database server post"
)
pg_parser.add_argument(
"--help",
action="help",
default=argparse.SUPPRESS,
help="show this help message and exit",
)
pg_parser.add_argument(
"-c",
"--command",
type=str,
dest="command",
help="run only single command (SQL or internal) and exit",
)
pg_parser.set_defaults(func=pg_query)
args = parser.parse_args()
return args.func(args)
if __name__ == "__main__":
console()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment