Created
January 17, 2018 08:49
-
-
Save xnull/d6e4ab237a61bc617ac7c0a3d80417aa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.nd.ap.visit_metrics.controller | |
import cats.data.NonEmptyList | |
import cats.free.Free | |
import cats.implicits._ | |
import cats.~> | |
import com.nd.ap.avro.visit_metrics._ | |
import com.nd.ap.visit_metrics.controller.dsl._ | |
import com.nd.ap.visit_metrics.controller.dsl.implementation.Sugar.FlatFrees | |
import com.nd.ap.visit_metrics.controller.dsl.implementation.Database._ | |
import com.nd.ap.visit_metrics.controller.dsl.implementation.FileSystem.MapRFSFileSystemInterpreter | |
import com.nd.ap.visit_metrics.controller.dsl.implementation.Mist.NDMistRequestInterpreter | |
import com.nd.ap.visit_metrics.controller.dsl.implementation.Sidecar.sidecarRequestInterpreter | |
import com.twitter.finagle.http.{Request, Response} | |
import com.twitter.finagle.Service | |
import com.twitter.util._ | |
import doobie.implicits._ | |
import doobie.util.fragment.Fragment | |
import doobie.util.query.Query0 | |
import io.circe.generic.auto._ | |
import io.finch.Error.NotParsed | |
import io.finch._ | |
import io.finch.circe._ | |
import iota.TListK.::: | |
import iota._ | |
import com.github.nscala_time.time.Imports._ | |
import com.github.nscala_time.time.StaticInterval | |
import org.joda.time.Days | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.language.{higherKinds, implicitConversions} | |
import scala.util.control.NonFatal | |
import scala.util.Try | |
/**
 * Acknowledgement body returned by the visit-metrics endpoint.
 *
 * @param status short status marker; the endpoint in this file always answers with "ok"
 */
final case class SidecarResponse(status: String)
/**
 * Finch HTTP front end for the visit-metrics job.
 *
 * Exposes a single `POST /visit_metrics` endpoint that accepts a JSON [[VisitRateMessage]],
 * starts the visit-metrics program and immediately answers with `SidecarResponse("ok")`.
 * The outcome of the program itself is only written to the log.
 */
object HttpServer {
  private val log = org.slf4j.LoggerFactory.getLogger(getClass)
  private val visitMetricsPath = "visit_metrics"

  /** Coproduct of all DSL algebras the visit-metrics program is written against. */
  type VisitMetricsAlgebra[A] = CopK[MistRequest ::: Database ::: FileSystem ::: SidecarRequest ::: TNilK, A]

  /** Interpreter for the whole coproduct, assembled from the per-algebra interpreters in implicit scope. */
  val interpreter: (VisitMetricsAlgebra ~> AsyncErrorOr) = CopK.FunctionK.summon

  /**
   * Describes (without running) the visit-metrics workflow for one request:
   *
   *  1. run `exists` on every event directory derived from the requested dates and sources,
   *  1. load the store locations of the requested location list,
   *  1. load the scaling factors valid at the end date,
   *  1. run the `kmakarychev_visit_metrics` Mist function with these inputs,
   *  1. publish the result under `visit_metrics_ready`.
   *
   * @param msg the request received by the endpoint
   * @return a `Free` program yielding the result of the Mist function
   */
  private def visitMetricsProgram(msg: VisitRateMessage)(implicit
    D: DatabaseActions[VisitMetricsAlgebra],
    M: MistRequestActions[VisitMetricsAlgebra],
    F: FileSystemActions[VisitMetricsAlgebra],
    S: SidecarRequestActions[VisitMetricsAlgebra]
  ): Free[VisitMetricsAlgebra, VisitMetricsResult] = {
    import D._
    import F._
    import M._
    import S._

    val eventsFileLocation = "maprfs://mapr5/sas/opt/etl/prod/pca/v5"

    /**
     * Consecutive days starting at `startDate`, one entry per calendar day.
     *
     * The range is derived from the arguments (it used to be a hard-coded interval) and is
     * counted in calendar days, so a DST change inside the range does not drop a day.
     * An `endDate` that is not after `startDate` yields an empty list.
     *
     * NOTE(review): as before, `endDate` itself is NOT included - confirm whether the
     * request's end date is meant to be inclusive.
     */
    def daysRange(startDate: String, endDate: String): List[DateTime] = {
      val start = DateTime.parse(startDate)
      val dayCount = Days.daysBetween(start, DateTime.parse(endDate)).getDays
      (0 until dayCount).map(offset => start.plusDays(offset)).toList
    }

    /**
     * Event directories of the form `<root>/year=<y>/src=<source>/dt_log=<day>` for every
     * requested source and day, ordered by year, then source, then day.
     *
     * Each day is listed only under its own year, so a range spanning a year boundary no
     * longer yields combinations such as `year=2017/.../dt_log=2018-01-01`.
     */
    def getDirectoriesByDateAndSources(startDate: String, endDate: String, sources: List[String]): List[String] = {
      val days = daysRange(startDate, endDate)
      for {
        year   <- days.map(_.getYear).distinct
        source <- sources
        day    <- days if day.getYear == year
      } yield s"$eventsFileLocation/year=$year/src=$source/dt_log=${day.toString("YYYY-MM-dd")}"
    }

    /** Distinct stores (id and coordinates) that belong to the given location list. */
    // NOTE(review): the cast below looks like an IDE type-inference workaround - confirm before removing.
    def getStores(locationListId: Int): Query0[StoreItem] =
      sql"""
        SELECT DISTINCT id, lat, lon
        FROM reports_locations
        WHERE location_list_id=$locationListId
      """.asInstanceOf[Fragment].query[StoreItem]

    /** Per DMA, the scaling factor from the most recent `dt` that is not after `endDate`. */
    def getScalingFactors(endDate: String): Query0[ScalingFactor] =
      sql"""
        SELECT t1.dma_id AS dmaId, MAX(t1.scaling_factor) AS scalingFactor
        FROM pca_ref.dma_scaling t1
        WHERE t1.dt= (
          SELECT t2.dt
          FROM pca_ref.dma_scaling t2
          WHERE t2.dma_id = t1.dma_id AND t2.dt <= $endDate
          ORDER BY t2.dt DESC
          LIMIT 1
        )
        GROUP BY t1.dma_id
      """.asInstanceOf[Fragment].query[ScalingFactor]

    for {
      // The combined flag is discarded; this step only sequences the `exists` actions.
      _ <- getDirectoriesByDateAndSources(msg.dateStart, msg.dateEnd, msg.sources)
        .map(exists)
        .flat((allSoFar, current) => allSoFar && current)
      storeItems <- selectMany[StoreItem](
        getStores(msg.locationListId),
        DatabaseConfig.dbm.connectionString,
        DatabaseConfig.dbm.properties
      )
      scalingFactors <- selectMany[ScalingFactor](
        getScalingFactors(msg.dateEnd),
        DatabaseConfig.proddb.connectionString,
        DatabaseConfig.proddb.properties
      )
      requestResult <- runMistFunction[VisitRateArguments, VisitMetricsResult](
        "kmakarychev_visit_metrics",
        VisitRateArguments(
          storeItems,
          scalingFactors,
          msg.campaign,
          msg.dateStart,
          msg.dateEnd,
          msg.distToVisit,
          msg.proximityZone,
          eventsFileLocation
        )
      )
      _ <- publish[VisitMetricsResult]("visit_metrics_ready", requestResult)
    } yield requestResult
  }

  /**
   * `POST /visit_metrics`: logs the request, starts the program and replies "ok" right away.
   * Success or failure of the program is reported through the log only.
   */
  //noinspection ScalaDeprecation
  private val visitMetricsEndpoint: Endpoint[SidecarResponse] = post(visitMetricsPath :: jsonBody[VisitRateMessage]) {
    (msg: VisitRateMessage) => Future {
      log.info(msg.toString)
      visitMetricsProgram(msg).foldMap(interpreter).value.onComplete {
        (result: Try[Either[Throwable, VisitMetricsResult]]) => result.toEither.joinRight match {
          // Pass the throwable as well so the stack trace is not lost.
          case Left(e: Throwable) => log.error(s"Something went wrong: $e", e)
          case Right(r: VisitMetricsResult) => log.info(s"Success $r")
        }
      }
      SidecarResponse("ok")
    }.map(Ok)
  }

  /** The endpoint as a Finagle service, mapping request-parsing and unexpected errors to HTTP errors. */
  val api: Service[Request, Response] = visitMetricsEndpoint.handle {
    case e: NotParsed =>
      log.error("Bad request from client", e)
      BadRequest(e)
    case NonFatal(e) =>
      log.error("Unexpected exception", e)
      // Fall back to the error itself when it has no cause, so the response is never built from null.
      InternalServerError(new Exception(Option(e.getCause).getOrElse(e)))
  }.toServiceAs[Application.Json]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment