Skip to content

Instantly share code, notes, and snippets.

@shasts
Last active August 19, 2018 21:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shasts/0a1be749233da1b9e65e995d3b8d5a1d to your computer and use it in GitHub Desktop.
Save shasts/0a1be749233da1b9e65e995d3b8d5a1d to your computer and use it in GitHub Desktop.
package controllers
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import play.api.libs.json.{JsString, Json}
import play.api.libs.ws.{StandaloneWSRequest, WSAuthScheme}
import play.api.libs.ws.ahc.StandaloneAhcWSClient
import scala.concurrent.Await
import scala.io.Source
import scala.concurrent.duration._
/**
 * Command-line importer: reads `id,columnId,value` rows from CSV files, folds them into one
 * [[CSVParser.RequesterDetails]] per id, and posts each aggregated record as a JSON document
 * to the Elasticsearch endpoint configured under `es.*`.
 */
object CSVParser {
  implicit val system: ActorSystem = ActorSystem()
  import system.dispatcher

  val ws = StandaloneAhcWSClient()(ActorMaterializer.create(system))
  val config = ConfigFactory.load()

  private val esUrl = config.getString("es.url")
  private val esUser = config.getString("es.user")
  private val esPassword = config.getString("es.password")
  // NOTE(review): `es.index` is read but never used; the document path in `main` is the
  // literal "requests_import/doc/<id>". Confirm whether that path should be built from this.
  private val esIndex = config.getString("es.index")

  /**
   * Builds a request for `$esUrl/$path` with HTTP basic auth and a JSON `Content-Type` header.
   *
   * @param path path appended to the configured `es.url`
   */
  private def withWsClient(path: String): StandaloneWSRequest =
    ws.url(s"$esUrl/$path")
      .withAuth(esUser, esPassword, WSAuthScheme.BASIC)
      .withHttpHeaders("Content-Type" -> "application/json")

  /**
   * Aggregated data for a single request id. Every field is optional because rows arrive one
   * column at a time. Only `requestee`, `phone`, `status` and `location` are ever filled in by
   * [[enrichWithData]]; `id` is set when the record is first created in [[main]].
   */
  case class RequesterDetails(
    id: Option[String] = None,
    requestee: Option[String] = None,
    phone: Option[String] = None,
    distritc: Option[String] = None, // spelling kept: the field name is part of the public API
    location: Option[String] = None,
    date_added: Option[String] = None,
    status: Option[String] = None,
    lat: Option[String] = None,
    lon: Option[String] = None,
    situation: Option[String] = None
  )

  /** Records accumulated so far, keyed by request id. */
  val requests: scala.collection.mutable.Map[String, RequesterDetails] =
    scala.collection.mutable.Map[String, RequesterDetails]()

  /**
   * Reads every file named in `arg`, merges its rows into [[requests]], then posts each
   * record and prints the response body.
   *
   * Each line is split on `,`; the first three tokens are taken as id, column id and value.
   * Lines with fewer than three tokens are reported on stderr and skipped.
   *
   * NOTE(review): neither `ws` nor `system` is shut down here, so the JVM may stay alive
   * after this method returns. TODO confirm and decide who owns their lifecycle.
   *
   * @param arg paths of the CSV files to import
   */
  def main(arg: Array[String]): Unit = {
    arg.foreach { f =>
      val bufferedSource = Source.fromFile(f)
      // Close the file even if a row fails to parse.
      try {
        bufferedSource.getLines().foreach { line =>
          line.split(',') match {
            case Array(id, columnId, value, _*) =>
              val current = requests.getOrElse(id, RequesterDetails(id = Some(id)))
              // `update` mutates the map in place. The previous `requests + (...)` built a
              // new map and discarded it, so nothing was ever stored.
              requests.update(id, enrichWithData(columnId, value)(current))
            case _ =>
              Console.err.println(s"Skipping malformed line in $f: $line")
          }
        }
      } finally {
        bufferedSource.close()
      }
    }

    requests.foreach { case (id, req) =>
      val obj = Json.obj(
        "id" -> JsString(req.id.getOrElse("")),
        "district" -> JsString(req.distritc.getOrElse("")),
        "location" -> JsString(req.location.getOrElse("")),
        "lat" -> JsString(req.lat.getOrElse("")),
        "long" -> JsString(req.lon.getOrElse("")),
        "date_added" -> JsString(req.date_added.getOrElse("")),
        // Was `req.date_added`, which copied the date into the status field.
        "status" -> JsString(req.status.getOrElse("")),
        "requestee_phone" -> JsString(req.phone.getOrElse(""))
      )
      // `withWsClient` already sets the JSON Content-Type header, so it is not repeated here.
      val fut = withWsClient(s"requests_import/doc/$id")
        .post(Json.stringify(obj))
      // Blocking is acceptable here: this is the edge of a sequential command-line program.
      val res = Await.result(fut, 10.seconds)
      println(res.body)
    }
  }

  /**
   * Returns a copy of `details` with the field selected by `columnId` set to `value`.
   *
   * Recognised column ids: "98" -> requestee, "110" -> phone, "125" -> status,
   * "76" -> location. Any other column id leaves `details` unchanged (it used to throw
   * `MatchError`).
   *
   * @param columnId column identifier taken from the CSV row
   * @param value    cell value for that column
   * @param details  record to enrich
   * @return the enriched record, or `details` itself when the column is not recognised
   */
  def enrichWithData(columnId: String, value: String)(details: RequesterDetails): RequesterDetails =
    columnId match {
      case "98" => details.copy(requestee = Some(value))
      case "110" => details.copy(phone = Some(value))
      case "125" => details.copy(status = Some(value))
      case "76" => details.copy(location = Some(value))
      case _ => details
    }
}
@ajitsen
Copy link

ajitsen commented Aug 19, 2018

I had this in my feeder to add the full district name:

# Three-letter district codes mapped to full district names.
# Built once at import time instead of on every call.
_DIST_CODE_TO_NAME = {
    'tvm': 'Thiruvananthapuram',
    'ptm': 'Pathanamthitta',
    'alp': 'Alappuzha',
    'ktm': 'Kottayam',
    'idk': 'Idukki',
    'mpm': 'Malappuram',
    'koz': 'Kozhikode',
    'wnd': 'Wayanad',
    'knr': 'Kannur',
    'ksr': 'Kasaragod',
    'pkd': 'Palakkad',
    'tcr': 'Thrissur',
    'ekm': 'Ernakulam',
    'kol': 'Kollam',
}


def get_dist_name(code):
    """Return the full district name for a three-letter district code.

    String codes are matched ignoring case and surrounding whitespace,
    so 'TVM' and ' tvm ' both resolve like 'tvm'.

    Raises:
        KeyError: if the code is not one of the known district codes.
    """
    key = code.strip().lower() if isinstance(code, str) else code
    if key not in _DIST_CODE_TO_NAME:
        # Same exception type as a plain dict lookup, with a clearer message.
        raise KeyError('unknown district code: %r' % (code,))
    return _DIST_CODE_TO_NAME[key]

Or we can use the Synonym feature: https://www.elastic.co/guide/en/elasticsearch/guide/current/using-synonyms.html

@shasts
Copy link
Author

shasts commented Aug 19, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment