Skip to content

Instantly share code, notes, and snippets.

@sc6l6d3v
Created September 14, 2017 14:17
Show Gist options
  • Save sc6l6d3v/9681225f709f3b6901054f3be59d3831 to your computer and use it in GitHub Desktop.
Save sc6l6d3v/9681225f709f3b6901054f3be59d3831 to your computer and use it in GitHub Desktop.
package iptv.datascience
package dash
import java.text.SimpleDateFormat
import com.sksamuel.elastic4s.http.HttpClient
import com.sksamuel.elastic4s.indexes.IndexDefinition
import com.sksamuel.elastic4s.{ElasticsearchClientUri, Indexable}
import io.circe._
import io.circe.generic.semiauto._
import io.circe.syntax._
import iptv.datascience.dash.models.RichRec
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy
import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
/**
  * Created by hkatz on 7/11/17.
  *
  * Helpers for turning [[ElasticWriter.wareHouse]] records into JSON documents and
  * writing them to Elasticsearch through the elastic4s HTTP client.
  */
object ElasticWriter {
  import com.sksamuel.elastic4s.http.ElasticDsl._

  /**
    * One document to be indexed.
    *
    * @param key          value written to the document's `key` field
    * @param maptype      value written to the document's `maptype` field
    * @param rundate      timestamp written to the document's `rundate` field
    * @param recs         records written to the document's `recs` array
    * @param recs_suggest strings written as the `input` of the `recs_suggest` field
    */
  case class wareHouse(key: String, maptype: String, rundate: DateTime,
                       recs: Seq[RichRec], recs_suggest: Seq[String])

  /** Encodes a `DateTime` as its epoch-millisecond value. */
  implicit val dateTimeEncoder: Encoder[DateTime] = Encoder.instance(dt => dt.getMillis.asJson)

  /** Decodes a `DateTime` from an epoch-millisecond value. */
  implicit val dateTimeDecoder: Decoder[DateTime] =
    Decoder.decodeLong.map(millis => new DateTime(millis))

  implicit val encodeRichRec: Encoder[RichRec] = deriveEncoder[RichRec]
  implicit val decodeRichRec: Decoder[RichRec] = deriveDecoder[RichRec]

  /**
    * Hand-written encoder used instead of `deriveEncoder[wareHouse]`.
    *
    * NOTE(review): this is not the inverse of [[decodeWareHouse]]. It omits `recs_suggest`
    * and writes `rundate` as `DateTime.toString` text, while the derived decoder expects a
    * `recs_suggest` field and reads `rundate` through [[dateTimeDecoder]] (a number).
    * The emitted shape is kept unchanged because other code may rely on it; confirm
    * before using the pair for round-tripping.
    */
  implicit val encodeWareHouse: Encoder[wareHouse] = Encoder.instance { wh =>
    Json.obj(
      "key" -> Json.fromString(wh.key),
      "maptype" -> Json.fromString(wh.maptype),
      "rundate" -> Json.fromString(wh.rundate.toString),
      "recs" -> Json.fromValues(wh.recs.map(_.asJson))
    )
  }

  implicit val decodeWareHouse: Decoder[wareHouse] = deriveDecoder[wareHouse]

  /**
    * Renders any value as a JSON string node, using the same text that string
    * interpolation would produce (`null` becomes the text "null"). Printing the node
    * escapes quotes, backslashes and control characters, which raw interpolation did not.
    */
  private def jsonString(value: Any): Json = Json.fromString(s"$value")

  /** Serialises a [[RichRec]] as a flat JSON object whose values are all JSON strings. */
  class richRecIndexable extends Indexable[RichRec] {

    /** @return the compact JSON object for `t`, with every field value properly escaped */
    override def json(t: RichRec): String =
      Json.obj(
        "id" -> jsonString(t.id),
        "score" -> jsonString(t.score),
        "title" -> jsonString(t.title),
        "programType" -> jsonString(t.programType),
        "year" -> jsonString(t.year),
        "rating" -> jsonString(t.rating)
      ).noSpaces

    /** @return a JSON array holding `json(r)` for every record; `[]` when `recs` is empty */
    def genList(recs: Seq[RichRec]): String = recs.map(r => json(r)).mkString("[", ",", "]")
  }

  /** Serialises a [[wareHouse]] as the JSON document sent to Elasticsearch. */
  class wareHouseIndexable extends Indexable[wareHouse] {

    /**
      * Renders `l` as a JSON array of strings.
      *
      * An empty list yields `[]` (the previous `mkString` form produced `[""]`), and
      * every element is escaped.
      */
    def stringifyList(l: Seq[String]): String =
      Json.fromValues(l.map(s => jsonString(s))).noSpaces

    val dateFormatter: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
    val rr: richRecIndexable = new richRecIndexable()

    /**
      * @return the compact JSON document for `t`. `recs_suggest.input` is a flat array of
      *         strings; `stringifyList` already supplies the brackets, so they are not
      *         added a second time here.
      */
    override def json(t: wareHouse): String = {
      val key = jsonString(t.key).noSpaces
      val maptype = jsonString(t.maptype).noSpaces
      val rundate = jsonString(dateFormatter.print(t.rundate)).noSpaces
      val recs = rr.genList(t.recs)
      val suggestInput = stringifyList(t.recs_suggest)
      s"""{"key":$key,"maptype":$maptype,"rundate":$rundate,"recs":$recs,"recs_suggest":{"input":$suggestInput}}"""
    }
  }

  /** Implicit `Indexable` picked up by `doc(...)` when indexing a [[wareHouse]]. */
  implicit val myIndexable: wareHouseIndexable = new wareHouseIndexable()

  /** Builds an elastic4s HTTP client for the given host and port. */
  def getClient(host: String, port: Int): HttpClient = HttpClient(ElasticsearchClientUri(host, port))

  /** Builds an empty index request targeting `index / indextype`. */
  def getIndex(index: String, indextype: String): IndexDefinition = indexInto(index / indextype)

  /*def getWareHouseList(indexRef: IndexDefinition, maptype: String, data: Map[String, List[RichRec]]): Seq[wareHouse] =
  data.map { case (key, reclist) =>
  wareHouse(key, maptype, DateTime.now, reclist)
  }.toSeq
  def getIndexList(indexRef: IndexDefinition, maptype: String, data: Map[String, List[RichRec]]): Seq[IndexDefinition] =
  data.map { case (key, reclist) =>
  wareHouse(key, maptype, DateTime.now, reclist)
  }.map(indexRef.doc(_)).toSeq*/
  //implicit val esDuration = 100.seconds

  /**
    * Sends all prepared index requests as a single bulk request with refresh policy
    * `WAIT_UNTIL`, and awaits the response.
    *
    * @param client   client used to execute the request
    * @param jsonRecs index requests to send together
    * @return the awaited bulk response
    */
  def writeData(client: HttpClient, jsonRecs: Seq[IndexDefinition]) =
    client.execute {
      bulk(jsonRecs).refresh(RefreshPolicy.WAIT_UNTIL)
    }.await

  /**
    * Indexes each record with its own request (refresh policy `IMMEDIATE`), awaiting every
    * response before sending the next one.
    *
    * @param indexRef index request template; each record is attached to it via `doc`
    * @param client   client used to execute the requests
    * @param whRecs   records to index, in order
    * @return the awaited responses, one per record, in input order
    */
  def writeData(indexRef: IndexDefinition, client: HttpClient, whRecs: Seq[wareHouse]) =
    whRecs.map { wh =>
      client.execute {
        indexRef.doc(wh).refresh(RefreshPolicy.IMMEDIATE)
      }.await
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment