Skip to content

Instantly share code, notes, and snippets.

@guillemborrell
Last active October 23, 2021 04:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save guillemborrell/2a11a258fbd9cfd8d8e0bc14eac83fa7 to your computer and use it in GitHub Desktop.
Save guillemborrell/2a11a258fbd9cfd8d8e0bc14eac83fa7 to your computer and use it in GitHub Desktop.
Percentiles of rolling windows in time series with Cython
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%load_ext Cython"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"arr = np.random.random(100000)\n",
"\n",
"s = pd.Series(arr)\n",
"sumw_pandas = s.rolling(10).apply(np.sum)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%cython --compile-args=-fopenmp --link-args=-fopenmp --force\n",
"\n",
"import cython\n",
"import numpy as np\n",
"cimport numpy as np\n",
"from libc.stdlib cimport malloc\n",
"from cython.parallel import prange, parallel\n",
"\n",
"@cython.boundscheck(False)\n",
"def window_sum(np.ndarray[double, ndim=1] arr, int window):\n",
" cdef h = np.zeros_like(arr)\n",
" cdef int imax = arr.shape[0]\n",
" cdef double *buffer = <double *>malloc(imax * sizeof(double))\n",
" cdef double result = 0.0\n",
" cdef int i, j\n",
" with nogil, parallel():\n",
" for i in prange(imax, schedule='dynamic'):\n",
" buffer[i] = 0.0\n",
" if i >= window-1:\n",
" for j in range(window):\n",
" buffer[i] += arr[i-j]\n",
" \n",
" for i in range(imax):\n",
" if i < window -1:\n",
" h[i] = np.nan\n",
" else:\n",
" h[i] = buffer[i]\n",
" \n",
" return h "
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"sumw_cython = window_sum(arr,10)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"3.5527136788005009e-15"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"np.max(np.abs(sumw_cython - sumw_pandas))"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%cython --compile-args=-fopenmp --link-args=-fopenmp --force\n",
"\n",
"import cython\n",
"import numpy as np\n",
"cimport numpy as np\n",
"from libc.stdlib cimport malloc, free, qsort\n",
"from cython.parallel import prange, parallel\n",
"\n",
"# Comparation function needed for sorting\n",
"cdef int mycmp(const void * pa, const void * pb) nogil:\n",
" cdef double a = (<double *>pa)[0]\n",
" cdef double b = (<double *>pb)[0]\n",
" if a < b:\n",
" return -1\n",
" elif a > b:\n",
" return 1\n",
" else:\n",
" return 0\n",
"\n",
"# Compute the percentile picking the lower interval. No interpolation.\n",
"cdef double atomic_percentile(double *buf, int window, double perc) nogil:\n",
" # new buffer\n",
" cdef double *newbuf = <double *>malloc(window * sizeof(double))\n",
" cdef int i\n",
" cdef int index\n",
" cdef double result\n",
" \n",
" for i in range(window):\n",
" newbuf[i] = buf[i]\n",
" \n",
" # Sort the buffer\n",
" qsort(newbuf, window, sizeof(double), mycmp)\n",
" \n",
" # cut of the percentile\n",
" index = int((<double>window) * perc / 100.0)\n",
" result = newbuf[index-1]\n",
"\n",
" # Deallocate the auxiliary buffer\n",
" free(newbuf)\n",
"\n",
" return result\n",
"\n",
"@cython.boundscheck(False)\n",
"def window_percentile(np.ndarray[double, ndim=1] arr, int window, double perc):\n",
" # Create numpy array and set auxiliary array. This requires the GIL.\n",
" cdef h = np.zeros_like(arr)\n",
" cdef int imax = arr.shape[0]\n",
" cdef double *buffer = <double *>malloc(imax * sizeof(double))\n",
" cdef double result = 0.0\n",
" cdef int i, j\n",
" \n",
" with nogil, parallel():\n",
" for i in prange(imax, schedule='dynamic'):\n",
" buffer[i] = 0.0\n",
" if i >= window - 1:\n",
" buffer[i] = atomic_percentile(&arr[i-(window-1)], window, perc)\n",
"\n",
" # Set the beginning of the windowed series as NaN. Corresponds to zeros\n",
" # in buffer\n",
" for i in range(imax):\n",
" if i < window - 1:\n",
" h[i] = np.nan\n",
" else:\n",
" h[i] = buffer[i]\n",
"\n",
" free(buffer)\n",
" return h\n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"percentile10 = pd.Series(window_percentile(arr,10, 10))\n",
"percentile10_p = s.rolling(10).apply(np.percentile, args=(10, None, None, False,'lower'))"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0.0"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"np.max(np.abs(percentile10 - percentile10_p))"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"100 loops, best of 3: 18.2 ms per loop\n"
]
}
],
"source": [
"%timeit window_percentile(arr,10, 10)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 loop, best of 3: 2.96 s per loop\n"
]
}
],
"source": [
"%timeit s.rolling(10).apply(np.percentile, args=(10, None, None, False,'lower'))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
@kikocorreoso
Copy link

kikocorreoso commented Mar 3, 2017

import numpy as np
from numpy.lib.stride_tricks import as_strided
import pandas as pd

arr = np.random.random(100000)
s = pd.Series(arr)

# ad-hoc rolling window
def rolling_window(a, window):
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return as_strided(a, shape=shape, strides=strides)

# Result with pure numpy
res_np = np.percentile(
    rolling_window(arr, 10),
    10,
    1, # axis
    None,
    False,
    'lower'
)

# Result with pandas
tmp = s.rolling(10).apply(np.percentile, args=(10, None, None, False,'lower'))
res_pd = tmp[tmp.notnull()]

# check if all values are similar
print(all(res_np == res_pd))

# Performance (pandas / numpy)
t_np = %timeit -o np.percentile(rolling_window(arr, 10), 10, 1, None, False, 'lower')
t_pd = %timeit -o s.rolling(10).apply(np.percentile, args=(10, None, None, False,'lower'))
print(t_pd.best / t_np.best)

Results:

True
100 loops, best of 3: 13.9 ms per loop
1 loop, best of 3: 5.06 s per loop
364.2848075594796

Reference: http://stackoverflow.com/a/6811241

@guillemborrell
Copy link
Author

guillemborrell commented Mar 6, 2017

@kikocorreoso

Que conste que en mi máquina la versión de numpy tarda 24.9 ms.

Con pequeñas modificaciones a la función window_percentile:

@cython.boundscheck(False)
def window_percentile(np.ndarray[double, ndim=1] arr, int window, double perc):
    # Create numpy array and set auxiliary array. This requires the GIL.
    cdef int imax = arr.shape[0]
    cdef np.double_t[:] buffer = <np.double_t [:imax]>malloc(imax * sizeof(double))
    cdef double NAN = math.NAN
    cdef int i, j, stride
    
    with nogil, parallel():
        for i in prange(imax, schedule='dynamic'):
            stride = i-(window-1)
            if i >= window - 1:
                buffer[i] = atomic_percentile(&arr[stride], window, perc)
            else:
                buffer[i] = NAN

    return np.asarray(buffer)

Lo bajo a 13.8 ms. Parte de la gracia de Cython es levantar el GIL y poder correr con varios cores.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment