Skip to content

Instantly share code, notes, and snippets.

@openinx
Created July 18, 2013 01:43
Show Gist options
  • Save openinx/6026089 to your computer and use it in GitHub Desktop.
Save openinx/6026089 to your computer and use it in GitHub Desktop.
A MySQL client terminal implemented in pure Python.
import time
import pymysql
import prettytable
import itertools
import traceback
import ConfigParser
from nepo.common import utils
import nepo.web.constant as web_const
# readline is optional: when it is importable, configure tab completion and
# emacs-style key bindings; otherwise carry on without it.
try:
    import readline
except ImportError:
    # Only a missing module is tolerated here; any other failure should be
    # visible instead of being reported as an import problem.
    print('Cannot ``import readline`` package.')
else:
    readline.parse_and_bind('tab: complete')
    readline.parse_and_bind('set editing-mode emacs')
# Lazily created ConfigParser shared by every get_cfg() call.
cfg_parser = None


def get_cfg(section, key):
    """Return the value of ``key`` in ``section`` of ``nepo.conf``.

    The file is located with ``utils.fetch_cfgpath`` and parsed once, on the
    first call; later calls reuse the cached parser.

    Raises whatever ``ConfigParser.get`` raises when the section or the key
    does not exist.
    """
    global cfg_parser
    if cfg_parser is None:
        # Resolve the path only when the file actually has to be read.
        # NOTE(review): the meaning of locpath/count is defined by the
        # project helper utils.fetch_cfgpath -- confirm there.
        path = utils.fetch_cfgpath('nepo.conf', locpath=__file__, count=2)
        parser = ConfigParser.ConfigParser()
        parser.read(path)
        # Publish the parser only after read() returned, so a failure while
        # reading does not leave an empty parser cached for later calls.
        cfg_parser = parser
    return cfg_parser.get(section, key)
class cfg:
    """Connection settings for the two databases this shell can use."""

    # Settings taken from the 'mysql' section via get_cfg().
    mysql_host = get_cfg('mysql', 'host')
    mysql_port = int(get_cfg('mysql', 'port'))
    mysql_user = get_cfg('mysql', 'user')
    mysql_password = get_cfg('mysql', 'password')
    mysql_database = get_cfg('mysql', 'db_name')

    # Settings taken from the constants in nepo.web.constant.DBCONFIG.
    _web = web_const.DBCONFIG
    webdb_user = _web.CON_DB_USER
    webdb_password = _web.CON_DB_PASSWD
    webdb_host = _web.CON_DB_HOST
    webdb_port = _web.CON_DB_PORT
    webdb_database = _web.CON_DB
    # Temporary alias only; remove it so it does not become a class attribute.
    del _web
# Connection keyword arguments for every database the shell may connect to,
# keyed by database name. connect_db() looks the current name up here.
databases = {
    cfg.mysql_database: {
        'host': cfg.mysql_host,
        'port': cfg.mysql_port,
        'user': cfg.mysql_user,
        'passwd': cfg.mysql_password,
        'charset': 'utf8',
    },
    cfg.webdb_database: {
        'host': cfg.webdb_host,
        'port': cfg.webdb_port,
        'user': cfg.webdb_user,
        'passwd': cfg.webdb_password,
        'charset': 'utf8',
    },
}

# Database used until a ``use <name>`` statement selects another one.
# Name it explicitly: taking "the first key" of a dict depends on hash
# ordering under Python 2, so the starting database could be either entry.
CURRENT_DATABASE = cfg.mysql_database
class sql_type:
    """Classify a SQL statement by its leading keyword.

    The category strings double as the names of the handler methods that
    process each kind of statement.
    """

    # category -> leading keywords (lower case) that select it
    _map = {
        'query': ['select', 'desc', 'show'],
        'insert': ['insert'],
        'update': ['update'],
        'delete': ['delete'],
        'change_db': ['use'],
    }
    QUERY = 'query'
    INSERT = 'insert'
    UPDATE = 'update'
    DELETE = 'delete'
    CHANGE_DB = 'change_db'
    UNKNOWN = 'unknown'

    @staticmethod
    def decide(sql):
        """Return the category of ``sql``, or ``sql_type.UNKNOWN``.

        The first whitespace-delimited word is compared case-insensitively
        with the keywords in ``_map``. Empty or blank input is UNKNOWN.
        """
        # split(None, 1) ignores leading whitespace and accepts any
        # whitespace (space, tab, newline) after the keyword.
        words = sql.split(None, 1)
        if not words:
            return sql_type.UNKNOWN
        head = words[0].lower()
        for typ, keywords in sql_type._map.items():
            if head in keywords:
                return typ
        return sql_type.UNKNOWN
def connect_db():
    """Open and return a new pymysql connection to CURRENT_DATABASE.

    The connection arguments are the entry stored for that name in
    ``databases`` plus ``db=<name>``; values from the entry take precedence.
    """
    params = {'db': CURRENT_DATABASE}
    params.update(databases[CURRENT_DATABASE])
    return pymysql.Connect(**params)
def print_list(objs, fields, sortby_index=None):
    """Print ``objs`` as a left-aligned table with one column per field.

    objs: iterable of objects; each cell is ``getattr(obj, field, None)``.
    fields: sequence of column names, also used as attribute names.
    sortby_index: index into ``fields`` of the column to sort by, or None
        to leave the ordering to prettytable's default.
    """
    sortby = None if sortby_index is None else fields[sortby_index]
    pt = prettytable.PrettyTable(list(fields), caching=False)
    pt.align = 'l'
    for o in objs:
        pt.add_row([getattr(o, field, None) for field in fields])
    print(pt.get_string(sortby=sortby))
def print_dict(d, dict_property="Property"):
    """Print mapping ``d`` as a two-column table sorted by key.

    d: mapping whose items become the rows.
    dict_property: heading of the key column; the value column is 'Value'.
    """
    pt = prettytable.PrettyTable([dict_property, 'Value'], caching=False)
    pt.align = 'l'
    # A plain loop: the rows are added for their side effect, so there is no
    # reason to build a throwaway list with a comprehension.
    for key, value in d.items():
        pt.add_row([key, value])
    print(pt.get_string(sortby=dict_property))
class Row(dict):
    """Dictionary whose keys can also be read as attributes (``row.name``)."""

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails; a missing key must
        # surface as AttributeError so getattr()/hasattr() behave as usual.
        if name in self:
            return self[name]
        raise AttributeError(name)
class Method:
    """Statement handlers, one method per category from ``sql_type``.

    Every public handler takes ``(sql, is_row)`` so the caller can dispatch
    by name with ``getattr``. Each statement runs on a fresh connection
    obtained from ``connect_db()``. Database errors are printed as a
    traceback and never propagate, so one bad statement cannot end the
    interactive session.
    """

    def query(self, sql, is_row=True):
        """Run a statement that returns rows and print the result set.

        sql: the statement text.
        is_row: True prints one table for the whole result; False prints
            every row as its own key/value table (vertical output).
        """
        conn = None
        try:
            # Connect inside the try block: an unreachable server or a bad
            # configuration is reported like any other statement error.
            conn = connect_db()
            cursor = conn.cursor()
            try:
                start_t = time.time()
                cursor.execute(sql)
                column_names = [d[0] for d in cursor.description]
                rows = [Row(zip(column_names, row)) for row in cursor]
                if is_row:
                    print_list(rows, column_names)
                else:
                    banner = '*' * 35
                    for index, row in enumerate(rows, 1):
                        print('%s Row %d %s' % (banner, index, banner))
                        print_dict(row)
                end_t = time.time()
                print('%s row in set (%0.2f sec)' % (
                    len(rows), end_t - start_t))
            finally:
                cursor.close()
        except Exception:
            # Exception, not a bare except: Ctrl-C and SystemExit must still
            # be able to interrupt a running statement.
            traceback.print_exc()
        finally:
            if conn is not None:
                conn.close()

    def _execute(self, sql):
        """Run a data-modifying statement, commit it and report the count."""
        conn = None
        try:
            conn = connect_db()
            cursor = conn.cursor()
            try:
                start_t = time.time()
                affected = cursor.execute(sql)
                end_t = time.time()
                # Commit before reporting success, so "Query OK" is never
                # printed for a change that failed to commit.
                conn.commit()
                print('Query OK, %d rows affected (%0.2f sec)' % (
                    affected, end_t - start_t))
            finally:
                cursor.close()
        except Exception:
            traceback.print_exc()
        finally:
            if conn is not None:
                conn.close()

    def delete(self, sql, is_row=True):
        """Handle a DELETE statement; ``is_row`` is ignored."""
        self._execute(sql)

    def update(self, sql, is_row=True):
        """Handle an UPDATE statement; ``is_row`` is ignored."""
        self._execute(sql)

    def insert(self, sql, is_row=True):
        """Handle an INSERT statement; ``is_row`` is ignored."""
        self._execute(sql)

    def unknown(self, sql, is_row=True):
        """Handle a statement whose leading keyword is not recognised."""
        print('Unknown SQL sentences, Ignore.')

    def change_db(self, sql, is_row=True):
        """Handle ``use <name>``: make ``<name>`` the current database.

        Only names present in ``databases`` are accepted, because
        ``connect_db()`` needs the connection settings stored there; any
        other name is rejected and the current database is left unchanged.
        """
        global CURRENT_DATABASE
        words = sql.split()
        # The last word is the database name; a bare "use" names nothing.
        name = words[-1].strip('`') if len(words) > 1 else ''
        if name not in databases:
            print("ERROR: Unknown database '%s'" % name)
            return
        CURRENT_DATABASE = name
        print('Database changed')
def _G(sql):
sql = sql.strip()
if sql[-1] == ';':
sql = sql[:-1]
sql = sql.strip()
if len(sql) >= 2:
tail = sql[-2:]
if tail == '\G' or tail == '\g':
return sql[:-2], False
return sql, True
if __name__ == '__main__':
    # Interactive loop: read one statement per line, classify it by its
    # leading keyword and hand it to the Method handler of the same name.
    method_instance = Method()
    while True:
        try:
            sql = raw_input('pysql> ')
        except EOFError:
            # Ctrl-D / end of input: finish the prompt line and leave.
            print('')
            break
        except KeyboardInterrupt:
            # Ctrl-C at the prompt discards the current line only.
            print('')
            continue
        sql = sql.strip()
        if not sql:
            # Nothing typed: show the prompt again.
            continue
        stype = sql_type.decide(sql)
        method = getattr(method_instance, stype)
        sql, is_row = _G(sql)
        method(sql, is_row)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment