Skip to content

Instantly share code, notes, and snippets.

@ggsoft
Created February 5, 2016 02:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ggsoft/db518028672c238f0321 to your computer and use it in GitHub Desktop.
Save ggsoft/db518028672c238f0321 to your computer and use it in GitHub Desktop.
package app.model
import app.util.Cfg
import scala.util.Failure
import anorm._
trait Entity[A] {
val parser: RowParser[A]
val sql: String
@annotation.tailrec
final def go(c: Option[Cursor], n: Int, f: A => Unit): Int = c match {
case Some(cursor) => {
val parsed = cursor.row as parser
parsed match {
case scala.util.Success(subj) => {
f(subj)
go(cursor.next,n+1,f)
}
case Failure(e) => {
Cfg.out("Parsing error: " + e)
n
}
}
}
case _ => n
}
def foreach(f: A => Unit): Int = {
DB.withConnection { implicit con =>
SQL(sql).withResult(go(_,0,f)) fold(_ => 0, x => x)
}
}
}
--------------------------------------------
package app.index
import app.elastic._
import app.util.Cfg
import com.sksamuel.elastic4s.ElasticDsl._
import app.model.Entity
trait Action[A] {
val name: String
val entity: Entity[A]
def indexName = name.split("/").head
def shortName = name.split("/").last.split("_").head
def deleteIndex: Boolean =
try {
client.execute {
delete index indexName
}
true
}
catch {
case _: Throwable => false
}
def createIndex: Unit
def insert(a: A): Unit
val process = (a: A) => insert(a)
def processAll = {
if (deleteIndex) {
createIndex
Cfg.out(Cfg.date+" - "+shortName+" reindex process")
val t = System.currentTimeMillis
val count = entity.foreach(process)
Cfg.out("Total time spent: " + ((System.currentTimeMillis - t) / 1000) + " sec")
Cfg.out("Total row inserted: " + count)
} else Cfg.out("Not found running elastic server")
}
}
--------------------------------------------
package app.index
import app.model.City
import app.util.Cfg
import com.sksamuel.elastic4s.ElasticDsl._
import app.elastic._
import com.sksamuel.elastic4s.mappings.FieldType._
object CityAction extends Action[City]{
val name = "hulaa_city_province_ex/city_province"
val entity = City
def createIndex: Unit =
client.execute {
create index indexName mappings (
"city_province" as(
"cityProvinceId" typed StringType index "not_analyzed",
"cityList" typed StringType,
"provinceCode" typed StringType index "not_analyzed",
"province" typed StringType,
"provinceID" typed StringType,
"countryCode" typed StringType,
"countryName" typed StringType,
"countrySlug" typed StringType index "not_analyzed"
)
)
}
def insert(c: City): Unit = {
try {
client.execute {
index into name id c.cityProvinceId fields(
"cityProvinceId" -> c.cityProvinceId,
"cityList" -> c.cityList,
"provinceCode" -> c.provinceCode,
"province" -> c.province,
"provinceID" -> c.provinceID,
"countryCode" -> c.countryCode,
"countryName" -> c.countryName,
"countrySlug" -> c.countrySlug
)
}.await
()
}
catch {
case e: Throwable => Cfg.out("Elastic search "+shortName+" insert problem")
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment