Skip to content

Instantly share code, notes, and snippets.

@tenko
Last active February 27, 2020 20:54
Show Gist options
  • Save tenko/93ffd3d69d20ebd95522b361f90f7f16 to your computer and use it in GitHub Desktop.
Save tenko/93ffd3d69d20ebd95522b361f90f7f16 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
cimport cython
from cython cimport view
from cpython.buffer cimport PyObject_CheckBuffer, PyObject_GetBuffer, PyBuffer_Release
from cpython.buffer cimport PyBUF_FORMAT, PyBUF_INDIRECT
from cpython.ref cimport PyObject, Py_INCREF, Py_DECREF
from libcpp cimport bool as cbool
from libc.stdint cimport int8_t, int16_t, int32_t, uint64_t, int64_t
from libc.stdlib cimport malloc, free
# Raw declaration of the CPython C-API function so that it hands back a bare
# PyObject* (no automatic reference counting by Cython). The caller of this
# declaration is responsible for the reference it receives.
cdef extern from *:
    PyObject *PyUnicode_FromString(const char *u)
# Declarations mirrored from the DuckDB C header. Everything in this block is
# declared ``nogil``. These declarations must stay in sync with "duckdb.h".
cdef extern from "duckdb.h" nogil:
    # Index type used for row/column counts and parameter positions.
    ctypedef uint64_t index_t

    # Column type tags stored in duckdb_column.type.
    cdef enum DUCKDB_TYPE:
        DUCKDB_TYPE_INVALID
        DUCKDB_TYPE_BOOLEAN
        DUCKDB_TYPE_TINYINT
        DUCKDB_TYPE_SMALLINT
        DUCKDB_TYPE_INTEGER
        DUCKDB_TYPE_BIGINT
        DUCKDB_TYPE_FLOAT
        DUCKDB_TYPE_DOUBLE
        DUCKDB_TYPE_TIMESTAMP
        DUCKDB_TYPE_DATE
        DUCKDB_TYPE_TIME
        DUCKDB_TYPE_VARCHAR
    ctypedef DUCKDB_TYPE duckdb_type

    # Calendar date as separate year/month/day fields.
    cdef struct _duckdb_date:
        int32_t year
        int8_t month
        int8_t day
    ctypedef _duckdb_date duckdb_date

    # Time of day as separate fields.
    # NOTE(review): `msec` is presumably milliseconds -- confirm against duckdb.h.
    cdef struct _duckdb_time:
        int8_t hour
        int8_t min
        int8_t sec
        int16_t msec
    ctypedef _duckdb_time duckdb_time

    # Timestamp = date part followed by time part.
    cdef struct _duckdb_timestamp:
        duckdb_date date
        duckdb_time time
    ctypedef _duckdb_timestamp duckdb_timestamp

    # One result column: a data array, a per-row null mask, a type tag and a name.
    cdef struct _duckdb_column:
        void *data
        cbool *nullmask
        duckdb_type type
        char *name
    ctypedef _duckdb_column duckdb_column

    # A query result: dimensions, the column array and an error message slot.
    cdef struct _duckdb_result:
        index_t column_count
        index_t row_count
        duckdb_column *columns
        char *error_message
    ctypedef _duckdb_result duckdb_result

    # Handles are opaque pointers on the Cython side.
    ctypedef void *duckdb_database
    ctypedef void *duckdb_connection
    ctypedef void *duckdb_prepared_statement

    # Status code returned by most API calls.
    cdef enum _duckdb_state:
        DuckDBSuccess
        DuckDBError
    ctypedef _duckdb_state duckdb_state

    # Database / connection lifecycle.
    duckdb_state duckdb_open(const char *path, duckdb_database *out_database)
    void duckdb_close(duckdb_database *database)
    duckdb_state duckdb_connect(duckdb_database database, duckdb_connection *out_connection);
    void duckdb_disconnect(duckdb_connection *connection);

    # Direct query execution and result disposal.
    duckdb_state duckdb_query(duckdb_connection connection, const char *query, duckdb_result *out_result)
    void duckdb_destroy_result(duckdb_result *result)

    # Typed accessors for a single (col, row) cell of a result.
    cbool duckdb_value_boolean(duckdb_result *result, index_t col, index_t row)
    int8_t duckdb_value_int8(duckdb_result *result, index_t col, index_t row)
    int16_t duckdb_value_int16(duckdb_result *result, index_t col, index_t row)
    int32_t duckdb_value_int32(duckdb_result *result, index_t col, index_t row)
    int64_t duckdb_value_int64(duckdb_result *result, index_t col, index_t row)
    float duckdb_value_float(duckdb_result *result, index_t col, index_t row)
    double duckdb_value_double(duckdb_result *result, index_t col, index_t row)
    # NOTE(review): the returned string is presumably heap-allocated and owned
    # by the caller -- confirm against the DuckDB C API documentation.
    char *duckdb_value_varchar(duckdb_result *result, index_t col, index_t row)

    # Prepared statements: creation, parameter count, binding, execution, disposal.
    duckdb_state duckdb_prepare(duckdb_connection connection, const char *query,
                                duckdb_prepared_statement *out_prepared_statement)
    duckdb_state duckdb_nparams(duckdb_prepared_statement prepared_statement, index_t *nparams_out)
    duckdb_state duckdb_bind_boolean(duckdb_prepared_statement prepared_statement, index_t param_idx, cbool val)
    duckdb_state duckdb_bind_int8(duckdb_prepared_statement prepared_statement, index_t param_idx, int8_t val)
    duckdb_state duckdb_bind_int16(duckdb_prepared_statement prepared_statement, index_t param_idx, int16_t val)
    duckdb_state duckdb_bind_int32(duckdb_prepared_statement prepared_statement, index_t param_idx, int32_t val)
    duckdb_state duckdb_bind_int64(duckdb_prepared_statement prepared_statement, index_t param_idx, int64_t val)
    duckdb_state duckdb_bind_float(duckdb_prepared_statement prepared_statement, index_t param_idx, float val)
    duckdb_state duckdb_bind_double(duckdb_prepared_statement prepared_statement, index_t param_idx, double val)
    duckdb_state duckdb_bind_varchar(duckdb_prepared_statement prepared_statement, index_t param_idx, const char *val)
    duckdb_state duckdb_bind_null(duckdb_prepared_statement prepared_statement, index_t param_idx)
    duckdb_state duckdb_execute_prepared(duckdb_prepared_statement prepared_statement, duckdb_result *out_result)
    void duckdb_destroy_prepare(duckdb_prepared_statement *prepared_statement)
import datetime
# Module-level globals in the style of the Python DB-API 2.0 (PEP 249).
# apilevel: the DB-API version string this module advertises.
apilevel = "2.0"
# threadsafety: 0 is the lowest level defined by PEP 249.
threadsafety = 0
# paramstyle: "qmark" means SQL parameters are written as question marks.
paramstyle = "qmark"
class Warning(Exception):
    """Warning category of this module; derives directly from Exception."""


class Error(Exception):
    """Root of every error exception defined by this module."""


class InterfaceError(Error):
    """Error attributed to the interface layer rather than the database."""


class DatabaseError(Error):
    """Error attributed to the database; base of the more specific errors."""


class DataError(DatabaseError):
    """DatabaseError subclass for problems with the processed data."""


class OperationalError(DatabaseError):
    """DatabaseError subclass for failures while operating the database."""


class IntegrityError(DatabaseError):
    """DatabaseError subclass for integrity violations."""


class InternalError(DatabaseError):
    """DatabaseError subclass for internal database errors."""


class ProgrammingError(DatabaseError):
    """DatabaseError subclass for programming mistakes (e.g. bad SQL)."""


class NotSupportedError(DatabaseError):
    """DatabaseError subclass raised for unsupported features."""
@cython.freelist(8)
cdef class Date:
    """Date value object backed by a ``datetime.date``.

    The wrapped standard-library object is kept in ``adapted``; the
    ``year``/``month``/``day`` properties forward to it.
    """
    cdef object adapted

    def __init__(self, year, month, day):
        # datetime.date performs the range validation of the components.
        self.adapted = datetime.date(year, month, day)

    def __repr__(self):
        parts = (self.year, self.month, self.day)
        return 'Date(%d,%d,%d)' % parts

    __str__ = __repr__

    @property
    def year(self):
        """Year of the wrapped date."""
        return self.adapted.year

    @property
    def month(self):
        """Month of the wrapped date."""
        return self.adapted.month

    @property
    def day(self):
        """Day of the wrapped date."""
        return self.adapted.day
@cython.freelist(8)
cdef class Time:
    """Time-of-day value object backed by a ``datetime.time``.

    The wrapped standard-library object is kept in ``adapted``; the
    ``hour``/``minute``/``second`` properties forward to it.
    """
    cdef object adapted

    def __init__(self, hour, minute, second):
        # datetime.time performs the range validation of the components.
        self.adapted = datetime.time(hour, minute, second)

    def __repr__(self):
        parts = (self.hour, self.minute, self.second)
        return 'Time(%d,%d,%d)' % parts

    __str__ = __repr__

    @property
    def hour(self):
        """Hour of the wrapped time."""
        return self.adapted.hour

    @property
    def minute(self):
        """Minute of the wrapped time."""
        return self.adapted.minute

    @property
    def second(self):
        """Second of the wrapped time."""
        return self.adapted.second
@cython.freelist(8)
cdef class Timestamp:
    """Timestamp value object backed by a ``datetime.datetime``.

    The wrapped standard-library object is kept in ``adapted``; the
    component properties forward to it.
    """
    cdef object adapted

    def __init__(self, year, month, day, hour, minute, second):
        # datetime.datetime performs the range validation of the components.
        self.adapted = datetime.datetime(year, month, day, hour, minute, second)

    def __repr__(self):
        args = (self.year, self.month, self.day, self.hour, self.minute, self.second)
        # Six values need six placeholders; the previous three-placeholder
        # format raised "not all arguments converted" on every call.
        return 'Timestamp(%d,%d,%d,%d,%d,%d)' % args

    __str__ = __repr__

    @property
    def year(self):
        """Year of the wrapped timestamp."""
        return self.adapted.year

    @property
    def month(self):
        """Month of the wrapped timestamp."""
        return self.adapted.month

    @property
    def day(self):
        """Day of the wrapped timestamp."""
        return self.adapted.day

    @property
    def hour(self):
        """Hour of the wrapped timestamp."""
        return self.adapted.hour

    @property
    def minute(self):
        """Minute of the wrapped timestamp."""
        return self.adapted.minute

    @property
    def second(self):
        """Second of the wrapped timestamp."""
        return self.adapted.second
def DateFromTicks(ticks):
    """Build a ``Date`` from a POSIX timestamp (seconds since the epoch).

    The conversion uses local time, as ``datetime.datetime.fromtimestamp``
    does when no timezone is given.
    """
    # `datetime` is the module here, so the class must be spelled out.
    ts = datetime.datetime.fromtimestamp(ticks)
    return Date(ts.year, ts.month, ts.day)
def TimeFromTicks(ticks):
    """Build a ``Time`` from a POSIX timestamp (seconds since the epoch).

    The conversion uses local time, as ``datetime.datetime.fromtimestamp``
    does when no timezone is given.
    """
    # `datetime` is the module here, so the class must be spelled out; the
    # component attributes are singular (hour/minute/second).
    ts = datetime.datetime.fromtimestamp(ticks)
    return Time(ts.hour, ts.minute, ts.second)
def TimestampFromTicks(ticks):
    """Build a ``Timestamp`` from a POSIX timestamp (seconds since the epoch).

    The conversion uses local time, as ``datetime.datetime.fromtimestamp``
    does when no timezone is given.
    """
    # `datetime` is the module here, so the class must be spelled out; the
    # component attributes are singular (hour/minute/second).
    ts = datetime.datetime.fromtimestamp(ticks)
    return Timestamp(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second)
def Binary(date):
    """Binary constructor placeholder: always raises ``NotSupportedError``.

    The argument is accepted only to satisfy the call signature and is
    never inspected.
    """
    raise NotSupportedError()
class STRING:
    """Type marker used for string (VARCHAR) columns."""


class BINARY:
    """Type marker for binary columns."""


class NUMBER:
    """Type marker used for boolean, integer and floating-point columns."""


class DATETIME:
    """Type marker used for timestamp, date and time columns."""


class ROWID:
    """Type marker for row-id columns."""
cdef class Connection:
    """Handle to an open DuckDB database.

    ``thisptr`` holds the ``duckdb_database`` handle and is closed when the
    object is deallocated. Cursors created through :meth:`cursor` keep a
    reference to their Connection.
    """
    cdef void *thisptr
    # C view of the database path (NULL when no filename was given).
    cdef char *filename
    # Owns the UTF-8 buffer `filename` points into. Without this reference
    # the bytes object created in connect() would be freed as soon as
    # connect() returns, leaving `filename` dangling.
    cdef bytes _filename_bytes

    def __dealloc__(self):
        if self.thisptr != NULL:
            duckdb_close(<duckdb_database *>&self.thisptr)

    def __repr__(self):
        if self.filename != NULL:
            filename = self.filename.decode('utf-8')
        else:
            filename = "NULL"
        return 'Connection(filename = "%s")' % filename

    __str__ = __repr__

    cpdef close(self):
        """No-op; the database is closed when the object is deallocated."""
        pass

    cpdef commit(self):
        """No-op placeholder."""
        pass

    cpdef rollback(self):
        """No-op placeholder."""
        pass

    cpdef Cursor cursor(self):
        """Open a new DuckDB connection on this database and wrap it in a Cursor.

        Raises:
            DatabaseError: if ``duckdb_connect`` reports an error.
        """
        cdef Cursor ret = Cursor.__new__(Cursor)
        ret.ref = self          # keeps the database alive as long as the cursor
        ret.arraysize = 1
        res = duckdb_connect(<duckdb_database>self.thisptr, <duckdb_connection *>&ret.thisptr)
        if res == DuckDBError:
            raise DatabaseError('failed to open cursor')
        return ret

    def execute(self, sql, *args):
        """Create a cursor, execute ``sql`` with ``args`` on it and return it."""
        cdef Cursor ret = self.cursor()
        ret.execute(sql, *args)
        return ret

    cpdef Cursor executemany(self, sql, args):
        """Create a cursor, run ``sql`` once per parameter row and return it."""
        cdef Cursor ret = self.cursor()
        ret.executemany(sql, args)
        return ret

    cpdef Prepared prepare(self, sql):
        """Create a cursor and return a statement prepared on it."""
        cdef Cursor cur = self.cursor()
        return cur.prepare(sql)


def connect(filename = ''):
    """Open a database and return a :class:`Connection`.

    Args:
        filename: path of the database file. When empty, a NULL path is
            passed to ``duckdb_open``.

    Raises:
        DatabaseError: if ``duckdb_open`` reports an error.
    """
    cdef Connection ret = Connection.__new__(Connection)
    if filename:
        bytetext = unicode(filename).encode('utf-8')
        # Store the bytes object on the connection so the char* stays valid
        # for the lifetime of the Connection (used by __repr__ later on).
        ret._filename_bytes = bytetext
        ret.filename = bytetext
    res = duckdb_open(ret.filename, <duckdb_database *>&ret.thisptr)
    if res == DuckDBError:
        raise DatabaseError('failed to open database')
    return ret
cdef class Column
@cython.freelist(8)
cdef class Result:
    """Owner of a heap-allocated ``duckdb_result``.

    ``thisptr`` points at a malloc'ed ``duckdb_result``; it is destroyed and
    freed on deallocation. Rows are exposed through ``len()`` and indexing,
    whole columns through the C-level :meth:`_column`.
    """
    cdef void *thisptr

    def __dealloc__(self):
        if self.thisptr != NULL:
            duckdb_destroy_result(<duckdb_result *>self.thisptr)
            free(self.thisptr)

    def __repr__(self):
        args = self.cols(), self.rows()
        return 'Result(cols = %d, rows = %d)' % args

    __str__ = __repr__

    cpdef uint64_t rows(self):
        """Number of rows in the result."""
        return (<duckdb_result *>self.thisptr).row_count

    cpdef uint64_t cols(self):
        """Number of columns in the result."""
        return (<duckdb_result *>self.thisptr).column_count

    def __len__(self):
        return self.rows()

    def __getitem__(self, uint64_t row):
        """Return row ``row`` as a tuple of Python values.

        Raises:
            IndexError: if ``row`` is not smaller than the row count.
        """
        # `row` is unsigned, so only the upper bound can be violated here.
        if row >= self.rows():
            raise IndexError('index out of bounds')
        return tuple([self._value(col, row) for col in range(self.cols())])

    def _value(self, uint64_t col, uint64_t row):
        """Convert the cell at (``col``, ``row``) to a Python object.

        Returns ``None`` for NULL cells, native Python scalars for boolean
        and numeric columns, ``datetime`` objects for temporal columns and a
        ``str`` for everything else.

        Raises:
            DatabaseError: if no result is attached or the column type is
                invalid.
            IndexError: if ``col`` or ``row`` is outside the result.
        """
        cdef duckdb_result *res = <duckdb_result *>self.thisptr
        cdef duckdb_column *column
        cdef duckdb_type typ
        cdef duckdb_date *d
        cdef duckdb_time *t
        cdef duckdb_timestamp *ts
        cdef char *text

        if res == NULL:
            raise DatabaseError('no result exists')
        # This method is reachable from Python, so guard the raw array reads.
        if col >= res.column_count or row >= res.row_count:
            raise IndexError('index out of bounds')

        column = &res.columns[col]
        if column.nullmask[row]:
            return None

        typ = column.type
        if typ == DUCKDB_TYPE_INVALID:
            raise DatabaseError('invalid results')
        elif typ == DUCKDB_TYPE_BOOLEAN:
            return (<cbool *>column.data)[row]
        elif typ == DUCKDB_TYPE_TINYINT:
            return (<int8_t *>column.data)[row]
        elif typ == DUCKDB_TYPE_SMALLINT:
            return (<int16_t *>column.data)[row]
        elif typ == DUCKDB_TYPE_INTEGER:
            return (<int32_t *>column.data)[row]
        elif typ == DUCKDB_TYPE_BIGINT:
            return (<int64_t *>column.data)[row]
        elif typ == DUCKDB_TYPE_FLOAT:
            return (<float *>column.data)[row]
        elif typ == DUCKDB_TYPE_DOUBLE:
            return (<double *>column.data)[row]
        elif typ == DUCKDB_TYPE_TIMESTAMP:
            ts = &(<duckdb_timestamp *>column.data)[row]
            return datetime.datetime(
                year = ts.date.year,
                month = ts.date.month,
                day = ts.date.day,
                hour = ts.time.hour,
                minute = ts.time.min,
                second = ts.time.sec,
                # msec is milliseconds; datetime expects microseconds
                microsecond = ts.time.msec * 1000
            )
        elif typ == DUCKDB_TYPE_DATE:
            d = &(<duckdb_date *>column.data)[row]
            return datetime.date(
                year = d.year,
                month = d.month,
                day = d.day
            )
        elif typ == DUCKDB_TYPE_TIME:
            t = &(<duckdb_time *>column.data)[row]
            return datetime.time(
                hour = t.hour,
                minute = t.min,
                second = t.sec,
                # msec is milliseconds; datetime expects microseconds
                microsecond = t.msec * 1000
            )
        else:
            text = duckdb_value_varchar(res, col, row)
            if text == NULL:
                # conversion failed (or value is NULL): nothing to decode
                return None
            try:
                return text.decode('utf-8')
            finally:
                # the string is allocated for the caller and must be released
                free(text)

    cdef Column _column(self, uint64_t col):
        """Wrap column ``col`` in a :class:`Column` exposing the buffer protocol.

        Fixed-width columns are exposed zero-copy from the result's memory.
        VARCHAR columns are converted into a freshly allocated array of
        Python string objects (``None`` for NULLs) owned by the Column.

        Raises:
            InterfaceError: on allocation failure or unsupported column type.
        """
        cdef Column ret = Column.__new__(Column)
        cdef duckdb_type typ
        cdef duckdb_column *column
        cdef PyObject **strings
        cdef char *cstr
        cdef uint64_t row
        cdef uint64_t filled = 0
        cdef uint64_t nrows = self.rows()

        column = &(<duckdb_result *>self.thisptr).columns[col]
        ret.thisptr = column
        ret.col = col
        ret.ref = self              # keeps the result memory alive
        ret.__shape[0] = nrows

        typ = column.type
        if typ == DUCKDB_TYPE_BOOLEAN:
            ret.__fmt = '?'
            ret.__itemsize = sizeof(cbool)
        elif typ == DUCKDB_TYPE_TINYINT:
            ret.__fmt = 'b'
            ret.__itemsize = sizeof(int8_t)
        elif typ == DUCKDB_TYPE_SMALLINT:
            ret.__fmt = 'h'
            ret.__itemsize = sizeof(int16_t)
        elif typ == DUCKDB_TYPE_INTEGER:
            ret.__fmt = 'i'
            ret.__itemsize = sizeof(int32_t)
        elif typ == DUCKDB_TYPE_BIGINT:
            # 'q' is always 8 bytes; 'l' (C long) is only 4 bytes on some
            # platforms and would then disagree with the item size.
            ret.__fmt = 'q'
            ret.__itemsize = sizeof(int64_t)
        elif typ == DUCKDB_TYPE_FLOAT:
            ret.__fmt = 'f'
            ret.__itemsize = sizeof(float)
        elif typ == DUCKDB_TYPE_DOUBLE:
            ret.__fmt = 'd'
            ret.__itemsize = sizeof(double)
        elif typ == DUCKDB_TYPE_VARCHAR:
            ret.__fmt = 'O'
            ret.__itemsize = sizeof(void *)
            # malloc(0) may return NULL without being an error, so always
            # request room for at least one slot.
            strings = <PyObject **>malloc((nrows if nrows else 1) * sizeof(PyObject *))
            if strings == NULL:
                raise InterfaceError('memory error')
            try:
                for row in range(nrows):
                    if column.nullmask[row]:
                        item = None
                    else:
                        cstr = (<char **>column.data)[row]
                        item = cstr.decode('utf-8')
                    # the array owns one reference per slot
                    Py_INCREF(item)
                    strings[row] = <PyObject *>item
                    filled = row + 1
            except:
                # Undo the partially built array so that Column.__dealloc__
                # never sees uninitialised slots.
                for row in range(filled):
                    Py_DECREF(<object>strings[row])
                free(strings)
                raise
            ret.data = strings
        elif typ == DUCKDB_TYPE_TIMESTAMP:
            # NOTE(review): native struct formats carry no trailing padding,
            # so this format may not add up to sizeof(duckdb_timestamp) --
            # confirm with the consumers of the buffer.
            ret.__fmt = 'ibbbbbh'
            ret.__itemsize = sizeof(duckdb_timestamp)
        elif typ == DUCKDB_TYPE_DATE:
            ret.__fmt = 'ibb'
            ret.__itemsize = sizeof(duckdb_date)
        elif typ == DUCKDB_TYPE_TIME:
            ret.__fmt = 'bbbh'
            ret.__itemsize = sizeof(duckdb_time)
        else:
            raise InterfaceError('format not supported')

        ret.__strides[0] = ret.__itemsize
        return ret
@cython.freelist(8)
cdef class Column:
    """One result column exposed through the buffer protocol.

    ``thisptr`` points at the ``duckdb_column`` inside the owning result and
    ``ref`` keeps that result alive. ``data`` is NULL for columns served
    directly from the result's memory; otherwise it is a malloc'ed array of
    owned ``PyObject *`` released on deallocation.
    """
    cdef void *thisptr
    cdef void *data
    cdef uint64_t col
    cdef Result ref
    # buffer interface
    cdef object __fmt
    cdef Py_ssize_t __itemsize
    cdef Py_ssize_t __shape[1]
    cdef Py_ssize_t __strides[1]
    # ASCII-encoded copy of __fmt; owned by the object so that the char*
    # handed out in Py_buffer.format stays valid while the Column lives.
    cdef bytes _fmt_bytes

    def __dealloc__(self):
        cdef PyObject **items = <PyObject **>self.data
        cdef Py_ssize_t row
        if items != NULL:
            # Use the cached C-level length instead of calling into
            # `self.ref`: Python attributes should not be relied upon while
            # the object is being torn down.
            for row in range(self.__shape[0]):
                Py_DECREF(<object>items[row])
            free(items)

    def __repr__(self):
        args = self.__shape[0], self.__fmt
        return 'Column(size = %d, fmt = "%s")' % args

    __str__ = __repr__

    @property
    def dbtype(self):
        """Type tag of the underlying ``duckdb_column``."""
        cdef duckdb_column *column = <duckdb_column *>self.thisptr
        return column.type

    @property
    def name(self):
        """Column name decoded as UTF-8."""
        cdef duckdb_column *column = <duckdb_column *>self.thisptr
        return column.name.decode('utf-8')

    def mask(self):
        """Return the column's null mask as a boolean ``cython.view.array``.

        The array is a non-owning view onto the result's ``nullmask``.
        NOTE(review): the view holds no reference to this Column or its
        Result, so it presumably must not outlive them -- confirm at callers.
        """
        cdef duckdb_column *column = <duckdb_column *>self.thisptr
        cdef view.array ret = view.array(
            (self.__shape[0],),
            itemsize=sizeof(cbool),
            format='?',
            allocate_buffer=False
        )
        ret.data = <char *>column.nullmask
        return ret

    def __getbuffer__(self, Py_buffer* buffer, int flags):
        """Fill ``buffer`` with a read-only, one-dimensional view of the data."""
        cdef bytes fmt_bytes

        # Encode once and keep the bytes object on `self`: a pointer into a
        # local temporary would dangle as soon as this method returns.
        if self._fmt_bytes is None:
            self._fmt_bytes = self.__fmt.encode('ascii')
        fmt_bytes = self._fmt_bytes

        if self.data == NULL:
            # served straight from the result's column memory
            buffer.buf = (<duckdb_column *>self.thisptr).data
        else:
            # served from the converted object array
            buffer.buf = self.data
        buffer.obj = self
        buffer.len = self.__shape[0] * self.__strides[0]
        buffer.readonly = 1
        buffer.format = fmt_bytes
        buffer.ndim = 1
        buffer.shape = <Py_ssize_t *>&self.__shape[0]
        buffer.strides = <Py_ssize_t *>&self.__strides[0]
        buffer.suboffsets = NULL
        buffer.itemsize = self.__itemsize
        buffer.internal = NULL

    def __releasebuffer__(self, Py_buffer* buffer):
        # Nothing to release: all exported memory is owned by the object.
        pass
