class ActivityInsightsJob(activityReader: SparkReader,
                          analyticsReader: SparkReader,
                          insightsWriter: SparkWriter
                         )(implicit val sparkSession: SparkSession) extends SparkTask {
  def run(): Unit = {
    val metricsDF = analyticsReader.read(Some(AnalyticsSchema))
      .transform(Events.isActivityImpression)
      .transform(Events.isActivityView)
      .transform(Events.isBookmarked)
@afranzi
afranzi / pyspark.sh
Created May 21, 2019 11:42
PySpark with org.apache.hadoop:hadoop-aws:2.8.5
export SPARK_OPTS='--packages org.apache.hadoop:hadoop-aws:2.8.5 --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider'
pyspark
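
TemporaryAWSCredentialsProvider expects session credentials in the S3A properties `fs.s3a.access.key`, `fs.s3a.secret.key` and `fs.s3a.session.token`. The following is a minimal sketch, assuming the temporary credentials are already exported in the usual `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_SESSION_TOKEN` environment variables. The helper name `s3a_session_conf` is hypothetical and not part of the gist.

```shell
# Hypothetical helper (not part of the original gist): prints the --conf flags
# that hand temporary AWS credentials to the S3A connector.
# Assumes the standard AWS_* environment variables hold session credentials.
s3a_session_conf() {
  # Fail fast if any of the three session values is missing or empty.
  if [ -z "${AWS_ACCESS_KEY_ID:-}" ] || [ -z "${AWS_SECRET_ACCESS_KEY:-}" ] || [ -z "${AWS_SESSION_TOKEN:-}" ]; then
    echo "missing temporary AWS credentials in the environment" >&2
    return 1
  fi
  printf '%s %s %s\n' \
    "--conf spark.hadoop.fs.s3a.access.key=${AWS_ACCESS_KEY_ID}" \
    "--conf spark.hadoop.fs.s3a.secret.key=${AWS_SECRET_ACCESS_KEY}" \
    "--conf spark.hadoop.fs.s3a.session.token=${AWS_SESSION_TOKEN}"
}

# Possible usage (commented out so that sourcing this file has no side effects):
# export SPARK_OPTS="${SPARK_OPTS} $(s3a_session_conf)"
```

Flags built this way can show up in process listings, so this approach probably suits local experiments better than shared machines.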
afranzi / SparkJupyter.md
Last active May 16, 2019 09:51
Spark + Toree + Jupyter

Install Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
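
To confirm from a script that the Toree kernel was registered, the listing above can be filtered by kernel name. This is a minimal sketch; `has_kernel` is a hypothetical helper that reads the output of `jupyter kernelspec list` on standard input and matches the first column exactly.

```shell
# Hypothetical helper (not part of the original gist): succeeds when the kernel
# name given as $1 appears as the first column of the listing read from stdin.
has_kernel() {
  awk -v name="$1" '$1 == name { found = 1 } END { exit (found ? 0 : 1) }'
}

# Possible usage (commented out because it needs Jupyter on the PATH):
# jupyter kernelspec list | has_kernel apache_toree_scala && echo "Toree kernel installed"
```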
afranzi / DownloadSpark.md
Last active May 16, 2019 09:39
Download Spark from archive.

Set up environment variables

export SPARK_VERSION=2.4.0
export SPARK_PACKAGE=spark-${SPARK_VERSION}-bin-hadoop2.7
export SPARK_HOME=$HOME/spark-${SPARK_VERSION}

export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"
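
The variables above are enough to derive the download location. This is a minimal sketch, assuming the release is fetched from the Apache archive at `archive.apache.org/dist/spark`; the helper name `spark_archive_url` is hypothetical, and the commented commands are one possible way to use it, not necessarily the steps in the gist.

```shell
# Hypothetical helper (not part of the original gist): prints the Apache archive
# URL for a Spark version ($1) and package name ($2).
spark_archive_url() {
  printf 'https://archive.apache.org/dist/spark/spark-%s/%s.tgz\n' "$1" "$2"
}

# Possible usage (commented out so that sourcing this file does not download anything):
# curl -fLO "$(spark_archive_url "$SPARK_VERSION" "$SPARK_PACKAGE")"
# tar -xzf "${SPARK_PACKAGE}.tgz"
# mv "${SPARK_PACKAGE}" "${SPARK_HOME}"
```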
afranzi / PyEnv.sh
Created May 16, 2019 09:12
PyEnv.sh
CFLAGS="-I$(xcrun --show-sdk-path)/usr/include" pyenv install 3.6.7
pyenv global 3.6.7
mkvirtualenv mlflow
workon mlflow
afranzi / PySparkJupyter.md
Last active May 16, 2019 08:47
Set up PySpark with Jupyter notebooks

Start by downloading the latest stable Apache Spark release (2.4.3 at the time of writing).

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark

Install PySpark and Jupyter in our virtualenv

afranzi / SchemasReferenced.scala
Created March 13, 2019 14:18
Example of schema references in a Sensor Wifi event
val schemasReferenced: Seq[SchemaReferenced] = Seq(
  SchemaReferenced("#", "/schemas/events/base-event/1.json"),
  SchemaReferenced("#", "/schemas/events/base-device-event/1.json"),
  SchemaReferenced("#", "/schemas/events/device-sensor-event/1.json"),
  SchemaReferenced("#/data", "/schemas/objects/sensors/SensorWifi/1.json"),
  SchemaReferenced("#/data/scan/[0]", "/schemas/objects/sensors/WifiConnection/1.json"),
  SchemaReferenced("#/data/scan/[1]", "/schemas/objects/sensors/WifiConnection/1.json"),
  SchemaReferenced("#/device", "/schemas/objects/Device/1.json"),
  SchemaReferenced("#/product", "/schemas/objects/Product/3.json"),
  SchemaReferenced("#/user", "/schemas/objects/User/2.json")
afranzi / SchemaEvolve.scala
Created March 13, 2019 14:17
Evolve JSON events using JSLT expressions
def evolve(event: ValidationResult[JsonNode]): EvolutionResult[JsonNode] = {
  val schemasReferenced: Seq[SchemaReferenced] = event.schemasReferenced
  val json = event.event
  val schemasToEvolve = schemasReferenced
    .filter { case SchemaReferenced(_, schemaRef) => hasEvolution(schemaRef) }
  val eventEvolved = schemasToEvolve
    .foldLeft(json) {
      case (jsonEvent: JsonNode, SchemaReferenced(location, schemaRef)) =>
afranzi / device-sensor-event.jslt
Last active March 13, 2019 14:16
JSLT evolution for a device-sensor-event
{
  "user": {
    "id": .userId
  },
  "device": {
    "id": "undefined",
    "platform": if(.p == "a") "Android" else "iPhone"
  },
  "product": {
    "id": "remix",
afranzi / SchemaValidationListener.scala
Created March 13, 2019 14:15
Schema Validation Listener for JSON Schemas
import SchemaValidationListener._
import org.everit.json.schema.Schema
import org.everit.json.schema.event.{CombinedSchemaMatchEvent, SchemaReferencedEvent, ValidationListener}
import scala.collection.mutable.ListBuffer
import scala.util.matching.Regex
import scala.collection.JavaConverters._
object SchemaValidationListener {