Skip to content

Instantly share code, notes, and snippets.

@xnull
Created January 17, 2018 08:49
Show Gist options
  • Save xnull/d6e4ab237a61bc617ac7c0a3d80417aa to your computer and use it in GitHub Desktop.
Save xnull/d6e4ab237a61bc617ac7c0a3d80417aa to your computer and use it in GitHub Desktop.
package com.nd.ap.visit_metrics.controller
import cats.data.NonEmptyList
import cats.free.Free
import cats.implicits._
import cats.~>
import com.nd.ap.avro.visit_metrics._
import com.nd.ap.visit_metrics.controller.dsl._
import com.nd.ap.visit_metrics.controller.dsl.implementation.Sugar.FlatFrees
import com.nd.ap.visit_metrics.controller.dsl.implementation.Database._
import com.nd.ap.visit_metrics.controller.dsl.implementation.FileSystem.MapRFSFileSystemInterpreter
import com.nd.ap.visit_metrics.controller.dsl.implementation.Mist.NDMistRequestInterpreter
import com.nd.ap.visit_metrics.controller.dsl.implementation.Sidecar.sidecarRequestInterpreter
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finagle.Service
import com.twitter.util._
import doobie.implicits._
import doobie.util.fragment.Fragment
import doobie.util.query.Query0
import io.circe.generic.auto._
import io.finch.Error.NotParsed
import io.finch._
import io.finch.circe._
import iota.TListK.:::
import iota._
import com.github.nscala_time.time.Imports._
import com.github.nscala_time.time.StaticInterval
import org.joda.time.Days
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.language.{higherKinds, implicitConversions}
import scala.util.control.NonFatal
import scala.util.Try
/**
  * Acknowledgement body returned by the visit-metrics HTTP endpoint.
  *
  * @param status short status marker; the endpoint in this file always answers with "ok"
  */
final case class SidecarResponse(status: String)
/**
  * HTTP front end for the visit-metrics job.
  *
  * Exposes a single `POST /visit_metrics` endpoint that accepts a JSON [[VisitRateMessage]],
  * builds a `Free` program over [[HttpServer.VisitMetricsAlgebra]] and runs it through
  * [[HttpServer.interpreter]]. The HTTP response is an immediate acknowledgement; the outcome
  * of the program is only logged.
  */
object HttpServer {
  private val log = org.slf4j.LoggerFactory.getLogger(getClass)
  private val visitMetricsPath = "visit_metrics"

  /** Coproduct of all DSL algebras the visit-metrics program is written against. */
  type VisitMetricsAlgebra[A] = CopK[MistRequest ::: Database ::: FileSystem ::: SidecarRequest ::: TNilK, A]

  /** Interpreter for the whole coproduct, assembled by iota from the interpreters in implicit scope. */
  val interpreter: (VisitMetricsAlgebra ~> AsyncErrorOr) = CopK.FunctionK.summon

  /**
    * Describes the visit-metrics workflow as a `Free` program:
    *  1. check that every input directory for the requested period and sources exists,
    *  2. load the stores of the requested location list,
    *  3. load the scaling factors valid at the end date,
    *  4. run the Mist function with those inputs,
    *  5. publish the result.
    *
    * @param msg request describing the campaign, period, sources and location list
    * @return a program yielding the result produced by the Mist function
    */
  private def visitMetricsProgram(msg: VisitRateMessage)(
    implicit D: DatabaseActions[VisitMetricsAlgebra],
    M: MistRequestActions[VisitMetricsAlgebra],
    F: FileSystemActions[VisitMetricsAlgebra],
    S: SidecarRequestActions[VisitMetricsAlgebra]
  ): Free[VisitMetricsAlgebra, VisitMetricsResult] = {
    import D._
    import F._
    import M._
    import S._

    val eventsFileLocation = "maprfs://mapr5/sas/opt/etl/prod/pca/v5"
    // Joda-Time pattern ('Y' is year-of-era in Joda) used for the dt_log partition value.
    val dayPattern = "YYYY-MM-dd"

    /**
      * Every day of the requested period, start inclusive, end exclusive.
      *
      * The interval is built from the actual arguments (it used to be a hard-coded constant),
      * and days are counted on the calendar rather than as 24h durations, so a DST change
      * inside the period cannot drop a day.
      */
    def datesInRange(startDate: String, endDate: String): List[DateTime] = {
      val interval = StaticInterval.parse(s"$startDate/$endDate")
      val dayCount = org.joda.time.Days.daysIn(interval).getDays
      List.tabulate(dayCount)(offset => interval.getStart.plusDays(offset))
    }

    /** The days of the requested period formatted as `YYYY-MM-dd` strings. */
    def daysRange(startDate: String, endDate: String): List[String] =
      datesInRange(startDate, endDate).map(_.toString(dayPattern))

    /**
      * Directories `year=<y>/src=<source>/dt_log=<day>` under the events location for every
      * source and every day of the period.
      *
      * Each day is paired only with its own calendar year; a plain years x days cross product
      * would generate paths such as `year=2017/.../dt_log=2018-01-01`.
      * NOTE(review): assumes the `year=` partition always equals the year of `dt_log` - confirm
      * against the actual directory layout.
      */
    def getDirectoriesByDateAndSources(startDate: String, endDate: String, sources: List[String]): List[String] = {
      val dates = datesInRange(startDate, endDate)
      for {
        year <- dates.map(_.getYear).distinct
        daysOfYear = dates.filter(_.getYear == year).map(_.toString(dayPattern))
        source <- sources
        day <- daysOfYear
      } yield s"$eventsFileLocation/year=$year/src=$source/dt_log=$day"
    }

    /** Query selecting the distinct stores (id, lat, lon) of a location list. */
    // NOTE(review): the cast to Fragment is presumably an IDE type-inference workaround - confirm before removing.
    def getStores(locationListId: Int): Query0[StoreItem] =
      sql"""
SELECT DISTINCT id, lat, lon
FROM reports_locations
WHERE location_list_id=$locationListId
""".asInstanceOf[Fragment].query[StoreItem]

    /** Query selecting, per DMA, the scaling factor of the latest row dated on or before `endDate`. */
    def getScalingFactors(endDate: String): Query0[ScalingFactor] =
      sql"""
SELECT t1.dma_id AS dmaId, MAX(t1.scaling_factor) AS scalingFactor
FROM pca_ref.dma_scaling t1
WHERE t1.dt= (
SELECT t2.dt
FROM pca_ref.dma_scaling t2
WHERE t2.dma_id = t1.dma_id AND t2.dt <= $endDate
ORDER BY t2.dt DESC
LIMIT 1
)
GROUP BY t1.dma_id
""".asInstanceOf[Fragment].query[ScalingFactor]

    for {
      // The combined boolean is discarded here; presumably a missing directory is surfaced
      // as a failure by the file-system interpreter - verify.
      _ <- getDirectoriesByDateAndSources(msg.dateStart, msg.dateEnd, msg.sources).map(exists).flat((m, i) => m && i)
      storeItems <- selectMany[StoreItem](getStores(msg.locationListId), DatabaseConfig.dbm.connectionString, DatabaseConfig.dbm.properties)
      scalingFactors <- selectMany[ScalingFactor](getScalingFactors(msg.dateEnd), DatabaseConfig.proddb.connectionString, DatabaseConfig.proddb.properties)
      requestResult <- runMistFunction[VisitRateArguments, VisitMetricsResult](
        "kmakarychev_visit_metrics",
        VisitRateArguments(storeItems, scalingFactors, msg.campaign, msg.dateStart, msg.dateEnd, msg.distToVisit, msg.proximityZone, eventsFileLocation)
      )
      _ <- publish[VisitMetricsResult]("visit_metrics_ready", requestResult)
    } yield requestResult
  }

  /**
    * `POST /visit_metrics` endpoint.
    *
    * Starts the visit-metrics program for the posted message, registers a callback that logs
    * its outcome, and answers `SidecarResponse("ok")` without waiting for the program's result.
    */
  //noinspection ScalaDeprecation
  private val visitMetricsEndpoint: Endpoint[SidecarResponse] = post(visitMetricsPath :: jsonBody[VisitRateMessage]) {
    (msg: VisitRateMessage) => Future {
      log.info(msg.toString)
      visitMetricsProgram(msg).foldMap(interpreter).value.onComplete {
        // Collapse "the async computation failed" and "the program returned Left" into one error case.
        (result: Try[Either[Throwable, VisitMetricsResult]]) => result.toEither.joinRight match {
          // Pass the throwable to the logger as well so the stack trace is not lost.
          case Left(e: Throwable) => log.error(s"Something went wrong: $e", e)
          case Right(r: VisitMetricsResult) => log.info(s"Success $r")
        }
      }
      SidecarResponse("ok")
    }.map(Ok)
  }

  /**
    * The endpoint as a Finagle service producing JSON.
    *
    * Unparsable requests are answered with 400; any other non-fatal error with 500.
    */
  val api: Service[Request, Response] = visitMetricsEndpoint.handle {
    case e: NotParsed =>
      log.error("Bad request from client", e)
      BadRequest(e)
    case NonFatal(e) =>
      log.error("Unexpected exception", e)
      // Report the root cause when there is one; fall back to the error itself so the
      // response never wraps a null cause.
      InternalServerError(new Exception(Option(e.getCause).getOrElse(e)))
  }.toServiceAs[Application.Json]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment