Skip to content

Instantly share code, notes, and snippets.

@takezoe
Last active March 27, 2018 02:47
Show Gist options
  • Save takezoe/c3b7680ea77ed9601afa7caf7e95ef3c to your computer and use it in GitHub Desktop.
Save takezoe/c3b7680ea77ed9601afa7caf7e95ef3c to your computer and use it in GitHub Desktop.
PredictionIO and Zeppelin: reading PredictionIO events into a Spark DataFrame and querying them with SQL
%dep
// Maven coordinates registered with Zeppelin's dependency loader (`z.load`):
// PredictionIO core/data plus its JDBC event-store backend, the PostgreSQL driver,
// and the supporting libraries. All Scala artifacts here are Scala 2.11 builds.
Seq(
  "org.apache.predictionio:apache-predictionio-core_2.11:0.12.1",
  "org.apache.predictionio:apache-predictionio-data_2.11:0.12.1",
  "org.apache.predictionio:apache-predictionio-data-jdbc_2.11:0.12.1",
  "org.postgresql:postgresql:42.1.4",
  "org.clapper:grizzled-slf4j_2.11:1.3.2",
  "com.github.nscala-time:nscala-time_2.11:2.18.0",
  "org.json4s:json4s-native_2.11:3.2.11"
).foreach(artifact => z.load(artifact))
import org.apache.predictionio.data.storage.jdbc.JDBCPEvents
import org.apache.predictionio.data.storage.StorageClientConfig
// JDBC URL of the PostgreSQL database backing PredictionIO's event store.
val client = "jdbc:postgresql://localhost/pio"
// Credentials handed to the JDBC storage client (local demo values).
val config = StorageClientConfig(properties = Map("USERNAME" -> "pio", "PASSWORD" -> "pio"))
// "pio_event" is the namespace argument of JDBCPEvents; assumed to match the
// PredictionIO server's event-store configuration — confirm for your install.
val events = new JDBCPEvents(client, config, "pio_event")

// Identifier of the PredictionIO application whose events are read.
val appId = 1

// Read only the event types the rating conversion knows how to handle ("rate" and
// "buy"). Without this filter every event of the app is fetched, and any other event
// type (e.g. a `$set` property event) makes the conversion throw and fails the job.
val eventsRDD = events.find(appId, eventNames = Some(Seq("rate", "buy")))(sc)
/** A single user–item interaction: `user` gave `item` the score `rating`.
  *
  * Spark derives the DataFrame schema (column names and types) from these fields.
  */
case class Rating(user: String, item: String, rating: Double)
// Convert each PredictionIO event into a Rating row.
// "rate" events carry their score in the "rating" property; "buy" events carry no
// score and are mapped to a fixed rating of 4.0. Any other event type is rejected.
val ratingRDD = eventsRDD.map { event =>
  val ratingValue: Double = event.event match {
    case "rate" => event.properties.get[Double]("rating")
    case "buy"  => 4.0 // map buy event to rating value of 4
    case _      => throw new Exception(s"Unexpected event ${event} is read.")
  }
  // entityId and targetEntityId are Strings, but targetEntityId is optional on an
  // event: fail with a message naming the offending event instead of an opaque None.get.
  val item = event.targetEntityId.getOrElse(
    throw new Exception(s"Event ${event} has no targetEntityId."))
  Rating(event.entityId, item, ratingValue)
}
// Build a DataFrame whose columns come from the Rating case class fields.
val df = spark.createDataFrame(ratingRDD)
// Expose it to Spark SQL as "rating". createOrReplaceTempView is the Spark 2.x
// replacement for registerTempTable, which is deprecated since Spark 2.0.
df.createOrReplaceTempView("rating")
%sql
-- All rows of the "rating" temporary view registered by the Scala paragraph.
SELECT * FROM rating
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment