Skip to content

Instantly share code, notes, and snippets.

@frne
Last active March 20, 2016 12:31
Show Gist options
  • Save frne/1fe0d44e1cc50c6f6ef4 to your computer and use it in GitHub Desktop.
Save frne/1fe0d44e1cc50c6f6ef4 to your computer and use it in GitHub Desktop.
Code Samples - twitterist.org API
package service
import com.amazonaws.services.machinelearning.model.PredictRequest
import model.FeatureSet
import service.util.{AwsMachineLearningServiceProvider, Loggable}
import scala.collection.JavaConverters._
/** Implementation of the [[service.PredictionEngine]] service */
class AwsPredictionEngine extends PredictionEngine with Loggable with AwsMachineLearningServiceProvider {
/** @inheritdoc*/
override def predict(fs: FeatureSet): Double = {
val result = client.predict(
new PredictRequest()
.withMLModelId(modelId)
.withPredictEndpoint(endpointUrl)
.withRecord(fs.toMap.asJava)
).getPrediction.getPredictedValue
logger.debug(s"""Successfully predicted feature set ${fs.id} and got score $result.""")
result.toDouble
}
}
package actors
import javax.inject.{Named, Inject, Singleton}
import akka.actor.{ActorRef, ActorLogging, Actor}
import model.{FeatureSet, Prediction}
import service.{FeatureExtractor, PredictionQueue}
/** Actor for ftext feature extraction
*
* @param featureExtractor Feature extractor service
* @param predictionQueue Prediction queue service
* @param predictionEngineActor The actor ref to schedule prediction on the ML engine
*/
@Singleton
class FeatureExtractorActor @Inject()(featureExtractor: FeatureExtractor,
predictionQueue: PredictionQueue,
@Named("prediction-engine-actor") val predictionEngineActor: ActorRef)
extends Actor with ActorLogging {
override def receive = {
case p: Prediction =>
featureExtractor.extract(p) match {
case Some(fs: FeatureSet) =>
predictionQueue.updateTextProcFeatureset(fs, p)
predictionEngineActor ! fs
case _ =>
log.error(s"""Unable to extract future set for id ${p.id}!""")
predictionQueue.updatePredictionError(p, Some("Unable to extract enough features from tweet!"))
}
}
}
package service
import model.FeatureSet
import service.util.{ AwsMachineLearningServiceProvider, Loggable }
import scala.collection.JavaConverters._
/** Non operative dummy implementation of the [[service.PredictionEngine]] service */
class NoopPredictionEngine extends PredictionEngine with Loggable {
/** @inheritdoc */
override def predict(fs: FeatureSet): Double = {
val result = scala.util.Random.nextDouble()
logger.debug(s"""Successfully predicted feature set ${fs.id} and got score $result.""")
result
}
}
package actors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import javax.inject.{ Inject, Singleton }
import akka.actor._
import service.DataRetriever
/** Actor to generate training sets
*
* @param dataRetriever Injected data retriever
*/
@Singleton
class TrainingDataUpdateActor @Inject() (system: ActorSystem, dataRetriever: DataRetriever)
(implicit ec: ExecutionContext) extends Actor with ActorLogging with Scheduled {
import DataRetriever._
import TrainingDataUpdateActor._
/** Running state of the scheduler */
var running = false
var sched: Option[Cancellable] = None
/** @inheritdoc */
override def receive = {
case START =>
log.debug("Starting data update for training")
if (!running) {
running = true
sched = Some(system.scheduler.scheduleOnce(0.milliseconds, self, UPDATE))
}
case UPDATE if running =>
dataRetriever.updateTrainingTweets()
if (running) sched = Some(system.scheduler.scheduleOnce(UPDATE_ITERATION_DELAY, self, UPDATE))
case STOP =>
log.debug("Stoping data update for training")
if (running) {
running = false
for (s <- sched) s.cancel()
}
}
}
/** Companion object for scheduler states */
object TrainingDataUpdateActor {
final val START = 0
final val STOP = 1
final val UPDATE = 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment