Skip to content

Instantly share code, notes, and snippets.

@andypetrella
Created March 14, 2016 19:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andypetrella/a95cbae48686c2fda4af to your computer and use it in GitHub Desktop.
Save andypetrella/a95cbae48686c2fda4af to your computer and use it in GitHub Desktop.
{
"metadata" : {
"name" : "Serializable StateStream",
"user_save_timestamp" : "1970-01-01T01:00:00.000Z",
"auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
"language_info" : {
"name" : "scala",
"file_extension" : "scala",
"codemirror_mode" : "text/x-scala"
},
"trusted" : true,
"customLocalRepo" : null,
"customRepos" : null,
"customDeps" : null,
"customImports" : null,
"customArgs" : null,
"customSparkConf" : {
"spark.app.name" : "Notebook",
"spark.master" : "local[8]",
"spark.executor.memory" : "1G"
}
},
"cells" : [ {
"metadata" : {
"id" : "79F80E392C804259A242E5F8D1EA3C42"
},
"cell_type" : "markdown",
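"source" : "Before running the cells below, start a text source for the socket stream. This shell loop repeatedly sends the first 10 lines of a file (replace `<A_FILE>` with its path) to a netcat server listening on port 8088, which the streaming context reads from:\n\n```\nwhile :; do head -n 10 <A_FILE>; done | netcat -l -p 8088\n```"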
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "622702ED1D5D451280F515203008D8E3"
},
"cell_type" : "code",
"source" : "import org.apache.spark.streaming.{StreamingContext, Seconds}\n\n// Stop any streaming context that is still active (e.g. when this cell is re-run),\n// keeping the shared SparkContext alive so it can be reused below.\nStreamingContext.getActive.foreach(_.stop(stopSparkContext = false))\n\n// NOTE(review): @transient presumably keeps these REPL values out of serialized\n// closures / checkpoints - confirm.\n// Batches are cut every 5 seconds.\n@transient val ssc = new StreamingContext(sparkContext, Seconds(5))\n// Stateful operations such as updateStateByKey (used below) require a checkpoint directory.\nssc.checkpoint(\"/tmp/myTestNotebook\")\n\n// Lines of text from the netcat server started by the shell command above (localhost:8088).\n@transient val d = ssc.socketTextStream(\"localhost\", 8088)\n\n",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "F41E55B93CB1448B93E1C45468316CA4"
},
"cell_type" : "code",
"source" : "import notebook.front._\nimport notebook.{JsonCodec, Codec}\nimport play.api.libs.json._\n\n/**\n * List widget showing the most recent `capacity` strings it was given, rendered as an HTML\n * list whose items are bound (via knockout) to this widget's data connection.\n */\nclass A extends DataConnectedWidget[String] with Serializable {\n  @transient implicit val singleCodec: Codec[JsValue, String] = JsonCodec.strings\n\n  /** Maximum number of items kept and displayed; the oldest items are dropped first. */\n  @transient val capacity: Int = 10\n\n  /** Items shown before any data arrives. */\n  @transient val initData: Seq[String] = Seq.empty[String]\n  /** When defined, the initial items are padded up to `capacity` with this value. */\n  @transient val prefill: Option[String] = None\n\n  // Currently displayed items; reassigned on every `apply`.\n  @transient var data: Seq[String] =\n    prefill.fold(initData)(filler => initData.padTo(capacity, filler))\n\n  apply(data)\n\n  @transient lazy val toHtml = <ul data-bind=\"foreach: value\">\n <li data-bind=\"html: $data\"></li>{\n    widgets.scopedScript(\n      \"\"\"\n |req(\n |['observable', 'knockout'],\n |function (O, ko) {\n | ko.applyBindings({\n | value: O.makeObservable(valueId)\n | },\n | this\n | );\n |});\n \"\"\".stripMargin,\n      Json.obj(\"valueId\" -> dataConnection.id)\n    )\n  }</ul>\n\n  /** Replaces the displayed items with the last `capacity` elements of `d`. */\n  override def apply(d: Seq[String]): Unit = {\n    data = d.takeRight(capacity)\n    super.apply(data)\n  }\n\n  /** Appends one item, evicting the oldest item if the list is full. */\n  def append(s: String): Unit = apply(data :+ s)\n\n  /** Appends several items, evicting the oldest items beyond `capacity`. */\n  def appendAll(s: Seq[String]): Unit = apply(data ++ s)\n}\n@transient val myUL = new A()",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "3993FF818B7147F689C0C548148C0FB6"
},
"cell_type" : "code",
"source" : "// Show the list widget as this cell's result; the foreachRDD cell below pushes new items into it.\nmyUL",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "151D01224CE94DE5B0CD819B5CBA298F"
},
"cell_type" : "code",
"source" : "// Running count of every distinct line received so far; updateStateByKey hands each\n// line (key) its previous count so the total carries over from batch to batch.\n@transient val runningSums = d\n  .map(line => (line, 1))\n  .updateStateByKey { (batchCounts: Seq[Int], previous: Option[Int]) =>\n    Some(previous.getOrElse(0) + batchCounts.sum)\n  }\n",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "DA774D74BE9449329B7C162C4A73A794"
},
"cell_type" : "code",
"source" : "// For each batch, push the first 10 (line, count) pairs of the running totals into the list widget.\nrunningSums.foreachRDD { rdd =>\n  val rendered = for ((line, total) <- rdd.take(10).toList) yield s\"$line: $total\"\n  myUL(rendered)\n}\n",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "FA2D60D619E5404985C3E391DEE42A9D"
},
"cell_type" : "code",
"source" : "// Start the streaming job defined in the cells above.\nssc.start()",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "F260D0AC81E6425C8C6B8F8AE7A5978E"
},
"cell_type" : "code",
"source" : "",
"outputs" : [ ]
} ],
"nbformat" : 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment