{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# RandomRDD with custom PRNG\n",
"\n",
"If for whatever reason you are not happy with the prseudo-random number generators provided by Spark's RandomRDD, here's a way to create random RDDs using your own custom pseudo-ramdom number generator!"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from pyspark import SparkContext\n",
"import random\n",
"import numpy as np\n",
"\n",
"sc = SparkContext(\"local[20]\", \"RandomRDD with custom PRNG\")"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Define our function for creating such RDDs. The first parameter is a pseudo-random number \n",
"# generator which takes the current seed as a parrameter and return the new value and seed.\n",
"def random_rdd(prng, n_obs, n_partitions):\n",
" n_obs_per_partition = int(n_obs / n_partitions)\n",
"\n",
" def seed_to_random_partition(seed):\n",
" for _ in range(n_obs_per_partition):\n",
" seed, x = prng(seed)\n",
" yield x\n",
"\n",
" seeds = [ np.uint64(random.getrandbits(64)) for _ in range(n_partitions) ]\n",
" \n",
" return sc.parallelize(seeds, n_partitions)\\\n",
" .flatMap(seed_to_random_partition, preservesPartitioning=True)\n",
" \n",
" \n",
"# We use a modified xorshift algorithm to sample from U[0, 1]\n",
"def nextU01(x):\n",
" # use np.uint64 to make sure that out 64bits int don't automatically get converted\n",
" # to python longs. python longs cannot overflow and thus this shifting will just generate\n",
" # bigger and bigger longs.\n",
" x ^= np.left_shift(x, np.uint64(13))\n",
" x ^= np.right_shift(x, np.uint64(17))\n",
" x ^= np.left_shift(x, np.uint64(5))\n",
"\n",
" return x, float(x / float(18446744073709551615))"
]
},
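{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before generating a full RDD, we can sanity-check `nextU01` locally. The sketch below is not part of the distributed pipeline, and the seed and sample size are arbitrary: it just iterates the generator from a fixed nonzero seed and checks that the stream stays in [0, 1] with a sample mean near 0.5."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Quick local sanity check of the xorshift-based generator.\n",
"seed = np.uint64(88172645463325252)  # nonzero seed; the value itself is arbitrary\n",
"\n",
"samples = []\n",
"for _ in range(10000):\n",
"    seed, u = nextU01(seed)\n",
"    samples.append(u)\n",
"\n",
"# All draws must lie in [0, 1] and the sample mean should be close to 0.5.\n",
"print(\"in [0, 1]: %s\" % all(0.0 <= u <= 1.0 for u in samples))\n",
"print(\"mean = %f\" % np.mean(samples))"
]
},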
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Expected results: \n",
"count = 10000000\n",
"min = 0\n",
"max = 1\n",
"mean = 0.5\n",
"var = 0.833...\n",
"\n",
"Computed results: \n",
"count = 10000000\n",
"min = 0.000000\n",
"max = 1.000000\n",
"mean = 0.500113\n",
"var = 0.083375\n"
]
}
],
"source": [
"# We want to sample 10M observations from U[0, 1]\n",
"n_obs = 10000000\n",
"n_partitions = 10\n",
"\n",
"# Generate the RDD and compute relevant statistics on it\n",
"random_data = random_rdd(nextU01, n_obs, n_partitions)\n",
"stats = random_data.stats()\n",
"\n",
"# Knowing that we sampled from U[0, 1], we can verify that\n",
"# we properly sampled by comparing theoretical statistics to\n",
"# those computed by Spark and our method\n",
"print(\"Expected results: \")\n",
"print(\"count = %d\" % n_obs)\n",
"print(\"min = 0\")\n",
"print(\"max = 1\")\n",
"print(\"mean = 0.5\")\n",
"print(\"var = 0.833...\")\n",
"\n",
"print(\"\\nComputed results: \")\n",
"print(\"count = %d\" % stats.count())\n",
"print(\"min = %f\" % stats.min())\n",
"print(\"max = %f\" % stats.max())\n",
"print(\"mean = %f\" % stats.mean())\n",
"print(\"var = %f\" % stats.stdev()**2)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.15"
}
},
"nbformat": 4,
"nbformat_minor": 2
}