Python script that extracts inlined SQL from a C# script and converts it to the Databricks notebook format
"""
Zero dependency script that extracts inlined SQL from a C# module and translates it to a Databricks formatted notebook
Suppose that you have a 2500 line C# module containing lots of methods of the form
public DataTable GetSomething(...) {
...
var sql = @"<sql query>"
...
}
public DataTable GetSomethingElse(...) {
...
var sql = @"<another
sql
query
spanning multiple
lines>"
...
}
and you wish you had all of those SQL scripts in a convenient Databricks notebook, each in their own cells that could be
run independently.
This script contains a lexer class that will lex your C# script into tokens of the form
[
(TokenType.NAME_FUNCTION, "Something"),
(TokenType.SQL_STRING, "SELECT whatever FROM ..."),
(TokenType.NAME_FUNCTION, "SomethingElse"),
(TokenType.SQL_STRING, "SELECT whatever2 FROM ...")
...
]
All other C# tokens are ignored, but this script can easily be extended to match other C# token types
The compiler class then writes everything to a Databricks notebook, each query in its own cell headed by the
corresponding function names (with the leading Get stripped)
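For example (illustrative), a method GetSomething whose body assigns
var sql = @"SELECT 1" compiles to roughly

    # Databricks notebook source
    # COMMAND ----------

    # MAGIC %md
    # MAGIC ### Something

    # COMMAND ----------

    # MAGIC %sql
    # MAGIC     SELECT 1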
Usage:

    There are no dependencies beyond the standard library. Change the
    INPUT_FILE_PATH variable to point to your C# module, then run

        python sql_to_databricks.py

    The resulting notebook is written to Queries.py in the working directory.
"""
import re
from enum import Enum, auto

# Path to the C# module to scan; change this before running
INPUT_FILE_PATH = "changeme"

class TokenType(Enum):
    VAR = auto()
    NAME_SQL = auto()
    ARROBA = auto()
    QUOTE = auto()
    SQL_STRING = auto()
    EOF = auto()
    NAME_FUNCTION = auto()

class Lexer:
    def __init__(self, text):
        self.text = text
        self.position = 0
        self.token_type = None
        self.buffer = ""
        self.tokens = []

    def dump_buffer(self):
        # Flush the accumulated characters as a token of the current type
        self.tokens.append((self.token_type, self.buffer))
        self.buffer = ""

    def advance(self):
        self.position += 1

    @property
    def current_character(self):
        return self.text[self.position]

    @property
    def is_eof(self):
        # Indexing past the end of the text raises IndexError
        try:
            self.current_character
            return False
        except IndexError:
            return True

    def eat(self):
        # Consume the current character into the buffer and move on
        self.buffer += self.current_character
        self.advance()

    def peek(self, lookahead=1):
        try:
            return self.text[self.position + lookahead]
        except IndexError:
            return "EOF"
    def read_arroba(self):
        # Currently unused by lex(), since @ is handled as part of @"..." strings
        self.token_type = TokenType.ARROBA
        self.eat()
        self.dump_buffer()

    def read_var(self):
        # Emit a VAR token if the next three characters spell "var"
        maybe_var = "".join([self.current_character, self.peek(1), self.peek(2)])
        if maybe_var == "var":
            self.token_type = TokenType.VAR
            self.eat()
            self.eat()
            self.eat()
            self.dump_buffer()
        else:
            self.advance()

    def next_is_quote(self):
        return self.peek() == "'" or self.peek() == '"'

    def next_are_ql(self):
        # True if the characters after the current "s" spell "ql " (i.e. "sql ")
        return self.peek() == "q" and self.peek(2) == "l" and self.peek(3) == " "

    def is_quote(self):
        return self.current_character == '"'
    def read_sql_string(self):
        # Read the body of a C# verbatim string @"...". Escaped quotes ("")
        # inside the string are not handled
        self.token_type = TokenType.SQL_STRING
        # Skip the @ and the opening quote
        self.advance()
        self.advance()
        while not self.is_quote():
            self.eat()
        # Skip the closing quote
        self.advance()
        self.dump_buffer()

    def read_sql(self):
        # Consume the three characters of the identifier "sql"
        self.token_type = TokenType.NAME_SQL
        self.eat()
        self.eat()
        self.eat()
        self.dump_buffer()
        self.advance()

    def next_is_ublic_datatable(self):
        # True if the characters after the current "p" spell "ublic DataTable Get"
        ublic_datatable = "ublic DataTable Get"
        maybe_ublic_datatable = "".join(
            [self.peek(1 + i) for i in range(len(ublic_datatable))]
        )
        return maybe_ublic_datatable == ublic_datatable

    def read_function_name(self):
        # Skip past "public DataTable Get" and read the rest of the method
        # name, up to the opening parenthesis
        self.token_type = TokenType.NAME_FUNCTION
        ublic_datatable = "ublic DataTable Get"
        for _ in range(len(ublic_datatable)):
            self.advance()
        self.advance()
        while self.current_character != "(":
            self.eat()
        self.dump_buffer()
        self.advance()
    def lex(self):
        # Scan the text, dispatching on the current character; anything that
        # is not whitespace or the start of a recognized token is skipped
        while self.position < len(self.text):
            match self.current_character:
                case "\t" | " " | "\n" | "\r":
                    self.advance()
                case "v":
                    self.read_var()
                case "@" if self.next_is_quote():
                    self.read_sql_string()
                case "s" if self.next_are_ql():
                    self.read_sql()
                case "p" if self.next_is_ublic_datatable():
                    self.read_function_name()
                case _:
                    self.advance()
        return self.tokens
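
# Example (illustrative): lexing a minimal C# fragment yields interleaved name
# and SQL tokens; the Compiler below only acts on the NAME_FUNCTION and
# SQL_STRING entries.
#
#   Lexer('public DataTable GetFoo() { var sql = @"SELECT 1"; }').lex()
#   # -> [(TokenType.NAME_FUNCTION, "Foo"), (TokenType.VAR, "var"),
#   #     (TokenType.NAME_SQL, "sql"), (TokenType.SQL_STRING, "SELECT 1")]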

class Compiler:
    def __init__(self, tokens):
        self.tokens = tokens
        self.buffer = ""

    def emit_notebook_header(self):
        self.buffer += "# Databricks notebook source\n"

    def emit_sql_header(self):
        self.buffer += "# MAGIC %sql\n"

    def emit_markdown_header(self):
        self.buffer += "# MAGIC %md\n"

    def emit_sql_line(self, line):
        self.buffer += f"# MAGIC \t{line.strip()}\n"

    def emit_command_header(self):
        # Cell separator in the Databricks notebook source format
        self.buffer += "# COMMAND ----------\n\n"

    def emit_sql_block(self, sql_text):
        sql_text = self.remove_transaction_setting(sql_text)
        self.emit_sql_header()
        for line in sql_text.split("\n"):
            self.emit_sql_line(line)

    def emit_markdown_block(self, function_name):
        self.emit_markdown_header()
        # Every line of a markdown cell must carry the "# MAGIC " prefix to
        # remain part of the %md cell
        self.buffer += f"# MAGIC ### {function_name}\n\n"
    def compile(self):
        self.emit_notebook_header()
        for token in self.tokens:
            match token:
                case (TokenType.NAME_FUNCTION, text):
                    # Function names become markdown heading cells
                    self.emit_command_header()
                    self.emit_markdown_block(text)
                case (TokenType.SQL_STRING, text):
                    # Each query becomes its own %sql cell
                    self.emit_command_header()
                    self.emit_sql_block(text)
                case _:
                    continue

    @staticmethod
    def remove_transaction_setting(query):
        # Strip the SQL Server specific isolation-level directive, which does
        # not belong in a Databricks notebook cell
        pattern = re.compile(r"(?i)SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
        return pattern.sub("", query)
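
# Example (illustrative): the isolation-level directive is removed so the
# remaining statement can run on its own in a notebook cell.
#
#   Compiler.remove_transaction_setting(
#       "SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED\nSELECT 1"
#   )
#   # -> "\nSELECT 1"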

if __name__ == "__main__":
    with open(INPUT_FILE_PATH, "r") as f:
        text = f.read()

    lexer = Lexer(text)
    tokens = lexer.lex()

    compiler = Compiler(tokens)
    compiler.compile()

    with open("Queries.py", "w") as f:
        f.write(compiler.buffer)