Reservoir Sampling Blog Post
{
"metadata": {
"name": "",
"signature": "sha256:e9943648b5982fa868ab077d78fe550231c16f916d699ee244cf22a1a6120489"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"I have been interested in streaming data algorithms for some time. These algorithms assume that data arrive sequentially over time and/or that the data set is too large to fit into memory for random access. Perhaps the most widely-known streaming algorithm is [HyperLogLog](http://en.wikipedia.org/wiki/HyperLogLog), which calculates the approximate number of distinct values in a stream, with fixed memory use. In this post, I will discuss a simple algorithm for randomly sampling from from a data stream.\n",
"\n",
"Let the values of the stream be $x_1, x_2, x_3, \\ldots$; we do not need to assume that the stream ever terminates, although it may. The [reservoir algorithm](http://en.wikipedia.org/wiki/Reservoir_sampling) samples $k$ random items from the stream (without replacement) in the sense that after seeing $n$ data points, the probability that any individual data point is in the sample is $\\frac{k}{n}$. This algorithm only requires one pass through the stream, and uses storage proportionale to $k$ (not the total, possibly infinite, size of the stream).\n",
"\n",
"The reservoir algorithm is so named because at each step it updates a \"reservoir\" of candidate samples. We will denote the reservoir of candidate samples by $R$. We will use $R_t$ to denote the state of $R$ after observing the first $t$ data points. We think of $R$ as a vector of length $k$, so $R_t[0]$ is the first candidate sample after $t$ data points have been seen, $R_t[1]$ is the second, $R_t[k - 1]$ is the last, etc. It is important that $k$ is small enough that the reservoir vectors can be stored in memory (or at least accessed reasonably quickly on disk). Also, it is not necessary for the \n",
"\n",
"We initialize the first reservoir, $R_k$ with the first $k$ data points we see. At this point, we have a random sample (without replacement) of the first $k$ data points from the stream.\n",
"\n",
"Suppose now that we have seen $t - 1$ elements and have a reservoir of sample candidates $R_{t - 1}$. When we receive $x_t$, we generate an integer $i$ uniformly distributed in the interval $[1, t]$. If $i \\leq k$, we set $R_t[i - 1] = x_t$; otherwise, we wait for the next data point.\n",
"\n",
"Intuitively, this algorithm seems reasonable, because $P(x_t \\in R_t) = P(i \\leq k) = \\frac{k}{t}$, as we expect from a uniform random sample. What is less clear at this point, is that for any $s < t$ $P(x_{s} \\in R_t) = \\frac{k}{t}$ as well. We will now prove this fact.\n",
"\n",
"First, we calculate the probability that a candidate sample in the reservoir remains after another data point is received. We Let $x_s \\in R_t$, and suppose we have observed $x_{t + 1}$. The candidate sample $x_s$ will be in $R_{t + 1}$ if and only if the random integer $i$ generated for $x_{t + 1}$ is not the index of $x_s$ in $R_t$. Since $i$ is uniformly distributed in the interval $[1, t + 1]$, we have that\n",
"\n",
"$$P(x_s \\in R_{t + 1}\\ |\\ x_s \\in R_t) = \\frac{t}{t + 1}.$$\n",
"\n",
"Now, suppose that we have received $n$ data points. First consider the case where $k < s < n$. Then\n",
"\n",
"$$P(x_s \\in R_n) = P(x_s \\in R_n\\ |\\ x_s \\in R_s) \\cdot P(x_s \\in R_s).$$\n",
"\n",
"The second term, $P(x_s \\in R_s)$, is the probability that $x_s$ entered the reservoir when it was first observed, so\n",
"\n",
"$$P(x_s \\in R_s) = \\frac{k}{s}.$$\n",
"\n",
"To calculate the first term, $P(x_s \\in R_n\\ |\\ x_s \\in R_s)$, we multiply the probability that $x_s$ remains in the reservoir after each subsequent observation, yielding\n",
"\n",
"$$\n",
"P(x_s \\in R_n\\ |\\ x_s \\in R_s)\n",
" = \\prod_{t = s}^{n - 1} P(x_s \\in R_{t + 1}\\ |\\ x_s \\in R_t)\n",
" = \\frac{s}{s + 1} \\cdot \\frac{s + 1}{s + 2} \\cdot \\cdots \\cdot \\frac{n - 1}{n}\n",
" = \\frac{s}{n}.\n",
"$$\n",
"\n",
"Therefore\n",
"\n",
"$$P(x_s \\in R_n) = \\frac{s}{n} \\cdot \\frac{k}{s} = \\frac{k}{n},$$\n",
"\n",
"as desired.\n",
"\n",
"Now consider the case where $s \\leq k$, so that $P(x_s \\in R_k) = 1$. In this case,\n",
"\n",
"$$\n",
"P(x_s \\in R_n)\n",
" = P(x_s \\in R_n\\ |\\ x_s \\in R_k)\n",
" = \\prod_{t = k}^{n - 1} P(x_s \\in R_{t + 1}\\ |\\ x_s \\in R_t)\n",
" = \\frac{k}{k + 1} \\cdot \\frac{k + 1}{k + 2} \\cdot \\cdots \\cdot \\frac{n - 1}{n}\n",
" = \\frac{k}{n},\n",
"$$\n",
"\n",
"as desired."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Below we give an implementation of this algorithm in Python."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"import itertools as itl\n",
"\n",
"import numpy as np"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 1
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"k = 10"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 2
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def sample_after(stream, k):\n",
" \"\"\"\n",
" Return a random sample ok k elements drawn without replacement from stream.\n",
" \n",
" This function is designed to be used when the elements of stream cannot\n",
" fit into memory.\n",
" \"\"\"\n",
" r = np.array(list(itl.islice(stream, k)))\n",
" \n",
" for t, x in enumerate(stream, k + 1):\n",
" i = np.random.randint(1, t + 1)\n",
"\n",
" if i <= k:\n",
" r[i - 1] = x\n",
" \n",
" return r"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 3
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sample_after(xrange(1000000000), 10)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 4,
"text": [
"array([950266975, 378108182, 637777154, 915372867, 298742970, 629846773,\n",
" 749074581, 893637541, 328486342, 685539979])"
]
}
],
"prompt_number": 4
},
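{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check on this result, we can draw many samples from a short stream and verify that each element appears in the sample with probability roughly $\\frac{k}{n}$. (The stream length and the number of repetitions below are arbitrary choices.)"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"n = 100\n",
"n_samples = 10000\n",
"\n",
"# Count how often each element of xrange(n) appears in a sample of size k.\n",
"counts = np.zeros(n)\n",
"\n",
"for _ in xrange(n_samples):\n",
"    counts[sample_after(xrange(n), k)] += 1\n",
"\n",
"# Both estimated inclusion probabilities should be close to k / n = 0.1.\n",
"counts.min() / n_samples, counts.max() / n_samples"
],
"language": "python",
"metadata": {},
"outputs": []
},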
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generalizations\n",
"\n",
"Vitter^[Vitter, Jeffrey S. \"Random sampling with a reservoir.\" _ACM Transactions on Mathematical Software (TOMS)_ 11.1 (1985): 37-57.] gives three generalizations of this simple reservoir sampling algorithm, all based on the following idea.\n",
"\n",
"Instead of generating a random integer for each data point, we generate the number of data points to skip before the next candidate data point. Let $S_t$ be the number of data points to advance from $x_t$ before adding a candidate to the reservoir. For example, if $S_t = 3$, we would ignore $x_{t + 1}$ and $x_{t + 2}$ and add $x_{t + 3}$ to the reservoir.\n",
"\n",
"The simple reservoir algorithm leads to the following distribution on $S_t$:\n",
"\n",
"$$P(S_t = 1) = \\frac{k}{t + 1}$$\n",
"\n",
"$$P(S_t = 2) = \\left(1 - \\frac{k}{t + 1}\\right) \\frac{k}{t + 2} = \\frac{t - k + 1}{t + 1} \\cdot \\frac{k}{t + 2}$$\n",
"\n",
"$$P(S_t = 3) = \\left(1 - \\frac{k}{t + 2}\\right) \\left(1 - \\frac{k}{t + 1}\\right) \\frac{k}{t + 3} = \\frac{t - k + 2}{t + 2} \\cdot \\frac{t - k + 1}{t + 1} \\cdot \\frac{k}{t + 3}$$\n",
"\n",
"In general,\n",
"\n",
"$$P(S_t = s) = k \\cdot \\frac{t! (t + s - (k + 1))!}{(t + s)! (t - k)!}.$$\n",
"\n",
"Vitter gives three generalizations of the reservor algorithm, each based on different ways of sampling from the distribution of $S_t$. These generalizations have the advantage of requiring the generation of fewer random integers than the simple reservoir algorithm given above."
]
},
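{
"cell_type": "markdown",
"metadata": {},
"source": [
"To illustrate the idea (this is only a sketch of the skip-based approach, not one of Vitter's optimized algorithms), the function below, here called `sample_skips`, draws each skip length $S_t$ by inverting its tail probabilities $P(S_t > s) = \\prod_{j = 1}^{s} \\left(1 - \\frac{k}{t + j}\\right)$, so it generates only one uniform random number per element that enters the reservoir. It can be called just like `sample_after`, e.g. `sample_skips(xrange(1000000000), 10)`."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def sample_skips(stream, k):\n",
"    \"\"\"\n",
"    Reservoir sample of size k drawn by generating skip lengths.\n",
"\n",
"    This is a sketch of the skip-based idea; Vitter's algorithms generate\n",
"    the skip lengths more efficiently.\n",
"    \"\"\"\n",
"    stream = iter(stream)\n",
"    r = np.array(list(itl.islice(stream, k)))\n",
"\n",
"    t = k\n",
"\n",
"    while True:\n",
"        # Draw S_t as the smallest s with P(S_t > s) <= v, where\n",
"        # P(S_t > s) is the product of (1 - k / (t + j)) for j = 1, ..., s.\n",
"        v = np.random.uniform()\n",
"        s = 1\n",
"        tail = (t + 1 - k) / float(t + 1)\n",
"\n",
"        while tail > v:\n",
"            s += 1\n",
"            tail *= (t + s - k) / float(t + s)\n",
"\n",
"        # Discard the next s - 1 elements; the s-th element (if the stream\n",
"        # has not ended) replaces a uniformly chosen slot in the reservoir.\n",
"        x = next(itl.islice(stream, s - 1, s), None)\n",
"\n",
"        if x is None:\n",
"            return r\n",
"\n",
"        r[np.random.randint(0, k)] = x\n",
"        t += s"
],
"language": "python",
"metadata": {},
"outputs": []
},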
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 4
}
],
"metadata": {}
}
]
}