@freeman-lab
Created May 24, 2015 21:30
Multi-channel time series parallelization in Thunder
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*Note*: This notebook was run using Spark 1.3.0 and the current master branch of Thunder (0.6.0.dev)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Construct a distributed Series object "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We'll pretend the starting point is a channels x time numpy array (it could just as easily be some on-disk representation)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from numpy import random\n",
"channels = random.randn(10,100)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The ThunderContext `tsc` is the entry-point for object creation"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"data = tsc.loadSeriesFromArray(channels, index=range(0, 100))"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Series\n",
"nrecords: None (inspect to compute)\n",
"dtype: float64\n",
"dims: None (inspect to compute)\n",
"index: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, ... ] (length: 100)"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each record is a key-value pair, where the key (in this case) is the channel index. We can first look at the keys"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)]"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"data.keys().collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Or look at the shape of the first value (each value is a numpy array)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"(100,)"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"data.values().first().shape"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Group by fixed length"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If we want to split into chunks of a fixed size, say 5, we can use `groupByFixedLength`"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"grouped = data.groupByFixedLength(5)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Series\n",
"nrecords: 200\n",
"dtype: float64\n",
"dims: None (inspect to compute)\n",
"index: [0 1 2 3 4]"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"grouped"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This yields a new distributed object with 20x as many records as before"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"10"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"data.nrecords"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"200"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"grouped.nrecords"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And each record's value is only 5 samples long"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"(5,)"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"grouped.values().first().shape"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The new keys are tuples where the first entry is the original channel id, and the second entry is the new chunk id"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[(0, 0), (0, 1), (0, 2), (0, 3), (0, 4)]"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"grouped.keys().take(5)"
]
},
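{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sanity check, here's a hypothetical local equivalent of `groupByFixedLength(5)` for a single channel, in plain numpy (assuming chunks are consecutive, non-overlapping slices, as the keys above suggest; this is an illustration, not Thunder's implementation)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# hypothetical local sketch of groupByFixedLength(5) applied to channel 0\n",
"# reshape(20, 5) splits the 100-sample trace into 20 consecutive chunks of 5\n",
"chunks = [((0, i), chunk) for i, chunk in enumerate(channels[0].reshape(20, 5))]\n",
"chunks[0][0], chunks[0][1].shape"
]
},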
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Group by window"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If we want to split into potentially overlapping chunks specified by a set of center indices, we can use `groupByWindow`. This method is specific to time series, so first we have to convert to a `TimeSeries`, which is a sub-class of `Series` with added functionality."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"ts = data.toTimeSeries()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"TimeSeries\n",
"nrecords: 10\n",
"dtype: float64\n",
"dims: None (inspect to compute)\n",
"index: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, ... ] (length: 100)"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ts"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Pick 8 windows of size 5"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"groupedts = ts.groupByWindow(indices=[10, 20, 30, 40, 50, 60, 70, 80], window=5)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"TimeSeries\n",
"nrecords: 80\n",
"dtype: float64\n",
"dims: None (inspect to compute)\n",
"index: [0 1 2 3 4]"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"groupedts"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"80"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"groupedts.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each record's value is again only 5 samples long"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"(5,)"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"groupedts.values().first().shape"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The new keys are tuples where the first entry is the original channel id, and the second entry is the new window id"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[(0, 0), (0, 1), (0, 2), (0, 3), (0, 4)]"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"groupedts.keys().take(5)"
]
},
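{
"cell_type": "markdown",
"metadata": {},
"source": [
"Again as a local sketch, each record is a length-5 window cut around one of the center indices (assuming windows are centered on each index; this is an illustration, not Thunder's implementation)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# hypothetical local sketch of groupByWindow(indices, window=5) on channel 0\n",
"# a centered window of 5 around index c is the slice c - 2 : c + 3\n",
"centers = [10, 20, 30, 40, 50, 60, 70, 80]\n",
"windows = [((0, i), channels[0][c - 2:c + 3]) for i, c in enumerate(centers)]\n",
"windows[0][0], windows[0][1].shape"
]
},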
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Next steps..."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can now use one of these distributed objects for fast parallelized analyses, for example, KMeans"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from thunder import KMeans"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"model = KMeans(k=2).fit(grouped)"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"model = KMeans(k=2).fit(groupedts)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also do simple parallelized subselection or filtering in time before running KMeans:"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"model = KMeans(k=2).fit(groupedts.between(0, 2))"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"model = KMeans(k=2).fit(groupedts.zscore())"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"model = KMeans(k=2).fit(groupedts.detrend())"
]
},
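{
"cell_type": "markdown",
"metadata": {},
"source": [
"Per record, `zscore` and `detrend` amount to standard numpy operations; here's a hypothetical single-record sketch (assuming a linear detrend, for illustration only)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from numpy import arange, polyfit, polyval\n",
"x = channels[0]\n",
"zscored = (x - x.mean()) / x.std()  # zero mean, unit variance\n",
"trend = polyval(polyfit(arange(x.size), x, 1), arange(x.size))\n",
"detrended = x - trend  # subtract fitted linear trend"
]
},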
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As the data was random, we won't bother inspecting the results here"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}