@cython.freelist(4)
cdef class Prepared:
    """A prepared DuckDB statement.

    ``thisptr`` holds the ``duckdb_prepared_statement`` handle, destroyed on
    deallocation; ``sql`` is the original statement text.
    """
    cdef void *thisptr
    cdef readonly object sql

    def __dealloc__(self):
        if self.thisptr != NULL:
            duckdb_destroy_prepare(<duckdb_prepared_statement *>&self.thisptr)

    def __repr__(self):
        return 'Prepared("%s")' % self.sql[:80]

    __str__ = __repr__

    cpdef index_t cols(self):
        """Number of parameters of the statement (as reported by ``duckdb_nparams``).

        Returns 0 when the count cannot be obtained.
        """
        # Initialised so that a failing call cannot leak an undefined value.
        cdef index_t ret = 0
        if duckdb_nparams(<duckdb_prepared_statement>self.thisptr, &ret) == DuckDBError:
            return 0
        return ret

    def execute(self, results = True):
        """Execute the statement with the currently bound parameters.

        Args:
            results: when true, collect and return a :class:`Result`;
                otherwise run the statement and return ``None``.

        Raises:
            InterfaceError: if the result structure cannot be allocated.
            OperationalError: if execution fails.
        """
        cdef Result ret
        cdef duckdb_result *res
        cdef char *msg

        if results:
            ret = Result.__new__(Result)
            ret.thisptr = malloc(sizeof(duckdb_result))
            if ret.thisptr == NULL:
                raise InterfaceError('memory error')

            # Start from a well-defined state so that an early failure never
            # leaves garbage for Result.__dealloc__ to destroy.
            res = <duckdb_result *>ret.thisptr
            res.column_count = 0
            res.row_count = 0
            res.columns = NULL
            res.error_message = NULL

            if duckdb_execute_prepared(<duckdb_prepared_statement>self.thisptr,
                                       res) == DuckDBError:
                msg = res.error_message
                if msg == NULL:
                    raise OperationalError("failed to execute sql")
                raise OperationalError(msg.decode('utf-8'))
            return ret
        else:
            if duckdb_execute_prepared(<duckdb_prepared_statement>self.thisptr,
                                       NULL) == DuckDBError:
                raise OperationalError("failed to execute sql")

    def bind(self, int idx, object value):
        """Bind ``value`` to parameter ``idx``, choosing the binder by type.

        ``None`` binds NULL; ``bool``, ``int`` and ``float`` use the native
        binders; ``Date``/``Time``/``Timestamp`` wrappers are unwrapped to
        their ``datetime`` objects; everything else is bound as text.

        Raises:
            DatabaseError: if DuckDB rejects the binding.
        """
        cdef duckdb_prepared_statement pre = self.thisptr
        cdef char *_value
        cdef duckdb_state ret

        # `adapted` is a C-level attribute and invisible to a generic Python
        # attribute lookup, so it must be read through a typed cast.
        if isinstance(value, Date):
            value = (<Date>value).adapted
        elif isinstance(value, Time):
            value = (<Time>value).adapted
        elif isinstance(value, Timestamp):
            value = (<Timestamp>value).adapted

        if value is None:
            ret = duckdb_bind_null(pre, idx)
        elif isinstance(value, bool):
            # must precede the int test: bool is a subclass of int
            ret = duckdb_bind_boolean(pre, idx, value)
        elif isinstance(value, int):
            ret = duckdb_bind_int64(pre, idx, value)
        elif isinstance(value, float):
            ret = duckdb_bind_double(pre, idx, value)
        elif isinstance(value, datetime.datetime):
            # must precede the date test: datetime is a subclass of date
            # NOTE(review): the bound text includes the SQL keyword and
            # quotes; confirm DuckDB parses this as a parameter *value*.
            text = "TIMESTAMP '%s'" % value.isoformat(' ', timespec = 'seconds')
            bytetext = text.encode('utf-8')
            _value = bytetext
            ret = duckdb_bind_varchar(pre, idx, _value)
        elif isinstance(value, datetime.date):
            text = "DATE '%s'" % value.isoformat()
            bytetext = text.encode('utf-8')
            _value = bytetext
            ret = duckdb_bind_varchar(pre, idx, _value)
        elif isinstance(value, datetime.time):
            text = "TIME '%s'" % value.isoformat(timespec = 'seconds')
            bytetext = text.encode('utf-8')
            _value = bytetext
            ret = duckdb_bind_varchar(pre, idx, _value)
        else:
            bytetext = unicode(value).encode('utf-8')
            _value = bytetext
            ret = duckdb_bind_varchar(pre, idx, _value)

        if ret == DuckDBError:
            raise DatabaseError('failed to bind value')

    cpdef bindNull(self, uint64_t idx):
        """Bind NULL to parameter ``idx``; raises DatabaseError on failure."""
        if duckdb_bind_null(<duckdb_prepared_statement>self.thisptr, idx) == DuckDBError:
            raise DatabaseError('failed to bind value')

    cpdef bindBoolean(self, uint64_t idx, bint value):
        """Bind a boolean to parameter ``idx``; raises DatabaseError on failure."""
        if duckdb_bind_boolean(<duckdb_prepared_statement>self.thisptr, idx, value) == DuckDBError:
            raise DatabaseError('failed to bind value')

    cpdef bindInteger(self, uint64_t idx, int64_t value):
        """Bind a 64-bit integer to parameter ``idx``; raises DatabaseError on failure."""
        if duckdb_bind_int64(<duckdb_prepared_statement>self.thisptr, idx, value) == DuckDBError:
            raise DatabaseError('failed to bind value')

    cpdef bindFloat(self, uint64_t idx, double value):
        """Bind a double to parameter ``idx``; raises DatabaseError on failure."""
        if duckdb_bind_double(<duckdb_prepared_statement>self.thisptr, idx, value) == DuckDBError:
            raise DatabaseError('failed to bind value')
@cython.freelist(4)
cdef class Cursor:
    """DB-API style cursor bound to one DuckDB connection handle.

    ``thisptr`` holds the ``duckdb_connection`` (disconnected on
    deallocation), ``ref`` keeps the owning :class:`Connection` alive,
    ``results`` is the last materialised result and ``row`` the position of
    the next row to fetch.
    """
    cdef void *thisptr
    cdef Result results
    cdef uint64_t row
    cdef Connection ref
    cdef public uint64_t arraysize

    def __dealloc__(self):
        if self.thisptr != NULL:
            duckdb_disconnect(<duckdb_connection *>&self.thisptr)

    def __repr__(self):
        if self.ref.filename != NULL:
            filename = self.ref.filename.decode('utf-8')
        else:
            filename = "NULL"
        return 'Cursor(filename = "%s")' % filename

    __str__ = __repr__

    @property
    def rowcount(self):
        """Row count of the last result, or -1 when there is none."""
        if self.results is None:
            return -1
        # Result exposes rows(); there is no row() method.
        return self.results.rows()

    @property
    def description(self):
        """Per-column 7-tuples ``(name, type_code, None, ...)`` or ``None``.

        ``type_code`` is ``NUMBER``, ``DATETIME``, ``STRING`` or ``None``
        for column types that map to none of them.
        """
        cdef duckdb_result *res
        cdef duckdb_column *column
        cdef duckdb_type typ
        cdef uint64_t col

        if self.results is None:
            return None

        res = <duckdb_result *>self.results.thisptr
        # One entry per *column* (sizing by rows broke for results with
        # fewer rows than columns and padded with None otherwise).
        ret = [None,] * self.results.cols()
        for col in range(self.results.cols()):
            column = &res.columns[col]
            name = column.name.decode('utf-8')
            typ = column.type
            coltype = None
            if typ in (DUCKDB_TYPE_BOOLEAN, DUCKDB_TYPE_TINYINT,
                       DUCKDB_TYPE_SMALLINT, DUCKDB_TYPE_INTEGER,
                       DUCKDB_TYPE_BIGINT, DUCKDB_TYPE_FLOAT,
                       DUCKDB_TYPE_DOUBLE):
                coltype = NUMBER
            elif typ in (DUCKDB_TYPE_TIMESTAMP, DUCKDB_TYPE_DATE,
                         DUCKDB_TYPE_TIME):
                coltype = DATETIME
            elif typ == DUCKDB_TYPE_VARCHAR:
                coltype = STRING
            ret[col] = (name, coltype, None, None, None, None, None)
        return ret

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next row tuple; raise StopIteration when exhausted."""
        cdef uint64_t row = self.row
        if self.results is None:
            raise DatabaseError('no result exists')
        if row < self.results.rows():
            self.row += 1
            return self.results[row]
        else:
            raise StopIteration

    next = __next__

    def close(self):
        """No-op; the connection is released on deallocation."""
        pass

    def callproc(self, *args, **kwargs):
        """Stored procedures are not supported."""
        raise NotSupportedError

    def fetchone(self):
        """Return the next row tuple, or ``None`` when no rows are left.

        Raises:
            DatabaseError: if nothing has been executed yet.
        """
        cdef uint64_t row = self.row
        if self.results is None:
            raise DatabaseError('no result exists')
        if row < self.results.rows():
            self.row += 1
            return self.results[row]
        else:
            return None

    def fetchmany(self, size = None):
        """Return up to ``size`` rows (default ``arraysize``, at least 1) as a list.

        Returns ``None`` when no rows are left.
        NOTE(review): PEP 249 asks for an empty sequence here; kept as
        ``None`` for backward compatibility.

        Raises:
            DatabaseError: if nothing has been executed yet.
        """
        cdef uint64_t i, _size, rows, row = self.row
        if self.results is None:
            raise DatabaseError('no result exists')
        if size is None:
            _size = max(self.arraysize, 1)
        else:
            _size = max(size, 1)
        rows = self.results.rows()
        if row < rows:
            # clamp the batch to the rows that are actually left
            if row + _size > rows:
                _size = rows - row
            ret = [None,] * _size
            for i in range(_size):
                ret[i] = self.results[row + i]
            self.row += _size
            return ret
        else:
            return None

    def fetchall(self):
        """Return the whole :class:`Result` object (independent of the row position).

        Raises:
            DatabaseError: if nothing has been executed yet.
        """
        if self.results is None:
            raise DatabaseError('no result exists')
        return self.results

    def fetchColArray(self, uint64_t col):
        """Return column ``col`` as a ``numpy.ma`` masked array.

        Raises:
            DatabaseError: if nothing has been executed yet.
            IndexError: if ``col`` is not smaller than the column count.
        """
        if self.results is None:
            raise DatabaseError('no result exists')
        # `col` is unsigned, so only the upper bound can be violated here.
        if col >= self.results.cols():
            raise IndexError('index out of bounds')
        import numpy.ma as ma
        data = self.results._column(col)
        return ma.array(data = data, mask = data.mask())

    def fetchRecArray(self):
        """Return all columns as a numpy record array (``np.rec.fromarrays``).

        Raises:
            DatabaseError: if nothing has been executed yet.
        """
        cdef uint64_t cols
        if self.results is None:
            raise DatabaseError('no result exists')
        import numpy as np
        cols = self.results.cols()
        data = tuple(self.results._column(i) for i in range(cols))
        return np.rec.fromarrays(data)

    def fetchDataframe(self):
        """Return all columns as a pandas DataFrame of masked arrays keyed by name.

        Raises:
            DatabaseError: if nothing has been executed yet.
        """
        cdef uint64_t cols
        if self.results is None:
            raise DatabaseError('no result exists')
        import numpy.ma as ma
        import pandas as pd
        cols = self.results.cols()
        data = tuple(self.results._column(i) for i in range(cols))
        arrs = (ma.array(data = col, mask = col.mask()) for col in data)
        names = (col.name for col in data)
        return pd.DataFrame.from_dict({name:arr for name,arr in zip(names,arrs)})

    def execute(self, sql, *args):
        """Prepare ``sql``, bind ``args`` positionally, execute and store the result.

        Returns the cursor itself so calls can be chained.

        Raises:
            DatabaseError: if preparing or binding fails, or the number of
                arguments does not match the statement's parameter count.
            OperationalError: if execution fails.
        """
        cdef Prepared pre
        cdef uint64_t bound, cols

        pre = self.prepare(sql)
        cols = pre.cols()
        # Parameters are 1-based; count what was bound explicitly instead of
        # relying on unsigned wrap-around of -1.
        bound = 0
        for arg in args:
            bound += 1
            pre.bind(bound, arg)
        if bound != cols:
            raise DatabaseError('not enough arguments supplied')
        self.results = pre.execute()
        self.row = 0
        return self

    def executemany(self, sql, args):
        """Prepare ``sql`` once and execute it for every parameter row in ``args``.

        ``args`` must support ``len()``; each row may be any iterable.
        No result is collected. NOTE(review): a previously stored result is
        left in place -- confirm whether it should be cleared.

        Raises:
            DatabaseError: if ``args`` is empty, preparing or binding fails,
                or a row's length does not match the parameter count.
            OperationalError: if an execution fails.
        """
        cdef Prepared pre
        cdef uint64_t bound, cols, rows

        rows = len(args)
        if not rows:
            raise DatabaseError('expected arguments')
        pre = self.prepare(sql)
        cols = pre.cols()
        for row in args:
            bound = 0
            for arg in row:
                bound += 1
                pre.bind(bound, arg)
            if bound != cols:
                raise DatabaseError('not enough arguments supplied')
            pre.execute(results = False)
        self.row = 0
        return self

    def setinputsizes(self, *args):
        """No-op placeholder."""
        pass

    def setoutputsize(self, *args):
        """No-op placeholder."""
        pass

    cpdef Prepared prepare(self, sql):
        """Create a :class:`Prepared` statement for ``sql`` on this connection.

        Raises:
            DatabaseError: if DuckDB cannot prepare the statement.
        """
        cdef Prepared ret = Prepared.__new__(Prepared)
        cdef char *_sql
        # `bytetext` must stay referenced while `_sql` is in use.
        bytetext = unicode(sql).encode('utf-8')
        _sql = bytetext
        ret.sql = sql
        if duckdb_prepare(<duckdb_connection>self.thisptr, _sql,
                          <duckdb_prepared_statement *>&ret.thisptr) == DuckDBError:
            raise DatabaseError("failed to create prepared sql")
        return ret
@tenko
Copy link
Author

tenko commented Feb 27, 2020

Pandas dataframe results + misc updates.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment