Created
February 16, 2017 21:34
-
-
Save rogerzxu/65da183dffc1ec2cf08a9fbbee35f5e6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package echo360.searchindexer.service | |
import com.google.inject.Inject | |
import com.typesafe.scalalogging.LazyLogging | |
import echo360.search.model._ | |
import echo360.searchindexer.dto.{IndexerAction, ObjectType, SearchIndexEvent} | |
import echo360.searchindexer.util.SearchDataException | |
import echo360.simplekafka.consumer.ConsumerRecordProcessor | |
import echo360.simplekafka.model.IncomingRecord | |
import play.api.libs.json.{JsError, JsSuccess, Json} | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.Future | |
/**
  * Kafka record processor that turns incoming [[SearchIndexEvent]] messages into
  * index or delete calls on the injected [[IndexerService]].
  *
  * @param indexerService    used to index an object (`indexObj`) or delete it (`deleteObj`)
  * @param searchDataService used to fetch the object's data before it is indexed
  */
class SearchIndexEventConsumer @Inject()(
  indexerService: IndexerService,
  searchDataService: SearchDataService
) extends ConsumerRecordProcessor with LazyLogging {

  /**
    * Parses the record's string value as JSON and dispatches the contained event.
    *
    * The payload is first validated as a [[SearchIndexEvent]]. If that fails, the
    * event is read from the payload's `data` field instead. Synchronous failures
    * (invalid JSON, unreadable `data` field, ...) are caught and logged here;
    * asynchronous failures of the resulting future are logged as well, so nothing
    * is thrown to the caller for an `Exception`.
    *
    * @param record the incoming Kafka record
    */
  override def process(record: IncomingRecord): Unit = {
    try {
      // Parse once and reuse the JsValue for the fallback lookup below.
      val json = Json.parse(record.valueString)
      val event = json.validate[SearchIndexEvent] match {
        case JsSuccess(parsed, _) => parsed
        case _: JsError =>
          logger.debug("Unrecognized SearchIndexEvent, attempting to parse as echo-domain-event")
          (json \ "data").as[SearchIndexEvent]
      }
      // processSyncEvent is asynchronous: the surrounding try/catch cannot observe a
      // failed future, so log it explicitly instead of dropping it silently.
      processSyncEvent(event).failed.foreach { ex =>
        logger.error(s"Failed to process kafka event ${record.valueString}", ex)
      }
    } catch {
      case ex: Exception => logger.error(s"Failed to process kafka event ${record.valueString}", ex)
    }
  }

  /**
    * Applies a single event to the index.
    *
    * For `CreateOrUpdate` the object's data is fetched from `searchDataService`
    * according to the event's object type and then indexed; for `Delete` the
    * event is passed to `indexerService.deleteObj`.
    *
    * @param searchIndexEvent the event to apply
    * @return a future that completes when the index/delete call has completed
    */
  def processSyncEvent(searchIndexEvent: SearchIndexEvent): Future[Unit] = {
    val objId = searchIndexEvent.objId
    searchIndexEvent.action match {
      case IndexerAction.CreateOrUpdate =>
        searchIndexEvent.objType match {
          case ObjectType.Question =>
            handleSearchData[Question](searchIndexEvent, searchDataService.getQuestion(objId))
          case ObjectType.User =>
            handleSearchData[User](searchIndexEvent, searchDataService.getUser(objId))
          case ObjectType.Lesson =>
            handleSearchData[Lesson](searchIndexEvent, searchDataService.getLesson(objId))
          case ObjectType.Course =>
            handleSearchData[Course](searchIndexEvent, searchDataService.getCourse(objId))
          case ObjectType.Presentation =>
            handleSearchData[Presentation](searchIndexEvent, searchDataService.getPresentation(objId))
          case ObjectType.Note =>
            handleSearchData[Note](searchIndexEvent, searchDataService.getNote(objId))
          case ObjectType.Video =>
            handleSearchData[Video](searchIndexEvent, searchDataService.getVideo(objId))
        }
      case IndexerAction.Delete =>
        indexerService.deleteObj(searchIndexEvent)
    }
  }

  /**
    * Indexes the fetched data if present; logs a warning and completes normally
    * when the lookup returned `None`.
    *
    * A [[SearchDataException]] from the lookup or the indexing call is logged and
    * recovered from. Any other failure is left in the returned future for the
    * caller to handle.
    *
    * @param event   the event that triggered the lookup
    * @param dataFut the pending lookup of the object's data
    * @tparam T the type of the object being indexed
    */
  private def handleSearchData[T](event: SearchIndexEvent, dataFut: Future[Option[T]]): Future[Unit] = {
    dataFut
      .flatMap {
        case Some(data) => indexerService.indexObj[T](event, data)
        case None =>
          logger.warn(s"Unable to retrieve ${event.objType} data for id: ${event.objId}")
          Future.successful(())
      }
      .map(_ => ())
      .recover {
        case ex: SearchDataException => logger.error(s"Failed to index $event", ex)
      }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment