Last active
March 28, 2019 11:57
-
-
Save oamm/8348104 to your computer and use it in GitHub Desktop.
I tried to create a connection class between Python and MySQL; I hope this class can help in your projects.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usage example for the Mysql helper class.
from Mysql import Mysql

connection = Mysql(host='localhost', user='root', password='', database='test')

# Assuming the table has the fields id and name, in this order.

# Positional values must follow the column order of the table:
# connection.insert('table_name', values to insert)
connection.insert('test', 1, 'Alejandro Mora')

# With keyword arguments the order does not matter:
# connection.insert('table_name', field=value to insert)
connection.insert('test', name='Alejandro Mora', id=1)

# connection.select('table', where="condition (optional)", fields to return)
connection.select('test', where="name = 'Alejandro Mora' ")
# `where` is the second positional parameter of select(), so pass None to
# skip it when the fields to return are listed positionally.
connection.select('test', None, 'id', 'name')

# connection.update('table', id, field=value to update)
connection.update('test', 1, name='Alejandro')

# connection.delete('table', id)
connection.delete('test', 1)

# connection.call_store_procedure(procedure name, values)
connection.call_store_procedure('search_users_by_name', 'Alejandro')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import mysql | |
__author__ = 'Alejandro' | |
import mysql.connector | |
from mysql.connector import errorcode | |
class Mysql(object):
    """Small singleton helper that runs simple statements via mysql.connector.

    Every public method opens a fresh connection, runs one statement and
    closes the connection again.

    Table names, column names and the ``where`` clause are interpolated into
    the SQL text verbatim, so they must come from trusted code. Row values
    are always sent as bound parameters.
    """

    __instance = None
    __host = None
    __user = None
    __password = None
    __database = None
    __session = None
    __connection = None

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments once __new__ is
            # overridden; the constructor arguments are consumed by __init__.
            cls.__instance = super(Mysql, cls).__new__(cls)
        return cls.__instance

    def __init__(self, host='localhost', user='root', password='', database=''):
        """Store the connection settings.

        __init__ runs on every ``Mysql(...)`` call, so constructing the class
        again replaces the settings of the shared instance.
        """
        self.__host = host
        self.__user = user
        self.__password = password
        self.__database = database

    def _open(self):
        """Open a connection and a cursor for the stored settings.

        Raises:
            mysql.connector.Error: when the connection cannot be made. A
                short diagnostic is printed first, then the error is
                re-raised so callers never continue with a missing or stale
                cursor.
        """
        try:
            cnx = mysql.connector.connect(host=self.__host, user=self.__user,
                                          password=self.__password,
                                          database=self.__database)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print('Something is wrong with your user name or password')
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print('Database does not exists')
            else:
                print(err)
            raise
        self.__connection = cnx
        self.__session = cnx.cursor()

    def _close(self):
        """Close the current cursor and connection."""
        self.__session.close()
        self.__connection.close()

    def _execute(self, query, params=None, fetch=False):
        """Run one statement on a fresh connection and always close it.

        Args:
            query: SQL text, using ``%s`` placeholders for bound values.
            params: sequence of bound values, or None when there are none.
            fetch: when true, return all rows instead of committing.

        Returns:
            The fetched rows when ``fetch`` is true, otherwise the cursor's
            ``lastrowid`` (read before the cursor is closed).
        """
        self._open()
        try:
            if params is None:
                # Skip parameter processing entirely so a literal '%' in the
                # statement (e.g. LIKE 'A%') is left untouched.
                self.__session.execute(query)
            else:
                self.__session.execute(query, params)
            if fetch:
                return self.__session.fetchall()
            self.__connection.commit()
            return self.__session.lastrowid
        finally:
            self._close()

    def insert(self, table, *args, **kwargs):
        """Insert one row and return the cursor's ``lastrowid``.

        Args:
            table: table name.
            *args: values in the table's column order; used only when no
                keyword arguments are given.
            **kwargs: ``column=value`` pairs; take precedence over ``*args``.
        """
        if kwargs:
            columns = list(kwargs)
            values = tuple(kwargs[column] for column in columns)
            column_sql = "(" + ",".join("`%s`" % column for column in columns) + ") "
        else:
            values = tuple(args)
            column_sql = ""
        placeholders = ",".join(["%s"] * len(values))
        query = "INSERT INTO %s %sVALUES(%s)" % (table, column_sql, placeholders)
        return self._execute(query, values)

    def select(self, table, where=None, *args):
        """Return the rows of ``table`` as a list (empty when nothing matches).

        Args:
            table: table name.
            where: optional raw SQL condition, appended after ``WHERE``.
            *args: column names to return; all columns when omitted.
        """
        columns = ",".join("`%s`" % column for column in args) if args else "*"
        query = "SELECT %s FROM %s" % (columns, table)
        if where:
            # Concatenate rather than %-format so '%' inside the condition
            # cannot be mistaken for a format directive.
            query += " WHERE " + where
        return self._execute(query, fetch=True)

    def update(self, table, index, **kwargs):
        """Update the row whose ``index`` column equals ``index``.

        Args:
            table: table name.
            index: key value of the row to update.
            **kwargs: ``column=new_value`` pairs.

        Raises:
            ValueError: when no ``column=value`` pair is given.
        """
        if not kwargs:
            raise ValueError("update() requires at least one column=value pair")
        columns = list(kwargs)
        assignments = ",".join("`%s`=%%s" % column for column in columns)
        # INDEX is a reserved word in MySQL, so the column must be quoted.
        # NOTE(review): the usage example describes an `id` column; confirm
        # which key column update() is meant to filter on.
        query = "UPDATE %s SET %s WHERE `index`=%%s" % (table, assignments)
        params = tuple(kwargs[column] for column in columns) + (index,)
        self._execute(query, params)

    def delete(self, table, index):
        """Delete the row whose ``uuid`` column equals ``index``.

        Args:
            table: table name.
            index: key value of the row to delete.
        """
        # NOTE(review): update() filters on `index` and the usage example
        # describes an `id` column; confirm the intended key column.
        query = "DELETE FROM %s WHERE uuid=%%s" % table
        self._execute(query, (index,))

    def call_store_procedure(self, name, *args):
        """Call a stored procedure and return the rows of its last result set.

        Args:
            name: stored procedure name.
            *args: arguments passed to the procedure.

        Returns:
            The rows of the last result set, or None when the procedure
            produced no result set.
        """
        result_sp = None
        self._open()
        try:
            self.__session.callproc(name, args)
            self.__connection.commit()
            for result in self.__session.stored_results():
                result_sp = result.fetchall()
        finally:
            self._close()
        return result_sp
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment