Skip to content

Instantly share code, notes, and snippets.

@2248998661
Created December 10, 2020 07:05
Show Gist options
  • Save 2248998661/5749c5d35921a6422a5788e9b1accced to your computer and use it in GitHub Desktop.
Save 2248998661/5749c5d35921a6422a5788e9b1accced to your computer and use it in GitHub Desktop.
# __*__ conding:utf-8 __*__
# author: dust
# python -V : 3.7
import time
import logging
import traceback
import pymysql
import pymysql.cursors
version = "0.7"
version_info = (0, 7, 0, 0)
class Connection(object):
"""一个封装 MySQL 的类
"""
def __init__(self, host, database, user=None, password=None,
port=0,
max_idle_time=7 * 3600, connect_timeout=10,
time_zone="+0:00", charset = "utf8mb4", sql_mode="TRADITIONAL"):
'''
:param host: host
:param database:指定数据库
:param user:user
:param password:password
:param port:端口
:param max_idle_time:最大空闲时间,MySQL 默认连接时间为 8 小时,超过机会断开连接,设置比 8 小时小即可,到达设置的时间会重新建立连接
:param connect_timeout:
:param time_zone:
:param charset:编码
:param sql_mode:
'''
self.host = host
self.database = database
self.max_idle_time = float(max_idle_time)
# mysql 连接设置
args = dict(use_unicode=True, charset=charset,
database=database,
init_command=('SET time_zone = "%s"' % time_zone),
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=connect_timeout, sql_mode=sql_mode)
if user is not None:
args["user"] = user
if password is not None:
args["passwd"] = password
# 接受 MySQL 套接字:ip:port
if "/" in host:
args["unix_socket"] = host
else:
self.socket = None
pair = host.split(":")
if len(pair) == 2:
args["host"] = pair[0]
args["port"] = int(pair[1])
else:
args["host"] = host
args["port"] = 3306
if port:
args['port'] = port
self._db = None
self._db_args = args
self._last_use_time = time.time()
try:
self.reconnect()
except Exception:
logging.error("Cannot connect to MySQL on %s", self.host,
exc_info=True)
def _ensure_connected(self):
'''mysql 连接时间
'''
if (self._db is None or
(time.time() - self._last_use_time > self.max_idle_time)):
self.reconnect()
self._last_use_time = time.time()
def _cursor(self):
'''获取 SQL 光标
'''
self._ensure_connected()
return self._db.cursor()
def __del__(self):
'''手动释放内存
'''
self.close()
def close(self):
"""关闭数据库连接."""
if getattr(self, "_db", None) is not None:
self._db.close()
self._db = None
def reconnect(self):
"""关闭数据库再打开."""
self.close()
self._db = pymysql.connect(**self._db_args)
self._db.autocommit(True)
def query(self, query, *parameters, **kwparameters):
"""返回查询的参数及行号"""
cursor = self._cursor()
try:
cursor.execute(query, kwparameters or parameters)
result = cursor.fetchall()
return result
finally:
cursor.close()
def get(self, query, *parameters, **kwparameters):
"""返回查询数据的行.
"""
cursor = self._cursor()
try:
cursor.execute(query, kwparameters or parameters)
return cursor.fetchone()
finally:
cursor.close()
def execute(self, query, *parameters, **kwparameters):
"""查询语句."""
cursor = self._cursor()
try:
cursor.execute(query, kwparameters or parameters)
return cursor.lastrowid
except Exception as e:
if e.args[0] == 1062:
pass
else:
traceback.print_exc()
raise e
finally:
cursor.close()
# 别名
insert = execute
## ===============类似 mongo 的用法 ===================
def table_has(self, table_name, field, value):
if isinstance(value, str):
value = value.encode('utf8')
sql = 'SELECT %s FROM %s WHERE %s="%s"' % (
field,
table_name,
field,
value)
d = self.get(sql)
return d
def table_insert(self, table_name, item):
'''以字典的形式插入,键为 mysql 的表字段'''
fields = list(item.keys())
values = list(item.values())
fieldstr = ','.join(fields)
valstr = ','.join(['%s'] * len(item))
for i in range(len(values)):
if isinstance(values[i], str):
values[i] = values[i].encode('utf8')
sql = 'INSERT INTO %s (%s) VALUES(%s)' % (table_name, fieldstr, valstr)
try:
last_id = self.execute(sql, *values)
return last_id
except Exception as e:
if e.args[0] == 1062:
# 如果查询的数据存在于 mysql 中
pass
else:
traceback.print_exc()
print('sql:', sql)
print('item:')
for i in range(len(fields)):
vs = str(values[i])
if len(vs) > 300:
print(fields[i], ' : ', len(vs), type(values[i]))
else:
print(fields[i], ' : ', vs, type(values[i]))
raise e
def table_update(self, table_name, updates,
field_where, value_where):
'''更新 item'''
upsets = []
values = []
for k, v in updates.items():
s = '%s=%%s' % k
upsets.append(s)
values.append(v)
upsets = ','.join(upsets)
sql = 'UPDATE %s SET %s WHERE %s="%s"' % (
table_name,
upsets,
field_where, value_where,
)
self.execute(sql, *(values))
if __name__ == '__main__':
db = Connection(
'127.0.0.1',
'jd',
'root',
'kkhkb445gg'
)
# 获取一条记录
sql = 'select * from jd_list where id="3"'
data = db.get(sql)
print(data)
# # 获取多条记录
# sql = 'select * from test_table where id>%s'
# data = db.query(sql, 2)
# # 插入一条数据
# sql = 'insert into test_table(title, url) values(%s, %s)'
# last_id = db.execute(sql, 'test', 'http://a.com/')
# # 或者
# last_id = db.insert(sql, 'test', 'http://a.com/')
#
#
# # 插入类似 MongoDB 的字典,MySQL 插入的字典的键需要与 MySQL 中的字段相对应,否则报错
# item = {
# 'title': 'test',
# 'url': 'http://a.com/',
# }
# last_id = db.table_insert('test_table', item)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment