Skip to content

Instantly share code, notes, and snippets.

@bpartridge
Created November 18, 2013 23:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bpartridge/7537202 to your computer and use it in GitHub Desktop.
Save bpartridge/7537202 to your computer and use it in GitHub Desktop.
{
"metadata": {
"name": ""
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"![](http://hips.seas.harvard.edu/sites/all/themes/hips/hips-header-logos.png)\n",
"![](http://spark.incubator.apache.org/images/spark-project-header1-cropped.png)\n",
"\n",
"Brenton Partridge\n",
"\n",
"November 12, 2013"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Intro: Traditional MapReduce\n",
"\n",
"- Each node loads a small partition of the data from disk (or a distributed filesystem).\n",
"- **Map** each partition to a value, or a tuple of values.\n",
"- **Reduce** each pair of map outputs to something of the same form as a map output. Keep doing this in a tree-like style until you have one output.\n",
"- Save results to disk.\n",
"\n",
"Frameworks like Hadoop are designed to do this whole process very effectively."
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# MapReduce for Machine Learning\n",
"\n",
"- Each node loads a small partition of the data from disk (or a distributed filesystem).\n",
"- **Map** each partition to a value.\n",
"- **Reduce** pairs of map outputs.\n",
"- **Distribute** some updated global parameter for the next map step.\n",
"- **Map** each partition to a value.\n",
"- **Reduce** pairs of map outputs.\n",
"- **Distribute** some updated global parameter for the next map step.\n",
"- **Map** each partition to a value.\n",
"- **Reduce** pairs of map outputs.\n",
"- **Distribute** some updated global parameter for the next map step.\n",
"- ...\n",
"- Save results to disk."
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# ACTUAL MapReduce for Machine Learning\n",
"\n",
"- Each node loads a small partition of the data from disk (or a distributed filesystem).\n",
"- **Map** each partition to a value.\n",
"- **Reduce** pairs of map outputs.\n",
"- **Distribute** some updated global parameter for the next map step.\n",
"- **Map** each partition to a value.\n",
"- **Reduce** pairs of map outputs.\n",
"- **Distribute** some updated global parameter for the next map step.\n",
"- **Map** each partition to a value.\n",
"- **Reduce** pairs of map outputs.\n",
"- **Distribute** some updated global parameter for the next map step.\n",
"- ...\n",
"- ___There's a bug! Start again... but hopefully we don't have to wait for the data to load again!___\n",
"- **Map** each partition to a value.\n",
"- **Reduce** pairs of map outputs.\n",
"- **Distribute** some updated global parameter for the next map step.\n",
"- ...\n",
"- Save results to disk.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Enter Spark.\n",
"\n",
"From [http://spark.incubator.apache.org/](http://spark.incubator.apache.org/):\n",
"\n",
"<h2 id=\"what-is-apache-spark\">What is Apache Spark?</h2>\n",
"\n",
"<p>Apache Spark is an open source cluster computing system that aims to make data analytics <em>fast</em> \u2014 both fast to run and fast to write.</p>\n",
"\n",
"<p>To run programs faster, Spark offers a general execution model that can optimize arbitrary operator graphs, and supports in-memory computing, which lets it query data faster than disk-based engines like Hadoop.</p>\n",
"\n",
"<p>To make programming faster, Spark provides clean, concise APIs in\n",
"<a href=\"http://www.scala-lang.org\" onclick=\"javascript:_gaq.push(['_trackEvent','outbound-article','http://www.scala-lang.org']);\">Scala</a>,\n",
"<a href=\"http://spark.incubator.apache.org/docs/latest/quick-start.html#a-standalone-app-in-java\">Java</a> and\n",
"<a href=\"http://spark.incubator.apache.org/docs/latest/quick-start.html#a-standalone-app-in-python\">Python</a>.\n",
"You can also use Spark interactively from the Scala and Python shells to rapidly query big datasets.</p>\n",
"\n",
"<h2 id=\"what-can-it-do\">What can it do?</h2>\n",
"\n",
"<p>Spark was initially developed for two applications where placing data in memory helps: <em>iterative</em> algorithms, which are common in machine learning, and <em>interactive</em> data mining. In both cases, Spark can run up to <b>100x</b> faster than Hadoop MapReduce. However, you can use Spark for general data processing too. Check out our <a href=\"http://spark.incubator.apache.org/examples.html\">example jobs</a>.</p>\n",
"\n",
"<p>Spark is also the engine behind <a href=\"http://shark.cs.berkeley.edu\" onclick=\"javascript:_gaq.push(['_trackEvent','outbound-article','http://shark.cs.berkeley.edu']);\">Shark</a>, a fully <a href=\"http://hive.apache.org\" onclick=\"javascript:_gaq.push(['_trackEvent','outbound-article','http://hive.apache.org']);\">Apache Hive</a>-compatible data warehousing system that can run 100x faster than Hive.</p>\n",
"\n",
"\n",
"<p>While Spark is a new engine, it can access any data source supported by Hadoop, making it easy to run over existing data.</p>\n",
"\n",
"<h2 id=\"who-uses-it\">Who uses it?</h2>\n",
"<p>Spark was initially created in the <a href=\"https://amplab.cs.berkeley.edu\" onclick=\"javascript:_gaq.push(['_trackEvent','outbound-article','http://amplab.cs.berkeley.edu']);\">UC Berkeley AMPLab</a>, but is now being used and developed at a wide array of companies.\n",
"See our <a href=\"https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark\">powered by page</a> for a list of users,\n",
"and our <a href=\"https://cwiki.apache.org/confluence/display/SPARK/Committers\">list of committers</a>.\n",
"In total, over 25 companies have contributed code to Spark.\n",
"Spark is <a href=\"https://github.com/apache/incubator-spark\" onclick=\"javascript:_gaq.push(['_trackEvent','outbound-article','http://github.com']);\">open source</a> under an Apache license, so <a href=\"http://spark.incubator.apache.org/downloads.html\">download</a> it to try it out.</p>"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Key Features of Spark\n",
"\n",
"- **Multiple Languages**: Scala, Java, Python (seamlessly via [Py4j](http://py4j.sourceforge.net/))\n",
"\n",
"- **Master-Worker Architecture**\n",
" - On a cluster without a scheduler, Spark can be its own scheduler.\n",
" - On a cluster with a scheduler,\n",
" the master can exist on a login node, while workers can be submitted on \n",
" `qsub` (but be sure to kill them when done)\n",
"\n",
"- **Resilient Distributed Datasets** (RDDs) distribute their contents across\n",
" nodes, and can be given redundancy in case a node or worker process\n",
" goes down.\n",
" - RDDs can be **cached** across operations, and the least-recently-used \n",
" are automatically evicted when out of memory.\n",
" - With certain arguments to the Spark context, you can set variables to be\n",
" cached on a shared filesystem, and their partitions can be redistributed\n",
" if a node goes down.\n",
" - RDDs can be the result of operations on other RDDs.\n",
"\n",
"- **\"Automagical\" distribution of constants and functions across cluster**\n",
" - Pickles or serializes any constants or functions used in a MapReduce\n",
" calculation.\n",
" - See [cloudpickle.py](https://github.com/apache/incubator-spark/blob/master/python/pyspark/cloudpickle.py) for how it works -\n",
" and this code could be useful to use in MPI."
]
},
{
"cell_type": "heading",
"level": 1,
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"Basic Usage"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Just standard import code for the iPython notebook\n",
"\n",
"import numpy as np\n",
"from matplotlib import pyplot as plt\n",
"%matplotlib inline"
],
"language": "python",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"outputs": [],
"prompt_number": 2
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Connection code to the Spark master node.\n",
"\n",
"# You can't load multiple SparkContexts in a single Python application.\n",
"# So if you want to change the configuration, you need to restart the\n",
"# notebook kernel. The final line just ensures that it's working. \n",
"# If you're on Neo, you should be seeing 32+ cores when you run this\n",
"# (you may need to run it multiple times to see that).\n",
"\n",
"import pyspark\n",
"from pyspark import SparkContext, StorageLevel\n",
"try:\n",
" sc\n",
"except:\n",
" sc = SparkContext(\"spark://neologin.int.seas.harvard.edu:7077\", \"ParallelLDA\")\n",
" # sc = SparkContext(\"local[3]\", \"ParallelLDA\") # to run without a cluster\n",
"\n",
"sc.defaultParallelism"
],
"language": "python",
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 6,
"text": [
"3"
]
}
],
"prompt_number": 6
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# A basic example of the map-reduce syntax.\n",
"# These few lines are all that is needed to distribute the operations\n",
"# to an entire cluster!\n",
"# Note that the mapper will print to logs visible from the\n",
"# web interface, not to the executing process.\n",
"\n",
"def mapper(i):\n",
" print i\n",
" return i+1\n",
"\n",
"def reducer(a,b):\n",
" return a+b\n",
"\n",
"sc.parallelize(np.arange(100000)) \\\n",
" .map(mapper) \\\n",
" .reduce(reducer)"
],
"language": "python",
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 5,
"text": [
"5000050000"
]
}
],
"prompt_number": 5
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Dataset for this example:\n",
"\n",
"![](http://labrosa.ee.columbia.edu/millionsong/sites/default/files/musixmatch_logo.png)\n",
"\n",
"musiXmatch dataset, the official lyrics collection for the Million Song Dataset, \n",
"available at: http://labrosa.ee.columbia.edu/millionsong/musixmatch"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"text = sc.textFile(\"mxm_dataset_train_1000.txt\").cache(); text"
],
"language": "python",
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 40,
"text": [
"<pyspark.rdd.RDD at 0x3dae850>"
]
}
],
"prompt_number": 40
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"text_first = text.first(); text_first"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 41,
"text": [
"u'TRAAAAV128F421A322,4623710,1:6,2:4,3:2,4:2,5:5,6:3,7:1,8:1,11:1,12:2,13:3,14:1,15:1,18:2,19:2,20:2,21:2,23:4,25:1,26:2,28:1,30:1,36:2,42:1,45:1,54:2,56:1,57:1,68:1,99:1,192:2,249:1,264:1,356:1,389:1,561:1,639:1,656:1,687:1,761:1,773:1,804:1,869:2,914:1,1035:1,1156:1,1221:1,1287:1,1364:1,1407:1,1533:2,1857:1,2096:1,2117:1,2482:2,2548:1,2705:1,2723:1,2868:2,2992:2,3455:1,3717:1,3851:1,4322:1,4382:1,4613:1,4713:1,4906:1'"
]
}
],
"prompt_number": 41
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Latent Dirichlet Allocation: Topic Modeling\n",
"\n",
"![](http://upload.wikimedia.org/wikipedia/commons/d/d3/Latent_Dirichlet_allocation.svg)\n",
"\n",
"We will show how to fit a topic model to the lyrics dataset, under the generative model that each document has a distribution over topics, and each topic has a distribution over words.\n",
"\n",
"Refer to [Blei, Ng, Jordan 2003](http://www.seas.harvard.edu/courses/cs281/papers/blei-ng-jordan-2003.pdf) for the generative model, and [Hoffman, Blei, Bach 2010](http://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf) for the specific algorithm implemented below."
]
},
{
"cell_type": "heading",
"level": 1,
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"Variational Inference for LDA"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"K = 20 # topics\n",
"W = 100 # words\n",
"D = text.count()\n",
"K,W,D"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 42,
"text": [
"(20, 100, 1000)"
]
}
],
"prompt_number": 42
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"Here we define functions that operate on individual documents,\n",
"beginning with one to load data.\n",
"Just like how you sometimes need to pass a heterogeneous collection\n",
"of variables into a scipy optimizer, here you need to pass in everything\n",
"you need, every time, if it has a possibility of changing in an iteration.\n",
"\n",
"This first function preprocesses each line (lyrics for a single song)\n",
"into the matrices needed for the algorithm."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def doc_init(line, params):\n",
" \"Returns a doc_state tuple representing the document-specific priors.\"\n",
" AlphaK, LambdaKW = params\n",
" \n",
" split = line.split(',')\n",
" docid = split[0]\n",
" pairs = split[2:]\n",
" pairs = [tuple(pair.split(':')) for pair in pairs]\n",
" pairs = [(int(w)-1,int(c)) for w,c in pairs if int(w)-1 < W]\n",
" # Note that we subtract 1 from word indices for this dataset\n",
" \n",
" WordN = np.array([w for w,c in pairs])\n",
" CountN = np.array([c for w,c in pairs])\n",
" CountW = np.zeros(W, dtype=np.int)\n",
" N = WordN.size\n",
" for n in range(N):\n",
" CountW[WordN[n]] += CountN[n]\n",
" \n",
" PhiWK = np.ones((W,K)) / K\n",
" GammaK = AlphaK + N/K\n",
" debug = dict()\n",
" return (docid, N, WordN, CountN, CountW, PhiWK, GammaK, debug)\n",
"\n",
"first_doc_state = doc_init(text_first, (np.ones(K), np.ones((K,W))))\n",
"print first_doc_state[4][:5], '...'\n",
"\n",
"# Here we test using Spark itself, doing a simple map-reduce\n",
"# to show that the word counts are being loaded correctly.\n",
"\n",
"print text.map(lambda l: doc_init(l, (np.ones(K), np.zeros((K,W))))[4]) \\\n",
" .reduce(lambda N1, N2: N1 + N2)[:5], '...'"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"[6 4 2 2 5] ...\n",
"[8798 7806 7559 4606 4431]"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
" ...\n"
]
}
],
"prompt_number": 43
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"### E-step and M-step for Variational LDA"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In batch variational LDA, we alternate between a data-parallel **E-step**,\n",
"which updates document/song-specific variational parameters, and\n",
"a global **M-step**, which \"reduces\" statistics from all of those parameters\n",
"into an update to a global variational parameter used in the next E-step."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from scipy.special import psi\n",
"from numpy import exp, sum, abs\n",
"from numpy.linalg import norm\n",
"\n",
"# Batch Variational Bayes algorithm from Hoffman, Blei, Bach 2010\n",
"def doc_estep(doc_state, params):\n",
" \"Document-specific E-step. Returns a doc_state tuple.\"\n",
" docid, N, WordN, CountN, CountW, PhiWK, GammaK, debug = doc_state\n",
" AlphaK, LambdaKW = params\n",
" # TODO: do we need to clone the arrays or can we modify in place?\n",
" \n",
" PhiWK = np.copy(PhiWK)\n",
" GammaK = np.ones(K)\n",
" OldGammaK = np.ones(K)\n",
" \n",
" estep_iters = N*2\n",
" \n",
" psi_sum_LambdaKW = sum(LambdaKW, axis=1)\n",
" assert psi_sum_LambdaKW.size == K\n",
" Eq_logBetaKW = psi(LambdaKW) - psi_sum_LambdaKW[:,np.newaxis]\n",
" \n",
" gamma_diffs = np.zeros(estep_iters)\n",
" \n",
" for it in xrange(estep_iters):\n",
" Eq_logThetaK = psi(GammaK) - psi(sum(GammaK))\n",
" PhiWK[:] = Eq_logThetaK[np.newaxis,:] + Eq_logBetaKW.T\n",
" \n",
" # Normalize over K on each row w\n",
" PhiW_row_sum = sum(PhiWK, axis=1)\n",
" PhiWK /= PhiW_row_sum[:,np.newaxis]\n",
" \n",
" # Update gamma from the document contents\n",
" # Note: this could also be matrix mult PhiWK * diag(CountW)\n",
" # From the right such a matrix rescales the columns.\n",
" GammaK[:] = AlphaK + sum(PhiWK * CountW[:,np.newaxis], axis=0)\n",
" \n",
" gamma_diffs[it] = gamma_diff = norm(GammaK - OldGammaK, 1)/K\n",
" \n",
" if gamma_diff < 0.00001:\n",
" break\n",
" OldGammaK[:] = GammaK\n",
" \n",
" debug = dict(it=it, gamma_diffs=gamma_diffs)\n",
" return (docid, N, WordN, CountN, CountW, PhiWK, GammaK, debug)\n",
"\n",
"def doc_to_local_lambda(doc_state):\n",
" \"\"\"Returns the local portion of LambdaKW for this document.\n",
" Summed across documents, this gives LambdaKW.\"\"\"\n",
" docid, N, WordN, CountN, CountW, PhiWK, GammaK, debug = doc_state\n",
" # Summing over words\n",
" return (PhiWK * CountW[:,np.newaxis]).T\n",
"\n",
"rand = np.random.RandomState(seed=90)\n",
"params = (np.ones(K), rand.dirichlet(alpha=np.ones(W) * 0.5, size=K))\n",
"first_doc_one_estep = doc_estep(first_doc_state, params)\n",
"first_doc_one_estep, doc_to_local_lambda(first_doc_one_estep)"
],
"language": "python",
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 44,
"text": [
"((u'TRAAAAV128F421A322',\n",
" 30,\n",
" array([ 0, 1, 2, 3, 4, 5, 6, 7, 10, 11, 12, 13, 14, 17, 18, 19, 20,\n",
" 22, 24, 25, 27, 29, 35, 41, 44, 53, 55, 56, 67, 98]),\n",
" array([6, 4, 2, 2, 5, 3, 1, 1, 1, 2, 3, 1, 1, 2, 2, 2, 2, 4, 1, 2, 1, 1, 2,\n",
" 1, 1, 2, 1, 1, 1, 1]),\n",
" array([6, 4, 2, 2, 5, 3, 1, 1, 0, 0, 1, 2, 3, 1, 1, 0, 0, 2, 2, 2, 2, 0, 4,\n",
" 0, 1, 2, 0, 1, 0, 1, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0,\n",
" 0, 0, 0, 0, 0, 0, 0, 2, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0,\n",
" 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n",
" 0, 0, 0, 0, 0, 0, 1, 0]),\n",
" array([[ 2.36343759e-01, 3.05675253e-02, 1.60023105e-02, ...,\n",
" 2.68071836e-02, 2.45089676e-01, 7.36762989e-03],\n",
" [ 2.02350676e-04, 7.30153101e-04, 7.74639699e-04, ...,\n",
" 1.97557892e-03, 5.67893527e-04, 1.42668263e-03],\n",
" [ 1.45358981e-03, 6.67218765e-04, 4.85800962e-04, ...,\n",
" 9.59054782e-05, 3.57411274e-04, 8.98134592e-05],\n",
" ..., \n",
" [ 8.32884326e-04, 2.84764845e-03, 3.44887536e-01, ...,\n",
" 1.71378915e-03, 1.61376340e-03, 5.10168896e-02],\n",
" [ 4.42303318e-03, 3.73335766e-02, 5.93107187e-01, ...,\n",
" 6.39902675e-02, 4.35290059e-03, 2.00690915e-02],\n",
" [ 1.80436800e-03, 1.19809278e-02, 6.07049981e-03, ...,\n",
" 1.59406255e-03, 8.30833983e-03, 1.48932462e-03]]),\n",
" array([ 3.38380022, 4.63882065, 6.10179365, 5.44008664, 5.40007418,\n",
" 2.16507836, 5.2910406 , 5.44042001, 2.79243525, 3.95248756,\n",
" 2.59916025, 3.25135179, 4.9055958 , 2.3692461 , 3.86581106,\n",
" 4.03313215, 3.0601288 , 2.30513794, 3.34456571, 4.6598333 ]),\n",
" {'gamma_diffs': array([ 2.95000000e+00, 5.83740977e-04, 3.64905247e-07,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00]),\n",
" 'it': 2}),\n",
" array([[ 1.41806255e+00, 8.09402704e-04, 2.90717962e-03, ...,\n",
" 0.00000000e+00, 4.42303318e-03, 0.00000000e+00],\n",
" [ 1.83405152e-01, 2.92061240e-03, 1.33443753e-03, ...,\n",
" 0.00000000e+00, 3.73335766e-02, 0.00000000e+00],\n",
" [ 9.60138631e-02, 3.09855880e-03, 9.71601924e-04, ...,\n",
" 0.00000000e+00, 5.93107187e-01, 0.00000000e+00],\n",
" ..., \n",
" [ 1.60843102e-01, 7.90231567e-03, 1.91810956e-04, ...,\n",
" 0.00000000e+00, 6.39902675e-02, 0.00000000e+00],\n",
" [ 1.47053806e+00, 2.27157411e-03, 7.14822548e-04, ...,\n",
" 0.00000000e+00, 4.35290059e-03, 0.00000000e+00],\n",
" [ 4.42057793e-02, 5.70673050e-03, 1.79626918e-04, ...,\n",
" 0.00000000e+00, 2.00690915e-02, 0.00000000e+00]]))"
]
}
],
"prompt_number": 44
},
{
"cell_type": "heading",
"level": 3,
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"Log Likelihood Per Document"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In variational inference, the log likelihood of the data is guaranteed to \n",
"increase (up to numerical errors) with each successive iteration.\n",
"The following code may have bugs, but it should give an idea of what is required."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from scipy.special import digamma, gamma, gammaln\n",
"from numpy import exp, sum, log\n",
"\n",
"# Largely adapted from lda-inference.c in Blei's LDA-C code.\n",
"def doc_likelihood(doc_state, params):\n",
" docid, N, WordN, CountN, CountW, PhiWK, GammaK, debug = doc_state\n",
" AlphaK, LambdaKW = params\n",
" \n",
" # First, p(theta_d|alpha)\n",
" digsumGammaK = digamma(GammaK.sum())\n",
" digGammaK = digamma(GammaK)\n",
" \n",
" digsumLambdaK = digamma(sum(LambdaKW, axis=1))\n",
" digLambdaKW = digamma(LambdaKW)\n",
" \n",
" ll = gammaln(sum(AlphaK)) - sum(gammaln(AlphaK))\n",
" ll -= sum((GammaK-1)*(digGammaK-digsumGammaK))\n",
" ll += sum(gammaln(LambdaKW))\n",
" ll /= D\n",
" \n",
" ll -= gammaln(sum(GammaK))\n",
" ll += sum((AlphaK-1)*(digGammaK-digsumGammaK) + gammaln(GammaK))\n",
" \n",
" ll += sum(PhiWK * CountW[:,np.newaxis] *\n",
" ((digGammaK - digsumGammaK)[np.newaxis,:] +\n",
" (digLambdaKW - digsumLambdaK[:,np.newaxis]).T -\n",
" gammaln(PhiWK)))\n",
"\n",
" return float(ll)\n",
"\n",
"rand = np.random.RandomState(seed=90)\n",
"doc_likelihood(first_doc_state, (np.ones(K), \n",
" rand.dirichlet(alpha=np.ones(W) * 0.5, size=K)))"
],
"language": "python",
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 45,
"text": [
"-2856195.902133062"
]
}
],
"prompt_number": 45
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"### Testing synchronously on a small dataset to avoid bugs"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sync to test\n",
"\n",
"rand = np.random.RandomState(seed=99)\n",
"\n",
"AlphaK = np.ones(K) * 0.5 # prefer sparsity?\n",
"EtaW = np.ones(W) * 0.5\n",
"LambdaKW = rand.dirichlet(alpha=EtaW, size=K)\n",
"# print LambdaKW.shape\n",
"\n",
"iters = 20\n",
"lls = np.zeros(iters)\n",
"\n",
"with open('mxm_dataset_train_30.txt', 'r') as f:\n",
" ds_arr = [doc_init(line, (AlphaK, LambdaKW)) for line in f]\n",
"\n",
"for it in range(iters):\n",
" \n",
" lls[it] = sum([doc_likelihood(ds, (AlphaK, LambdaKW)) for ds in ds_arr])\n",
" \n",
" print lls[it]\n",
" \n",
" if it > 0 and lls[it] - lls[it-1] < 0.00001:\n",
" pass # break\n",
" \n",
" ds_arr = [doc_estep(ds, (AlphaK, LambdaKW)) for ds in ds_arr]\n",
" \n",
" LambdaKW[:,:] = EtaW[np.newaxis,:]\n",
" for ds in ds_arr:\n",
" LambdaKW += doc_to_local_lambda(ds)"
],
"language": "python",
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"-329544190287.0\n",
"-38631.2086147"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-54752.9484271"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55131.2911625"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55369.6436475"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55520.7381892"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55616.7938097"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55677.9374694"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55716.873878"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55741.6670812"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55757.4497179"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55767.4924818"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55773.8801682"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55777.9414058"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55780.5225621"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55782.1625145"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55783.2041832"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55783.8656808"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55784.285676"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"-55784.5522959"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n"
]
}
],
"prompt_number": 46
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"### Bringing it all together!\n",
"\n",
"Note here that we set `doc_states = doc_states.map(...)`. Spark uses a LRU cache internally for its RDDs so this should work correctly."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Here we set up global and per-document priors using a very simple\n",
"# map step. Note the cache() here: it will keep every few doc_states on\n",
"# a dedicated node in memory, since we'll be iterating on it!\n",
"\n",
"from operator import add\n",
"\n",
"rand = np.random.RandomState(seed=99)\n",
"\n",
"AlphaK = np.ones(K) * 0.5 # prefer sparsity?\n",
"EtaW = np.ones(W) * 0.5\n",
"LambdaKW = rand.dirichlet(alpha=EtaW, size=K)\n",
"\n",
"doc_states_init = text.map(lambda line:\n",
" doc_init(line, (AlphaK, LambdaKW))).cache()\n",
"doc_states = doc_states_init\n",
"\n",
"# doc_states.reduce(lambda s1,s2: 1)"
],
"language": "python",
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [],
"prompt_number": 50
},
{
"cell_type": "code",
"collapsed": true,
"input": [
"iters = 50\n",
"lls = np.zeros(iters)\n",
"for it in range(iters):\n",
" \n",
" lls[it] = doc_states.map(lambda doc_state:\n",
" doc_likelihood(doc_state, (AlphaK, LambdaKW))) \\\n",
" .reduce(lambda a,b: a+b)\n",
" \n",
" print lls[it]\n",
" \n",
" if it > 0 and lls[it] - lls[it-1] < 0.00001:\n",
" pass # break\n",
" \n",
" doc_states = doc_states.map(lambda doc_state:\n",
" doc_estep(doc_state, (AlphaK, LambdaKW))) \\\n",
" .cache()\n",
" \n",
" LambdaKW = doc_states.map(doc_to_local_lambda) \\\n",
" .reduce(lambda a,b: a+b)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 1
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Unfortunately... this code would not run on our full cluster. $PYTHON_PATH problems may be an issue."
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"### ... Or maybe not.\n",
"\n",
"So it's still in development, and it's very prone to hard-to-track bugs, particularly when a Python worker process crashes without emitting output. While there is a web interface to display standard output from workers, it doesn't currently seem to capture failure cases.\n",
"\n",
"Additionally, you can see a live view open bugs below; refer to this if any of these are relevant to your use case.\n",
"\n",
"---\n",
"\n",
"<iframe src=\"https://spark-project.atlassian.net/sr/jira.issueviews:searchrequest-printable/temp/SearchRequest.html?jqlQuery=project+%3D+SPARK+AND+resolution+%3D+Unresolved+AND+priority+%3D+Major+ORDER+BY+key+DESC&tempMax=1000\" height=\"500px\" width=\"100%\"></iframe>"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Alternatives: What about MPI?\n",
"\n",
"MPI is a basic low-level, multiple language communications protocol for synchronization and communication between processes on multiple machines, well-supported on Harvard's clusters (and presumably others).\n",
"\n",
"Master (or parent, or client) side code:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#!/usr/bin/env python\n",
"from mpi4py import MPI\n",
"import numpy\n",
"import sys\n",
"\n",
"comm = MPI.COMM_SELF.Spawn(sys.executable,\n",
" args=['cpi.py'],\n",
" maxprocs=5)\n",
"\n",
"N = numpy.array(100, 'i')\n",
"comm.Bcast([N, MPI.INT], root=MPI.ROOT)\n",
"PI = numpy.array(0.0, 'd')\n",
"comm.Reduce(None, [PI, MPI.DOUBLE],\n",
" op=MPI.SUM, root=MPI.ROOT)\n",
"print(PI)\n",
"\n",
"comm.Disconnect()"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Worker (or child, or server) side:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#!/usr/bin/env python\n",
"from mpi4py import MPI\n",
"import numpy\n",
"\n",
"comm = MPI.Comm.Get_parent()\n",
"size = comm.Get_size()\n",
"rank = comm.Get_rank()\n",
"\n",
"N = numpy.array(0, dtype='i')\n",
"comm.Bcast([N, MPI.INT], root=0)\n",
"h = 1.0 / N; s = 0.0\n",
"for i in range(rank, N, size):\n",
" x = h * (i + 0.5)\n",
" s += 4.0 / (1.0 + x**2)\n",
"PI = numpy.array(s * h, dtype='d')\n",
"comm.Reduce([PI, MPI.DOUBLE], None,\n",
" op=MPI.SUM, root=0)\n",
"\n",
"comm.Disconnect()"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# What about MPI?\n",
"\n",
"- Need to code your own fault tolerance, including data redistribution and checkpoints.\n",
"- Need to explicitly send every constant and function over the wire.\n",
"- Need different executables for each type of distributed task.\n",
"- But potentially easier to understand what's going on, and to track down bugs.\n",
"- And you can do inter-node data-sharing that's not possible in any MapReduce-specific framework.\n",
"\n",
"# Conclusion\n",
"\n",
"Spark is a promising framework, but it's not the best fit for every problem.\n",
"Its complexity and multiple processes make it hard to track down bugs;\n",
"however, if everything works right, you get multi-node parallelism\n",
"with very little code!"
]
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment