Skip to content

Instantly share code, notes, and snippets.

@mitya57
Last active November 19, 2016 18:54
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save mitya57/1cb49c876d094ca8be650187a3d1fdd4 to your computer and use it in GitHub Desktop.
My Oracle Shell
# Alternative shell for Oracle database.
# Alpha version, do not use for production purposes.
#
# Author: 2016 Dmitry Shachnev <ich@mitya57.me>
import readline
import shutil
import sys
import cx_Oracle
def resize(string, length, fill_character=' '):
    """Return *string* truncated or right-padded to exactly *length* characters.

    Args:
        string: Text to fit into a fixed-width cell.
        length: Target width. Values below zero are treated as zero, so the
            result is an empty string rather than a slice counted from the
            end of *string*.
        fill_character: Padding appended when *string* is too short. A
            multi-character value is repeated and then cut so the result
            never exceeds *length*. An empty value cannot pad, so the string
            is returned unchanged in that case.

    Returns:
        The resized string.
    """
    length = max(length, 0)
    if length <= len(string):
        return string[:length]
    padding = fill_character * (length - len(string))
    # Slice again: a fill string longer than one character overshoots.
    return (string + padding)[:length]
def format_headers(headers, lines):
    """Print a result set as a text table that fits the terminal width.

    Args:
        headers: Sequence of column descriptors; only the first item of each
            (the column name) is used. The caller in this file passes
            ``cursor.description``.
        lines: Iterable of rows; every value is converted with ``str``.
            It is consumed once, so a cursor can be passed directly.

    Columns are separated by `` | ``. When the natural widths do not fit in
    the width reported by ``shutil.get_terminal_size()``, every column is
    shrunk proportionally and the characters lost to rounding are handed
    back one by one to the column that is currently cut the most.
    """
    header_names = [header[0] for header in headers]
    if not header_names:
        # Nothing to print; the original indexing would raise IndexError.
        return
    rows = [[str(value) for value in line] for line in lines]

    # Natural width of a column: the widest of its header and its cells.
    natural = [max(map(len, column)) for column in zip(header_names, *rows)]
    terminal_width = shutil.get_terminal_size().columns
    # Every " | " separator costs three characters; never go below zero,
    # otherwise the proportional widths below would become negative.
    available = max(terminal_width - 3 * (len(natural) - 1), 0)

    widths = natural
    total = sum(natural)
    if total > available:
        widths = [width * available // total for width in natural]
        # Widest cell per column, computed once instead of on every pass.
        data_widths = [max(map(len, column)) for column in zip(*rows)]
        while sum(widths) < available:
            # Prefer giving space back to truncated data; fall back to
            # truncated headers so the loop always makes progress.
            index = _most_truncated(data_widths, widths)
            if index is None:
                index = _most_truncated(natural, widths)
            if index is None:
                # Defensive: never spin if nothing can grow any further.
                break
            widths[index] += 1

    print(' | '.join(
        resize(name, width) for name, width in zip(header_names, widths)))
    print('-+-'.join('-' * width for width in widths))
    for row in rows:
        print(' | '.join(
            resize(cell, width) for cell, width in zip(row, widths)))


def _most_truncated(wanted, widths):
    """Return the index of the column cut the most, or None if none is cut."""
    worst_gap, worst_index = 0, None
    for i, (want, width) in enumerate(zip(wanted, widths)):
        gap = want - width
        # Strict comparison keeps the first column on ties.
        if gap > worst_gap:
            worst_gap, worst_index = gap, i
    return worst_index
def execute_query(query, query_args, cursor):
    """Execute a statement and print its result set, if it produces one.

    Args:
        query: SQL text to execute.
        query_args: Bind variables passed through to ``cursor.execute``.
        cursor: Database cursor used to run the statement and read rows.

    Exceptions raised by ``cursor.execute`` propagate to the caller.
    """
    cursor.execute(query, query_args)
    # DB-API cursors report ``description`` as None for statements that do
    # not return rows (INSERT, UPDATE, DDL, ...); there is no table to print
    # and iterating the cursor would fail.
    if cursor.description is None:
        return
    format_headers(cursor.description, cursor)
def process_query(line, cursor):
    """Run one line typed at the prompt and print its result or its error.

    Args:
        line: Raw user input. Blank input is ignored. A line of the form
            ``desc <table>`` is translated into a query on ``COLS`` that
            lists the table's columns; anything else is executed as typed
            (surrounding whitespace removed).
        cursor: Database cursor handed on to ``execute_query``.

    Database errors are reported on stderr instead of being raised, so the
    interactive loop keeps running.
    """
    statement = line.strip()
    if not statement:
        # Pressing Enter on an empty prompt is not a statement to execute.
        return

    query = statement
    query_args = {}
    words = statement.split(None, 1)
    if len(words) == 2 and words[0].lower() == 'desc':
        query = "select COLUMN_NAME, DATA_TYPE, NULLABLE from COLS where TABLE_NAME = :table_name"
        # Tolerate "desc foo;" and stray spaces around the table name.
        query_args["table_name"] = words[1].rstrip(';').strip().upper()

    try:
        execute_query(query, query_args, cursor)
    except cx_Oracle.DatabaseError as exc:
        # The first argument normally carries a ``message`` attribute; fall
        # back to its plain text when it does not.
        detail = exc.args[0] if exc.args else exc
        print(getattr(detail, "message", detail), file=sys.stderr)
def main(argv):
    """Connect to the database and run the interactive ``SQL>`` prompt.

    Args:
        argv: Command-line arguments; ``argv[1]`` is the logon string passed
            to ``cx_Oracle.Connection``.

    Exits with a usage message when the logon string is missing and with the
    database's error message when the connection cannot be established.
    The loop ends on end-of-file (Ctrl-D); the cursor and the connection are
    closed on the way out, including when an exception escapes the loop.
    """
    from contextlib import closing

    if len(argv) < 2:
        # Report the name from the arguments actually given to this call.
        program = argv[0] if argv else sys.argv[0]
        sys.exit("usage: %s <logon string>" % program)
    try:
        connection = cx_Oracle.Connection(argv[1])
    except cx_Oracle.DatabaseError as exc:
        detail = exc.args[0] if exc.args else exc
        sys.exit(str(getattr(detail, "message", detail)))

    # Context managers exit in reverse order: cursor first, then connection.
    with closing(connection), closing(connection.cursor()) as cursor:
        while True:
            try:
                line = input('SQL> ')
            except EOFError:
                # Finish the prompt line so the parent shell starts cleanly.
                print()
                break
            process_query(line, cursor)
# Start the interactive shell only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main(argv=sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment