Last active
November 19, 2016 18:54
Star
You must be signed in to star a gist
My Oracle Shell
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Alternative shell for Oracle database.
# Alpha version, do not use for production purposes.
#
# Author: 2016 Dmitry Shachnev <ich@mitya57.me>
import readline  # noqa: F401 -- imported for its side effect: line editing in input()
import shutil
import sys

import cx_Oracle
def resize(string, length, fill_character=' '):
    """Return *string* cut or padded on the right to exactly *length* characters.

    Args:
        string: The text to fit.
        length: The requested width. Values below zero are treated as zero.
        fill_character: Text used for padding. When it is longer than one
            character it is repeated and cut so the result still has exactly
            *length* characters. An empty value disables padding.

    Returns:
        The truncated or right-padded string.
    """
    # A negative length would make the slice below count from the end of the
    # string and return something longer than requested.
    length = max(length, 0)
    if length <= len(string):
        return string[:length]
    missing = length - len(string)
    # Cutting the repeated filler keeps the result exact for multi-character fills.
    return string + (fill_character * missing)[:missing]
def _grow_widths(widths, targets, budget):
    """Widen columns in place, one character at a time, towards *targets*.

    Each step gives one character to the column that is furthest below its
    target (the leftmost one on ties). The loop stops once the widths add up
    to *budget* or no column is below its target any more.
    """
    spare = budget - sum(widths)
    while spare > 0:
        deficits = [target - width for target, width in zip(targets, widths)]
        if not deficits or max(deficits) <= 0:
            # Nothing left to widen; carrying on would loop forever.
            return
        widths[deficits.index(max(deficits))] += 1
        spare -= 1


def format_headers(headers, lines):
    """Print a result set as a text table fitted to the terminal width.

    Despite its name this prints the header row, a separator row and every
    data row. Cells are separated by ' | '; values that do not fit in their
    column are cut off.

    Args:
        headers: Sequence of column descriptors whose first item is the column
            name (the shape of a DB-API ``cursor.description``). When it is
            empty or None nothing is printed.
        lines: Iterable of rows; every value is converted with ``str``. It is
            consumed once, so a cursor can be passed directly.
    """
    if not headers:
        return
    terminal_columns = shutil.get_terminal_size().columns
    header_names = [header[0] for header in headers]
    rows = [[str(value) for value in line] for line in lines]

    # Width every column would like to have, with and without its header.
    natural_widths = [max(map(len, column)) for column in zip(header_names, *rows)]
    data_widths = [max(map(len, column)) for column in zip(*rows)]

    # Room left for cell text once the 3-character separators are accounted
    # for; never negative, so column widths cannot become negative either.
    budget = max(terminal_columns - 3 * (len(natural_widths) - 1), 0)
    total = sum(natural_widths)
    if total > budget:
        # Shrink all columns proportionally, then hand the characters lost to
        # rounding back: first to columns whose data is cut off, then to
        # columns whose header is cut off.
        widths = [width * budget // total for width in natural_widths]
        _grow_widths(widths, data_widths, budget)
        _grow_widths(widths, natural_widths, budget)
    else:
        widths = natural_widths

    print(' | '.join(resize(name, width) for name, width in zip(header_names, widths)))
    print('-+-'.join('-' * width for width in widths))
    for row in rows:
        print(' | '.join(resize(cell, width) for cell, width in zip(row, widths)))
def execute_query(query, query_args, cursor):
    """Execute *query* on *cursor* and print its result set, if it has one.

    Args:
        query: SQL text to execute.
        query_args: Bind variables passed to ``cursor.execute``.
        cursor: Database cursor used to run the statement.

    Errors raised by ``cursor.execute`` propagate to the caller.
    """
    cursor.execute(query, query_args)
    # Statements that return no rows (INSERT, UPDATE, DDL, ...) leave
    # ``description`` set to None; there is no table to print for them.
    if cursor.description is None:
        return
    format_headers(cursor.description, cursor)
def process_query(line, cursor):
    """Run one line typed at the prompt and report database errors on stderr.

    ``desc <table>`` is translated into a query on the COLS view listing the
    table's columns; any other non-blank line is executed as typed, minus
    surrounding whitespace. Blank lines are ignored.

    Args:
        line: Raw text entered by the user.
        cursor: Database cursor used to run the statement.
    """
    statement = line.strip()
    if not statement:
        # Pressing Enter on an empty prompt should not hit the database.
        return
    query = statement
    query_args = {}
    if statement.lower().startswith('desc '):
        query = "select COLUMN_NAME, DATA_TYPE, NULLABLE from COLS where TABLE_NAME = :table_name"
        # Drop extra blanks and a trailing ';' so that 'desc  foo ;' still
        # looks up FOO instead of silently matching nothing.
        query_args["table_name"] = statement[5:].rstrip(';').strip().upper()
    try:
        execute_query(query, query_args, cursor)
    except cx_Oracle.DatabaseError as exc:
        error, = exc.args
        print(error.message, file=sys.stderr)
def _prompt_loop(cursor):
    """Read statements from the 'SQL> ' prompt and run each until end of input."""
    while True:
        try:
            line = input('SQL> ')
        except EOFError:
            # Finish the prompt line so the parent shell starts on a fresh one.
            print()
            return
        process_query(line, cursor)


def main(argv):
    """Connect to the database and run the interactive prompt.

    Args:
        argv: Command line in ``sys.argv`` form; ``argv[1]`` is the logon
            string passed to ``cx_Oracle.Connection``.

    Exits with a usage message when the logon string is missing, and with the
    database error message when the connection cannot be opened.
    """
    if len(argv) < 2:
        # Name the program from the arguments actually given; fall back to
        # sys.argv only when argv is empty.
        program = argv[0] if argv else sys.argv[0]
        sys.exit("usage: %s <logon string>" % program)
    try:
        connection = cx_Oracle.Connection(argv[1])
    except cx_Oracle.DatabaseError as exc:
        error, = exc.args
        sys.exit(error.message)
    # Release the cursor and the connection however the prompt loop ends.
    try:
        cursor = connection.cursor()
        try:
            _prompt_loop(cursor)
        finally:
            cursor.close()
    finally:
        connection.close()
# Start the shell only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment