Skip to content

Instantly share code, notes, and snippets.

@nskeip
Last active September 9, 2017 09:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nskeip/333855f0247b1aade4b68617b26275fa to your computer and use it in GitHub Desktop.
Save nskeip/333855f0247b1aade4b68617b26275fa to your computer and use it in GitHub Desktop.
import re
import sys
import sqlite3
import sqlparse
from PyQt5.QtWidgets import *
from PyQt5.QtCore import QAbstractTableModel, Qt
class SQLite3TableModel(QAbstractTableModel):
    """Table model that shows the result of the last SELECT of an SQL script.

    The script is split into statements with ``sqlparse``.  Every non-SELECT
    statement is executed and committed while the model is constructed.
    SELECT statements are not executed at construction time: only the last
    one is remembered, and its rows are fetched lazily when the view asks
    for them through ``data()``.

    The most recent ``sqlite3.Error`` raised by any statement is stored in
    ``lastError`` (``None`` when nothing failed).
    """

    def __init__(self, db, queriesText, *argv, **kwargs):
        """Run the non-SELECT statements and remember the last SELECT.

        Args:
            db: an open ``sqlite3`` connection.
            queriesText: SQL text that may contain several statements.
            *argv, **kwargs: forwarded to ``QAbstractTableModel``.
        """
        super().__init__(*argv, **kwargs)

        # Zero row/column counts mean "nothing to display", which is the
        # right answer for scripts without any SELECT.  As soon as a SELECT
        # is seen, both counts are reset to None, meaning "not computed yet"
        # (see rowCount() / columnCount()).
        self.lastSelectQuery = None
        self.lastSelectQueryIsExecuted = False
        self._rowCount = 0
        self._columnCount = 0

        self.db = db
        self.cur = db.cursor()
        self._data = []            # rows fetched so far, in result order
        self._maxRowFetched = 0    # always equal to len(self._data)
        self.lastError = None
        self._headerData = None    # column names, filled in on first use

        for parsedQuery in sqlparse.parse(queriesText):
            query = self.removeSemicolonAndTrailingSpaces(str(parsedQuery))
            if not query:
                # Nothing left after stripping (e.g. a stray ';').
                continue

            if self.queryIsSelect(query):
                # Do not execute it now; just remember it and mark the
                # counts as "to be determined".
                self.lastSelectQuery = query
                self._rowCount = None
                self._columnCount = None
                continue

            try:
                self.cur.execute(query)
                # A non-select statement can be committed right away.
                self.db.commit()
            except sqlite3.Error as e:
                # sqlite3.Error also covers IntegrityError, ProgrammingError
                # etc., not only OperationalError.
                self.lastError = e
                break

    @staticmethod
    def removeSemicolonAndTrailingSpaces(query):
        """Return ``query`` stripped of outer whitespace and one final ';'."""
        return re.sub(r"\s*;\s*$", "", query.strip())

    @staticmethod
    def queryIsSelect(query):
        """Return True when the first statement in ``query`` is a SELECT.

        ``Statement.get_type()`` is used instead of inspecting the raw first
        token so that leading comments do not hide the SELECT keyword.
        """
        statements = sqlparse.parse(query)
        if not statements:
            return False
        return statements[0].get_type() == 'SELECT'

    def _fetchRowsUpTo(self, row):
        """Fetch rows into the cache until ``row`` is available.

        Returns True when ``row`` is cached afterwards, False when the
        result set is shorter than expected or the database raised an error
        (which is then stored in ``lastError``).
        """
        try:
            if not self.lastSelectQueryIsExecuted:
                self.cur.execute(self.lastSelectQuery)
                self.lastSelectQueryIsExecuted = True
            fetched = self.cur.fetchmany(size=row + 1 - len(self._data))
        except sqlite3.Error as e:
            self.lastError = e
            return False

        self._data += fetched
        # Count what was really returned, not what was requested.
        self._maxRowFetched = len(self._data)
        return row < len(self._data)

    def _columnNames(self):
        """Return the column names of the SELECT, reading them only once.

        The names come from ``cursor.description`` of a separate cursor so
        that the lazily consumed cursor used by ``data()`` is not disturbed.
        On a database error the error is stored in ``lastError`` and an
        empty list is returned.
        """
        if self._headerData is None:
            probe = self.db.cursor()
            try:
                probe.execute(self.lastSelectQuery)
                self._headerData = [d[0] for d in probe.description or ()]
            except sqlite3.Error as e:
                self.lastError = e
                self._headerData = []
            finally:
                probe.close()
        return self._headerData

    def data(self, index, role=Qt.DisplayRole):
        """Return the cell value for ``index``, fetching rows on demand.

        Returns None for roles other than ``Qt.DisplayRole``, for invalid
        indexes, when there is no SELECT to show, and when the requested row
        cannot be fetched.
        """
        if self.lastSelectQuery is None or role != Qt.DisplayRole:
            return None
        if not index.isValid():
            return None

        row = index.row()
        if row >= len(self._data) and not self._fetchRowsUpTo(row):
            return None
        return self._data[row][index.column()]

    def rowCount(self, parent):
        """Return the number of rows of the SELECT (0 when there is none).

        The count is computed once with ``SELECT COUNT (*)`` over the query;
        on a database error it becomes 0 and the error goes to ``lastError``.
        """
        if self._rowCount is None:
            # A separate cursor is needed so self.cur is left untouched.
            counter = self.db.cursor()
            try:
                counter.execute(
                    "SELECT COUNT (*) FROM (%s)" % self.lastSelectQuery)
                self._rowCount = counter.fetchone()[0]
            except sqlite3.Error as e:
                self.lastError = e
                self._rowCount = 0
            finally:
                counter.close()
        return self._rowCount

    def columnCount(self, parent):
        """Return the number of columns; 0 when the SELECT yields no rows."""
        if self._columnCount is None:
            if self.rowCount(parent) == 0:
                # If there are no rows, show no columns either.
                self._columnCount = 0
            else:
                self._columnCount = len(self._columnNames())
        return self._columnCount

    def headerData(self, section, orientation, role=None):
        """Return the column name for horizontal display headers.

        Every other orientation/role combination is delegated to the base
        class.  A section outside the known columns yields None.
        """
        if self.lastSelectQuery is not None and \
                orientation == Qt.Horizontal and \
                role == Qt.DisplayRole:
            names = self._columnNames()
            if 0 <= section < len(names):
                return names[section]
            return None
        return super().headerData(section, orientation, role)
class SqlExecutor(QWidget):
    """Main window: connect to an SQLite database and run SQL text on it.

    The window has a connection-string row, a query row and a table view
    that displays the result through ``SQLite3TableModel``.
    """

    def __init__(self, *argv, **kwargs):
        super().__init__(*argv, **kwargs)
        self.con = None  # sqlite3 connection, None while disconnected
        self.initUI()

    def center(self):
        """Move the window to the centre of the available desktop area."""
        qr = self.frameGeometry()
        cp = QDesktopWidget().availableGeometry().center()
        qr.moveCenter(cp)
        self.move(qr.topLeft())

    def initUI(self):
        """Create the widgets, lay them out in a grid and show the window."""
        self.setGeometry(300, 300, 600, 400)
        self.setWindowTitle('Sqlite query executor')
        self.center()

        grid = QGridLayout()
        grid.setSpacing(10)

        # connection string: label, input and button
        connectionStringLabel = QLabel('Connection string:')
        connectionStringEdit = QLineEdit('books.db')
        self.connectionStringEdit = connectionStringEdit

        connectButton = QPushButton('Connect', self)
        connectButton.setCheckable(True)
        connectButton.setDefault(False)
        connectButton.clicked.connect(self.connectButtonClicked)
        self.connectButton = connectButton

        grid.addWidget(connectionStringLabel, 1, 0)
        grid.addWidget(connectionStringEdit, 1, 1)
        grid.addWidget(connectButton, 1, 2)

        # sql query string: label, input and button
        sqlLabel = QLabel('Query text:')
        sqlEdit = QPlainTextEdit('select * from books')
        self.sqlEdit = sqlEdit

        executeButton = QPushButton('Run', self)
        executeButton.setDefault(True)
        executeButton.clicked.connect(self.executeButtonClicked)
        self.executeButton = executeButton

        # Start disconnected: the connection string is editable, SQL is not.
        self.setReadyToTakeConnectionString(True)
        self.setReadyToTakeSQLText(False)

        grid.addWidget(sqlLabel, 2, 0)
        grid.addWidget(sqlEdit, 2, 1)
        grid.addWidget(executeButton, 2, 2)

        table = QTableView()
        grid.addWidget(table, 3, 0, -1, -1)
        self.table = table

        self.setLayout(grid)
        self.show()

    def isConnected(self):
        """Return True while a database connection is open."""
        return self.con is not None

    def createConnect(self, dbname):
        """Open ``dbname`` and enable the SQL input.

        Raises:
            sqlite3.Error: when the database cannot be opened.
        """
        self.con = sqlite3.connect(dbname)
        self.setReadyToTakeSQLText(True)

    def closeConnect(self):
        """Close the current connection and disable the SQL input."""
        # The model fetches rows lazily through this connection, so detach
        # it from the view before the connection goes away; otherwise a
        # later repaint or scroll would query a closed database.
        self.table.setModel(None)
        self.con.close()
        self.con = None
        self.setReadyToTakeSQLText(False)

    def setReadyToTakeSQLText(self, b):
        """Enable or disable the query editor and the Run button."""
        self.sqlEdit.setEnabled(b)
        self.executeButton.setEnabled(b)

    def setReadyToTakeConnectionString(self, b):
        """Enable or disable the connection-string input."""
        self.connectionStringEdit.setEnabled(b)

    def connectButtonClicked(self):
        """Toggle the connection and bring the widgets in line with it."""
        if self.isConnected():
            self.closeConnect()
        else:
            try:
                self.createConnect(self.connectionStringEdit.text())
            except sqlite3.Error as e:
                # Report the failure instead of letting it escape the slot.
                self.mb("Error", str(e), QMessageBox.Critical)

        connected = self.isConnected()
        # The button is checkable: keep its pressed state equal to the real
        # connection state, also when connecting failed.
        self.connectButton.setChecked(connected)
        self.setReadyToTakeConnectionString(not connected)
        self.setReadyToTakeSQLText(connected)

    def mb(self, title, message, icon=QMessageBox.Information):
        """Show a modal message box with the given title, text and icon."""
        msg = QMessageBox()
        msg.setIcon(icon)
        msg.setWindowTitle(title)
        msg.setText(message)
        msg.exec_()

    def executeButtonClicked(self):
        """Run the query text and show its result or the resulting error."""
        model = SQLite3TableModel(self.con, self.sqlEdit.toPlainText())
        self.table.setModel(model)
        if model.lastError is not None:
            self.mb("Error", str(model.lastError), QMessageBox.Critical)
        else:
            self.mb("Query status", "Success")
if __name__ == '__main__':
    # Script entry point: build the application and its main window, run
    # the Qt event loop and hand its exit status back to the interpreter.
    application = QApplication(sys.argv)
    # Keep a reference to the window for as long as the event loop runs.
    mainWindow = SqlExecutor()
    exitStatus = application.exec_()
    sys.exit(exitStatus)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment