Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
sqlite3 name escaping in Python (re: Stack Overflow question #6514274)

Putting up a bounty didn't produce a solution to this, so I've spent a while figuring out the best one I could. I don't have any source confirming that this is the correct way to do it, so use at your own risk.

Solution

To convert any string into a SQLite identifier:

  • Ensure the string can be encoded as UTF-8.
  • Ensure the string does not include any NUL characters.
  • Replace all " with "".
  • Wrap the entire thing in double quotes.

My implementation:

import codecs

def quote_identifier(s, errors="strict"):
    encodable = s.encode("utf-8", errors).decode("utf-8")
    
    nul_index = encodable.find("\x00")
    
    if nul_index >= 0:
        error = UnicodeEncodeError("utf-8", encodable, nul_index, nul_index + 1, "NUL not allowed")
        error_handler = codecs.lookup_error(errors)
        replacement, _ = error_handler(error)
        encodable = encodable.replace("\x00", replacement)
    
    return "\"" + encodable.replace("\"", "\"\"") + "\""

Given a string single argument, it will escape and quote it correctly or raise an exception. The second argument can be used to specify any error handler registered in the codecs module. The built-in ones are:

  • 'strict': raise an exception in case of an encoding error
  • 'replace': replace malformed data with a suitable replacement marker, such as '?' or '\ufffd'
  • 'ignore': ignore malformed data and continue without further notice
  • 'xmlcharrefreplace': replace with the appropriate XML character reference (for encoding only)
  • 'backslashreplace': replace with backslashed escape sequences (for encoding only) This doesn't check for reserved identifiers, so if you try to create a new SQLITE_MASTER table it won't stop you.

Example Usage

import sqlite3

def test_identifier(identifier):
    "Tests an identifier to ensure it's handled properly."
    
    with sqlite3.connect(":memory:") as c:
        c.execute("CREATE TABLE " + quote_identifier(identifier) + " (foo)")
        assert identifier == c.execute("SELECT name FROM SQLITE_MASTER").fetchone()[0]

test_identifier("'Héllo?'\\\n\r\t\"Hello!\" -☃") # works
test_identifier("北方话") # works
test_identifier(chr(0x20000)) # works

print(quote_identifier("Fo\x00o!", "replace")) # prints "Fo?o!"
print(quote_identifier("Fo\x00o!", "ignore")) # prints "Foo!"
print(quote_identifier("Fo\x00o!")) # raises UnicodeEncodeError
print(quote_identifier(chr(0xD800))) # raises UnicodeEncodeError

Relevant Observations

  • SQLite identifiers are TEXT, not binary.
    • Reference: SQLITE_MASTER schema in the FAQ.
    • Reference: Python 2 SQLite API yelling at me when I gave it bytes it couldn't decode as text.
    • Reference: Python 3 SQLite API requires queries be strs, not bytes.
  • SQLite identifiers are quoted using double-quotes.
  • Double-quotes in SQLite identifiers are escaped as two double quotes.
  • SQLite identifiers preserve case, but they are case-insensitive towards ASCII letters. It is possible to enable unicode-aware case-insensitivity.
  • SQLite stops reading queries at the NUL character, but does not have any way to escape it.
    • Reference: PHP came up with their own binary encoding to handle NUL; presumably if they could just store it directly they would.
  • sqlite3 can handle any other unicode string as long as it can be properly encoded to UTF-8. Invalid strings could cause crashes between Python 3.0 and Python 3.1.2 or thereabouts. Python 2 accepted these invalid strings, but this is considered a bug.
#!/usr/bin/env python3.2
import sqlite3
print (sqlite3.version)
print (sqlite3.sqlite_version)
import sys
c = sqlite3.connect(":memory:")
table_name = "f" * 200000
sql = "create table " + table_name + " (bar)"
c.execute(sql)
c.execute(sql)
#!/usr/bin/env python3.1
import sqlite3
import codecs
def quote_identifier(s, errors="strict"):
encodable = s.encode("utf-8", errors).decode("utf-8")
nul_index = encodable.find("\x00")
if nul_index >= 0:
error = UnicodeEncodeError("SQLite's utf-8", encodable, nul_index, nul_index + 1, "NUL not allowed")
error_handler = codecs.lookup_error(errors)
replacement, _ = error_handler(error)
encodable = encodable.replace("\x00", replacement)
return "\"" + encodable.replace("\"", "\"\"") + "\""
def test_identifier(identifier):
"Tests an identifier to ensure it's handled properly."
with sqlite3.connect(":memory:") as c:
c.execute("CREATE TABLE " + quote_identifier(identifier) + " (foo)")
assert identifier == c.execute("SELECT name FROM SQLITE_MASTER").fetchone()[0]
print(quote_identifier(chr(0xD800))) # raises UnicodeEncodeError
#!/usr/bin/env python3
import sqlite3
# table names are case-insensetive but preserve case
ERROR = object()
def sanitize_characters(string, replace_invalid_with=ERROR):
try:
string.encode("utf-8")
if "\0" in string:
raise UnicodeEncodeError
yield string
except UnicodeEncodeError:
for character in string:
point = ord(character)
if point == 0:
if replace_invalid_with is ERROR:
raise ValueError("SQLite identifier contains NUL character.")
else:
yield replace_invalid_with
elif 0xD800 <= point <= 0xDBFF:
if replace_invalid_with is ERROR:
raise ValueError("SQLite identifier contains high-surrogate character.")
else:
yield replace_invalid_with
elif 0xDC00 <= point <= 0xDFFF:
if replace_invalid_with is ERROR:
raise ValueError("SQLite identifier contains low-surrogate character.")
else:
yield replace_invalid_with
# elif (0xE000 <= point <= 0xF8FF or
# 0xF0000 <= point <= 0xFFFFD or
# 0x100000 <= point <= 0x10FFFD):
# if replace_invalid_with is ERROR:
# raise ValueError("SQLite identifier contains private user character.")
# else:
# yield replace_invalid_with
# elif 0xFDD0 <= point <= 0xFDEF or (point % 0x10000) in (0xFFFE, 0xFFFF):
# if replace_invalid_with is ERROR:
# raise ValueError("SQLite identifier contains non-character character.")
# else:
# yield replace_invalid_with
else:
yield character
def quote_identifier(identifier, replace_invalid_with=ERROR):
sanitized = "".join(sanitize_characters(identifier, replace_invalid_with))
return "\"" + sanitized.replace("\"", "\"\"") + "\""
def test_identifier(identifier, replace_invalid_with=ERROR):
with sqlite3.connect(":memory:") as connection:
sql = "create table " + quote_identifier(identifier, replace_invalid_with) + " (data)"
connection.execute(sql)
for (name,) in connection.execute("SELECT name FROM SQLITE_master"):
assert(name == identifier) or (name == "".join(sanitize_characters(identifier, replace_invalid_with))), (name, identifier, sanitize_characters(identifier, replace_invalid_with))
break
else:
assert False, "no table created?"
#
# with sqlite3.connect(":memory:") as c:
# identifier = "Fé'\"!"
#
# quoted = quote_identifier(identifier)
#
# c.execute("CREATE TABLE " + quoted + " (bar)")
# assert [(identifier,)] == list(c.execute("SELECT name FROM SQLITE_MASTER"))
#
# with sqlite3.connect(":memory:") as c:
# identifier = "Foo\0 bar?"
#
# quoted = quote_identifier(identifier, "_")
#
# c.execute("CREATE TABLE " + quoted + " (bar)")
# assert [("Foo_ bar?",)] == list(c.execute("SELECT name FROM SQLITE_MASTER"))
#!/usr/bin/env python3.2
from q6514274 import test_identifier
for ones in range(0, 0xFFF + 1):
for thousands in range(0, 0x10F + 1):
point = thousands * 0x1000 + ones
identifier = "fo" + chr(point)
try:
test_identifier(identifier)
except Exception as e:
print(hex(point), e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment