Created
September 14, 2017 14:17
-
-
Save sc6l6d3v/9681225f709f3b6901054f3be59d3831 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package iptv.datascience | |
package dash | |
import java.text.SimpleDateFormat | |
import com.sksamuel.elastic4s.http.HttpClient | |
import com.sksamuel.elastic4s.indexes.IndexDefinition | |
import com.sksamuel.elastic4s.{ElasticsearchClientUri, Indexable} | |
import io.circe._ | |
import io.circe.generic.semiauto._ | |
import io.circe.syntax._ | |
import iptv.datascience.dash.models.RichRec | |
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy | |
import org.joda.time.DateTime | |
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} | |
/**
 * Codecs, `Indexable` instances and write helpers for pushing `wareHouse`
 * documents into Elasticsearch through the elastic4s HTTP client.
 *
 * Created by hkatz on 7/11/17.
 */
object ElasticWriter {

  import com.sksamuel.elastic4s.http.ElasticDsl._

  /**
   * One document to be indexed.
   *
   * In the indexed JSON (see `wareHouseIndexable`) `key` and `maptype` are written as strings,
   * `rundate` as `yyyy-MM-dd'T'HH:mm:ss`, `recs` as an array of objects and `recs_suggest`
   * as the `input` array of a `recs_suggest` object.
   * NOTE(review): `recs_suggest` looks like a completion-suggester field; confirm against the index mapping.
   */
  case class wareHouse(key: String, maptype: String, rundate: DateTime,
                       recs: Seq[RichRec], recs_suggest: Seq[String])

  /** Encodes a `DateTime` as its epoch-millisecond value. */
  implicit val dateTimeEncoder: Encoder[DateTime] = Encoder.instance(dt => dt.getMillis.asJson)

  /** Decodes a `DateTime` from an epoch-millisecond JSON number. */
  implicit val dateTimeDecoder: Decoder[DateTime] = Decoder.decodeLong.map(millis => new DateTime(millis))

  implicit val encodeRichRec: Encoder[RichRec] = deriveEncoder[RichRec]
  implicit val decodeRichRec: Decoder[RichRec] = deriveDecoder[RichRec]

  /**
   * Hand-written circe encoder for `wareHouse`.
   *
   * NOTE(review): this encoder omits `recs_suggest` and writes `rundate` as the ISO string from
   * `DateTime.toString`, whereas `decodeWareHouse` is derived (it expects `recs_suggest` and, via
   * `dateTimeDecoder`, a numeric `rundate`). JSON produced here therefore does not decode back
   * with `decodeWareHouse`. Left unchanged because the derived encoder was deliberately replaced.
   */
  implicit val encodeWareHouse: Encoder[wareHouse] = /*deriveEncoder[wareHouse]*/ new Encoder[wareHouse] {
    override def apply(a: wareHouse): Json = Json.obj(
      "key" -> Json.fromString(a.key),
      "maptype" -> Json.fromString(a.maptype),
      "rundate" -> Json.fromString(a.rundate.toString),
      "recs" -> Json.fromValues(a.recs.map(_.asJson))
    )
  }

  implicit val decodeWareHouse: Decoder[wareHouse] = deriveDecoder[wareHouse]

  /**
   * Renders a value as a quoted JSON string literal.
   *
   * The text is the same as `s"$value"` would give, but quotes, backslashes and control
   * characters are escaped so the surrounding hand-built document stays valid JSON.
   */
  private def quoted(value: Any): String = Json.fromString(s"$value").noSpaces

  /** Serialises a `RichRec` to a JSON object in which every field is written as a string. */
  class richRecIndexable extends Indexable[RichRec] {

    override def json(t: RichRec): String =
      s"""{"id":${quoted(t.id)},"score":${quoted(t.score)},"title":${quoted(t.title)},"programType":${quoted(t.programType)},"year":${quoted(t.year)},"rating":${quoted(t.rating)}}"""

    /** Renders `recs` as a JSON array of objects; an empty sequence gives `[]`. */
    def genList(recs: Seq[RichRec]): String = recs.map(r => json(r)).mkString("[", ",", "]")
  }

  /** Serialises a `wareHouse` to the JSON document that is sent to Elasticsearch. */
  class wareHouseIndexable extends Indexable[wareHouse] {

    /**
     * Renders `l` as a JSON array of escaped strings, e.g. `["a","b"]`.
     * An empty sequence gives `[]` (not `[""]`, which would be an array holding one empty string).
     */
    def stringifyList(l: Seq[String]): String = l.map(s => quoted(s)).mkString("[", ",", "]")

    val dateFormatter: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
    val rr: richRecIndexable = new richRecIndexable()

    override def json(t: wareHouse): String = {
      // stringifyList already emits the surrounding brackets, so "input" is a flat array of strings.
      s"""{"key":${quoted(t.key)},"maptype":${quoted(t.maptype)},"rundate":"${dateFormatter.print(t.rundate)}","recs":${rr.genList(t.recs)}, "recs_suggest":{"input":${stringifyList(t.recs_suggest)}}}"""
    }
  }

  /** Implicit `Indexable` picked up by `indexRef.doc(wh)` in `writeData`. */
  implicit val myIndexable: wareHouseIndexable = new wareHouseIndexable()

  /** Builds an elastic4s HTTP client for the given host and port. */
  def getClient(host: String, port: Int): HttpClient = HttpClient(ElasticsearchClientUri(host, port))

  /** Builds an (empty) index request targeting `index / indextype`. */
  def getIndex(index: String, indextype: String): IndexDefinition = indexInto(index / indextype)

  // NOTE: the commented-out helpers below call wareHouse with four arguments, i.e. without
  // the recs_suggest field the case class now has.
  /*def getWareHouseList(indexRef: IndexDefinition, maptype: String, data: Map[String, List[RichRec]]): Seq[wareHouse] =
  data.map { case (key, reclist) =>
  wareHouse(key, maptype, DateTime.now, reclist)
  }.toSeq
  def getIndexList(indexRef: IndexDefinition, maptype: String, data: Map[String, List[RichRec]]): Seq[IndexDefinition] =
  data.map { case (key, reclist) =>
  wareHouse(key, maptype, DateTime.now, reclist)
  }.map(indexRef.doc(_)).toSeq*/
  //implicit val esDuration = 100.seconds

  /**
   * Sends all `jsonRecs` in a single bulk request with refresh policy `WAIT_UNTIL`
   * and waits for the response via `.await`.
   */
  def writeData(client: HttpClient, jsonRecs: Seq[IndexDefinition]) =
    client.execute {
      bulk(jsonRecs).refresh(RefreshPolicy.WAIT_UNTIL)
    }.await

  /**
   * Indexes each record of `whRecs` with its own request (refresh policy `IMMEDIATE`),
   * waiting for every response in turn, and returns the responses in input order.
   */
  def writeData(indexRef: IndexDefinition, client: HttpClient, whRecs: Seq[wareHouse]) =
    whRecs.map { wh =>
      client.execute {
        indexRef.doc(wh).refresh(RefreshPolicy.IMMEDIATE)
      }.await
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment