Skip to content

Instantly share code, notes, and snippets.

@rogerzxu
Created February 16, 2017 21:34
Show Gist options
  • Save rogerzxu/65da183dffc1ec2cf08a9fbbee35f5e6 to your computer and use it in GitHub Desktop.
Save rogerzxu/65da183dffc1ec2cf08a9fbbee35f5e6 to your computer and use it in GitHub Desktop.
package echo360.searchindexer.service
import com.google.inject.Inject
import com.typesafe.scalalogging.LazyLogging
import echo360.search.model._
import echo360.searchindexer.dto.{IndexerAction, ObjectType, SearchIndexEvent}
import echo360.searchindexer.util.SearchDataException
import echo360.simplekafka.consumer.ConsumerRecordProcessor
import echo360.simplekafka.model.IncomingRecord
import play.api.libs.json.{JsError, JsSuccess, Json}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
 * Kafka record processor that decodes [[SearchIndexEvent]] messages and forwards them to
 * the indexer.
 *
 * Asynchronous work runs on the execution context that is implicitly in scope for this file.
 *
 * @param indexerService    used to index or delete the object referenced by an event
 * @param searchDataService used to load the object referenced by a create-or-update event
 */
class SearchIndexEventConsumer @Inject()(
  indexerService: IndexerService,
  searchDataService: SearchDataService
) extends ConsumerRecordProcessor with LazyLogging {

  // Imported locally so the file-level import list does not need to change.
  import scala.util.control.NonFatal

  /**
   * Decodes one Kafka record and dispatches the resulting event.
   *
   * The payload is first validated directly as a [[SearchIndexEvent]]; if that fails, the
   * event is read from the payload's `data` field instead. Failures are logged rather than
   * propagated, both when decoding/dispatching throws synchronously and when the
   * asynchronous processing fails later.
   *
   * @param record the incoming Kafka record whose `valueString` holds the JSON payload
   */
  override def process(record: IncomingRecord): Unit = {
    try {
      // Parse once and reuse the tree for both decoding attempts.
      val json = Json.parse(record.valueString)
      val event = json.validate[SearchIndexEvent] match {
        case JsSuccess(parsed, _) => parsed
        case _: JsError =>
          logger.debug("Unrecognized SearchIndexEvent, attempting to parse as echo-domain-event")
          // `as` throws if the field is missing or malformed; the catch below logs that.
          (json \ "data").as[SearchIndexEvent]
      }
      // The try/catch only covers synchronous failures, so a failed future must be
      // logged explicitly or it would be dropped without any trace.
      processSyncEvent(event).failed.foreach { ex =>
        logger.error(s"Failed to process kafka event ${record.valueString}", ex)
      }
    } catch {
      case NonFatal(ex) => logger.error(s"Failed to process kafka event ${record.valueString}", ex)
    }
  }

  /**
   * Applies a single event to the index.
   *
   * `CreateOrUpdate` loads the object matching the event's `objType` and `objId` through
   * `searchDataService` and indexes it; `Delete` delegates to `indexerService.deleteObj`.
   *
   * @param searchIndexEvent the event to apply
   * @return a future that completes once the event has been handled
   */
  def processSyncEvent(searchIndexEvent: SearchIndexEvent): Future[Unit] = {
    searchIndexEvent.action match {
      case IndexerAction.CreateOrUpdate =>
        searchIndexEvent.objType match {
          case ObjectType.Question =>
            handleSearchData[Question](searchIndexEvent, searchDataService.getQuestion(searchIndexEvent.objId))
          case ObjectType.User =>
            handleSearchData[User](searchIndexEvent, searchDataService.getUser(searchIndexEvent.objId))
          case ObjectType.Lesson =>
            handleSearchData[Lesson](searchIndexEvent, searchDataService.getLesson(searchIndexEvent.objId))
          case ObjectType.Course =>
            handleSearchData[Course](searchIndexEvent, searchDataService.getCourse(searchIndexEvent.objId))
          case ObjectType.Presentation =>
            handleSearchData[Presentation](searchIndexEvent, searchDataService.getPresentation(searchIndexEvent.objId))
          case ObjectType.Note =>
            handleSearchData[Note](searchIndexEvent, searchDataService.getNote(searchIndexEvent.objId))
          case ObjectType.Video =>
            handleSearchData[Video](searchIndexEvent, searchDataService.getVideo(searchIndexEvent.objId))
        }
      case IndexerAction.Delete =>
        indexerService.deleteObj(searchIndexEvent)
    }
  }

  /**
   * Indexes the looked-up object for an event, if the lookup found one.
   *
   * A missing object is logged as a warning and treated as success. A
   * [[SearchDataException]] is logged and swallowed; any other failure is left in the
   * returned future for the caller to handle.
   *
   * @param event   the event being processed
   * @param dataFut the pending lookup of the object to index
   * @tparam T the type of the object being indexed
   * @return a future that completes once indexing has finished or been skipped
   */
  private def handleSearchData[T](event: SearchIndexEvent, dataFut: Future[Option[T]]): Future[Unit] = {
    dataFut.flatMap {
      case Some(data) =>
        indexerService.indexObj[T](event, data).map(_ => ())
      case None =>
        logger.warn(s"Unable to retrieve ${event.objType} data for id: ${event.objId}")
        Future.successful(())
    }.recover {
      case ex: SearchDataException => logger.error(s"Failed to index $event", ex)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment