Skip to content

Instantly share code, notes, and snippets.

@seanjtaylor
Created November 19, 2018 00:07
Show Gist options
  • Star 26 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save seanjtaylor/01468ee224afca3d679c6dd1422efe6b to your computer and use it in GitHub Desktop.
Save seanjtaylor/01468ee224afca3d679c6dd1422efe6b to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# SQLite-backed Data Frame \n",
"\n",
"- Follows a Pythonic version of the dplyr API.\n",
"- Does not use pandas or NumPy at all; the only external library is `tabulate`, used for pretty printing.\n",
"- Initializing a new data frame creates a new in-memory SQLite database which stores all of its derived data frames as tables.\n",
"- Data is currently loaded by assigning columns directly (`df['col'] = values`).\n",
"- All operations return new data frames which are linked to new tables in the same database.\n",
"- Leverages SQLite to do all of the computation. All the class does is generate appropriate queries.\n",
"\n",
"## Some stuff that needs to be done\n",
"\n",
"Easy:\n",
"\n",
"- Allow columns to have types other than int (every column is currently created as int).\n",
"- Allow users to remove columns in select statements.\n",
"- Figure out how to read in csv files or from other sources efficiently.\n",
"- Figure out how to write to csv files efficiently.\n",
"- Share one connection across independently created data frames (e.g. via a singleton).\n",
"- Support other join types (e.g. left and outer joins) and add more methods.\n",
"\n",
"\n",
"Harder:\n",
"- Use window functions during mutate.\n",
"- Add reshaping (stack/unstack) operations.\n",
"- Benchmark and check efficiency of various operations."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import sqlite3\n",
"import random\n",
"import string\n",
"\n",
"# Used for printing the tables\n",
"from tabulate import tabulate"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"class SqliteDF:\n",
"    \"\"\"A data frame whose rows are stored, and whose operations run, in SQLite.\n",
"\n",
"    Each instance wraps one table. Transforming methods build a SQL query,\n",
"    materialise its result as a new table in the same database (memoized by\n",
"    query text in a cache shared with derived frames) and return a new\n",
"    SqliteDF linked to that table.\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, con=None, table=None, group_cols=None, cache=None, verbose=False):\n",
"        \"\"\"\n",
"        Keyword arguments:\n",
"        con -- A SQLite connection object; a new in-memory database is opened if omitted.\n",
"        table -- the name of the table associated with this dataframe; if omitted, a\n",
"                 random name is generated and the table is created by the first\n",
"                 column assignment.\n",
"        group_cols -- for grouping operations, the set of columns we are grouping by\n",
"        cache -- shared across instances in order to memoize select statements\n",
"        verbose -- print SQL statements (also passed on to derived data frames).\n",
"        \"\"\"\n",
"        if table is None:\n",
"            self.table = self._make_table_name()\n",
"            self.created = False\n",
"        else:\n",
"            self.table = table\n",
"            self.created = True\n",
"\n",
"        self.con = sqlite3.connect(':memory:') if con is None else con\n",
"\n",
"        # max_index is the number of rows. Only an existing table on a\n",
"        # caller-supplied connection can already hold rows; querying a table\n",
"        # that has not been created yet (table=None) would raise.\n",
"        if con is not None and self.created:\n",
"            self.max_index = self.con.execute(\n",
"                'select count(*) from {}'.format(self.table)).fetchone()[0]\n",
"        else:\n",
"            self.max_index = 0\n",
"\n",
"        self._group_cols = group_cols\n",
"        self._cache = cache if cache is not None else {}\n",
"        self.verbose = verbose\n",
"\n",
"    def __repr__(self):\n",
"        return 'SqliteDF({} x {})'.format(self.max_index, len(self.keys()))\n",
"\n",
"    def __setitem__(self, key, values):\n",
"        \"\"\"Primary way to get data in. Adds one column, aligned with existing rows by position.\n",
"\n",
"        Arguments:\n",
"        key -- the name of the column to create (it must not already exist)\n",
"        values -- an iterable of the column's values, in row order. Extra values\n",
"                  append new rows (other columns NULL); if there are fewer values\n",
"                  than rows, the remaining rows are NULL in this column.\n",
"        \"\"\"\n",
"        values = list(values)\n",
"        cur = self.con.cursor()\n",
"        if self.created:\n",
"            cur.execute('alter table {} add column {} int'.format(self.table, key))\n",
"        else:\n",
"            cur.execute('create table {} ({} int)'.format(self.table, key))\n",
"            self.created = True\n",
"\n",
"        # Align by position instead of assuming ROWIDs run 0..n-1: SQLite\n",
"        # numbers rows inserted by create table ... as select from ROWID 1.\n",
"        rowids = [row[0] for row in cur.execute(\n",
"            'select ROWID from {} order by ROWID'.format(self.table)).fetchall()]\n",
"        while len(rowids) < len(values):\n",
"            cur.execute('insert into {} default values'.format(self.table))\n",
"            rowids.append(cur.lastrowid)\n",
"        self.max_index = len(rowids)\n",
"\n",
"        cur.executemany('update {} set {}=? where ROWID=?'.format(self.table, key),\n",
"                        list(zip(values, rowids)))\n",
"\n",
"        # The table changed, so memoized derived tables may now be stale.\n",
"        self._cache.clear()\n",
"\n",
"    def keys(self):\n",
"        \"\"\"Return all columns in the data frame.\"\"\"\n",
"        tables = self.con.cursor().execute('pragma table_info({})'.format(self.table)).fetchall()\n",
"        return [t[1] for t in tables]\n",
"\n",
"    def __getitem__(self, key):\n",
"        \"\"\"Get an individual column from the data frame, in row order.\"\"\"\n",
"        rows = self.con.cursor().execute('select {} from {} order by ROWID'.format(key, self.table)).fetchall()\n",
"        return [row[0] for row in rows]\n",
"\n",
"    def print(self):\n",
"        \"\"\"Render the data frame to a string, in row order, and print it.\"\"\"\n",
"        rows = self.con.cursor().execute('select * from {} order by ROWID'.format(self.table)).fetchall()\n",
"        print(tabulate(rows, headers=self.keys()))\n",
"\n",
"    def _make_table_name(self):\n",
"        \"\"\"Generate a random 10-letter lowercase name for a create table statement.\"\"\"\n",
"        return ''.join(random.choices(string.ascii_lowercase, k=10))\n",
"\n",
"    def _create_table(self, select, memoize=True):\n",
"        \"\"\"Create a new table and return a data frame linked to it.\n",
"\n",
"        The returned frame shares this frame's connection, cache and verbosity,\n",
"        and is ungrouped.\n",
"\n",
"        Arguments:\n",
"        select -- a query that selects data from the current set of available tables.\n",
"        memoize -- whether to cache the select query or re-run it.\n",
"        \"\"\"\n",
"        if memoize and select in self._cache:\n",
"            return SqliteDF(self.con, self._cache[select], cache=self._cache, verbose=self.verbose)\n",
"\n",
"        table = self._make_table_name()\n",
"        if self.verbose:\n",
"            print(select)\n",
"        self.con.cursor().execute('create table {} as {}'.format(table, select))\n",
"        if memoize:\n",
"            self._cache[select] = table\n",
"        return SqliteDF(self.con, table, cache=self._cache, verbose=self.verbose)\n",
"\n",
"    def head(self, n=5):\n",
"        \"\"\"Return the first n rows.\"\"\"\n",
"        q = 'select * from {} order by ROWID limit {}'.format(self.table, n)\n",
"        return self._create_table(q)\n",
"\n",
"    def tail(self, n=5):\n",
"        \"\"\"Return the last n rows, kept in their original order.\"\"\"\n",
"        q = ('select * from {0} where ROWID in '\n",
"             '(select ROWID from {0} order by ROWID desc limit {1}) '\n",
"             'order by ROWID').format(self.table, n)\n",
"        return self._create_table(q)\n",
"\n",
"    def group_by(self, *columns):\n",
"        \"\"\"Returns the same data frame with group columns added.\"\"\"\n",
"        return SqliteDF(self.con, self.table, group_cols=columns, cache=self._cache, verbose=self.verbose)\n",
"\n",
"    def ungroup(self):\n",
"        \"\"\"Returns the same data frame with group columns removed.\"\"\"\n",
"        return SqliteDF(self.con, self.table, group_cols=None, cache=self._cache, verbose=self.verbose)\n",
"\n",
"    def count(self):\n",
"        \"\"\"Count rows into column n; one row per group when grouped.\"\"\"\n",
"        if not self._group_cols:\n",
"            q = 'select count(1) as n from {}'.format(self.table)\n",
"        else:\n",
"            groups = ', '.join(self._group_cols)\n",
"            q = 'select {}, count(1) as n from {} group by {}'.format(groups, self.table, groups)\n",
"        return self._create_table(q)\n",
"\n",
"    def summarise(self, **exprs):\n",
"        \"\"\"Aggregate with SQL expressions, e.g. summarise(best='max(score)').\n",
"\n",
"        Each keyword becomes an output column. When grouped, the group columns\n",
"        come first and there is one output row per group.\n",
"        \"\"\"\n",
"        columns = list(self._group_cols or [])\n",
"        columns += ['{} as {}'.format(value, key) for key, value in exprs.items()]\n",
"        q = 'select {} from {}'.format(', '.join(columns), self.table)\n",
"        if self._group_cols:\n",
"            q += ' group by {}'.format(', '.join(self._group_cols))\n",
"        return self._create_table(q)\n",
"\n",
"    def select(self, *columns):\n",
"        \"\"\"Return a data frame with only the given columns (or SQL expressions).\"\"\"\n",
"        q = 'select {} from {}'.format(', '.join(columns), self.table)\n",
"        return self._create_table(q)\n",
"\n",
"    def mutate(self, **exprs):\n",
"        \"\"\"Add columns computed from SQL expressions, e.g. mutate(score2='score * score').\n",
"\n",
"        A keyword naming an existing column replaces that column in place\n",
"        instead of producing a duplicate column name.\n",
"        \"\"\"\n",
"        keys = self.keys()\n",
"        selected = ['{} as {}'.format(exprs[k], k) if k in exprs else k for k in keys]\n",
"        selected += ['{} as {}'.format(v, k) for k, v in exprs.items() if k not in keys]\n",
"        q = 'select {} from {}'.format(', '.join(selected), self.table)\n",
"        return self._create_table(q)\n",
"\n",
"    def inner_join(self, other_df):\n",
"        \"\"\"Inner join with other_df on every column name the two frames share.\n",
"\n",
"        Shared columns come first, then this frame's other columns, then\n",
"        other_df's remaining columns. Raises ValueError if no column is shared.\n",
"        \"\"\"\n",
"        left_keys = self.keys()\n",
"        right_keys = other_df.keys()\n",
"        overlap = [lk for lk in left_keys if lk in right_keys]\n",
"        if not overlap:\n",
"            raise ValueError('inner_join: {} and {} share no columns'.format(self.table, other_df.table))\n",
"        on_statement = ' and '.join('a.{0} = b.{0}'.format(col) for col in overlap)\n",
"\n",
"        cols_to_select = (\n",
"            ['a.{}'.format(o) for o in overlap]\n",
"            + ['a.{}'.format(lk) for lk in left_keys if lk not in overlap]\n",
"            + ['b.{}'.format(rk) for rk in right_keys if rk not in left_keys]\n",
"        )\n",
"\n",
"        q = 'select {} from {} a inner join {} b on {}'.format(\n",
"            ', '.join(cols_to_select),\n",
"            self.table,\n",
"            other_df.table,\n",
"            on_statement,\n",
"        )\n",
"        return self._create_table(q)\n",
"\n",
"    def bind_rows(self, other_df):\n",
"        \"\"\"Append other_df's rows below this frame's rows (UNION ALL).\n",
"\n",
"        Only columns present in both frames are kept, in this frame's column\n",
"        order. Raises ValueError if no column is shared.\n",
"        \"\"\"\n",
"        left_keys = self.keys()\n",
"        right_keys = other_df.keys()\n",
"        overlap = [lk for lk in left_keys if lk in right_keys]\n",
"        if not overlap:\n",
"            raise ValueError('bind_rows: {} and {} share no columns'.format(self.table, other_df.table))\n",
"\n",
"        cols = ', '.join(overlap)\n",
"        q = 'select {} FROM {} UNION ALL SELECT {} FROM {}'.format(\n",
"            cols,\n",
"            self.table,\n",
"            cols,\n",
"            other_df.table,\n",
"        )\n",
"        return self._create_table(q)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Creating and adding new columns."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" person score stuff\n",
"-------- ------- -------\n",
" 1 19 1\n",
" 1 21\n",
" 1 31\n",
" 2 43\n",
" 2 25\n",
" 2 61\n",
" 2 17\n",
" 3 90\n"
]
}
],
"source": [
"df = SqliteDF(verbose=True)\n",
"df['person'] = [1, 1, 1, 2, 2, 2, 2, 3]\n",
"df['score'] = [19, 21, 31, 43, 25, 61, 17, 90]\n",
"# 'stuff' is shorter than the other columns, so its remaining rows stay NULL.\n",
"df['stuff'] = [1]\n",
"df.print()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Counting rows."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"select count(1) as n from eirrilizry\n",
" n\n",
"---\n",
" 8\n"
]
}
],
"source": [
"(df\n",
" .count()\n",
").print()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Grouping and summarizing."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" person max_score min_score\n",
"-------- ----------- -----------\n",
" 1 31 19\n",
" 2 61 17\n",
" 3 90 90\n"
]
}
],
"source": [
"per_person = df.group_by('person').summarise(\n",
"    max_score='max(score)',\n",
"    min_score='min(score)',\n",
")\n",
"per_person.print()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Non-grouped summarizing."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"select max(score) as best from eirrilizry\n",
" best\n",
"------\n",
" 90\n"
]
}
],
"source": [
"(df\n",
" .summarise(best='max(score)')\n",
").print()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Grouped counting."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" person n\n",
"-------- ---\n",
" 1 3\n",
" 2 4\n",
" 3 1\n"
]
}
],
"source": [
"df2 = (df\n",
"       .group_by('person')\n",
"       .count())\n",
"df2.print()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Selecting specific columns and joining."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"select person, score from eirrilizry\n",
" person score n\n",
"-------- ------- ---\n",
" 1 19 3\n",
" 1 21 3\n",
" 1 31 3\n",
" 2 43 4\n",
" 2 25 4\n",
" 2 61 4\n",
" 2 17 4\n",
" 3 90 1\n"
]
}
],
"source": [
"(df\n",
" .select('person', 'score')\n",
" .inner_join(df2)\n",
").print()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Mutations using SQL expressions only; registering Python functions is planned for later."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"select person, score, stuff, score * score as score2 from eirrilizry\n",
" person score stuff score2\n",
"-------- ------- ------- --------\n",
" 1 19 1 361\n",
" 1 21 441\n",
" 1 31 961\n",
" 2 43 1849\n",
" 2 25 625\n",
" 2 61 3721\n",
" 2 17 289\n",
" 3 90 8100\n"
]
}
],
"source": [
"(df\n",
" .mutate(score2='score * score')\n",
").print()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"select score from eirrilizry\n",
" score\n",
"-------\n",
" 19\n",
" 21\n",
" 31\n",
" 43\n",
" 25\n",
" 61\n",
" 17\n",
" 90\n"
]
}
],
"source": [
"(df\n",
" .select('score')\n",
").print()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Binding rows."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"select person, score, stuff FROM eirrilizry UNION ALL SELECT person, score, stuff FROM eirrilizry\n",
" person score stuff\n",
"-------- ------- -------\n",
" 1 19 1\n",
" 1 21\n",
" 1 31\n",
" 2 43\n",
" 2 25\n",
" 2 61\n",
" 2 17\n",
" 3 90\n",
" 1 19 1\n",
" 1 21\n",
" 1 31\n",
" 2 43\n",
" 2 25\n",
" 2 61\n",
" 2 17\n",
" 3 90\n"
]
}
],
"source": [
"(df\n",
" .bind_rows(df)\n",
").print()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment