Feed elasticsearch cluster with almost 20 years of chicago crimes. / published by https://github.com/dacr/code-examples-manager #a4fba58a-12b4-4af0-a551-79a50ec66003/31b84545685c0b984002966d3747487e6ad14356
// summary : Feed elasticsearch cluster with almost 20 years of chicago crimes.
// keywords : scala, elasticsearch, feed, chicago, crimes, bigdata
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : a4fba58a-12b4-4af0-a551-79a50ec66003
// created-on : 2018-10-11T15:27:20Z
// managed-by : https://github.com/dacr/code-examples-manager
// execution : scala ammonite script (http://ammonite.io/) - run as follows: 'amm scriptname.sc'
import $ivy.`com.sksamuel.elastic4s::elastic4s-core:7.3.1`
import $ivy.`com.sksamuel.elastic4s::elastic4s-client-esjava:7.3.1`
import $ivy.`com.sksamuel.elastic4s::elastic4s-json-json4s:7.3.1`
import $ivy.`org.json4s::json4s-native:3.6.7`
import $ivy.`org.json4s::json4s-ext:3.6.7`
import $ivy.`ch.qos.logback:logback-classic:1.2.3`
import $ivy.`fr.janalyse::split:0.3.12`
import fr.janalyse.split.CsvSplit.split
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.json4s.ElasticJson4s.Implicits._
import com.sksamuel.elastic4s.requests.mappings._
import com.sksamuel.elastic4s.requests.mappings.FieldType._
import org.json4s.{DefaultFormats, native}
import org.json4s.ext.JavaTimeSerializers
import java.time.{Instant, OffsetDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.slf4j.{Logger, LoggerFactory}
import scala.concurrent._
import scala.concurrent.duration._
import annotation.tailrec
/*
Ammonite good practices:
- wrap the code in a class/object namespace when dealing with futures in scripts (in the REPL it's OK)
- optimize the JVM arguments:
+ export JAVA_OPTS="-Xms2g -Xmx6g"
---
Fill Elasticsearch with ~19 years of Chicago crimes data:
`curl -L https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD -o crimes.csv`
- from ~2001-01-01 / ~19 years of criminal events / ~6,997,433 crimes
- 1.8 GB of data
- 584 seconds required for the insertion (using await and 50 parallel futures)
+ the main bottleneck is IO (write operations), so it depends on disk performance
- crimes index size = 3.3 GB
export baseURL="http://127.0.0.1:9201"
curl "$baseURL/crimes?pretty"
curl "$baseURL/crimes/_count"
curl "$baseURL/crimes/_search"
curl "$baseURL/crimes/_refresh"
curl "$baseURL/crimes/_flush"
curl -XPOST "$baseURL/crimes/_open"
curl -XPOST "$baseURL/crimes/_close"
curl -XPOST "$baseURL/crimes/_freeze" // X-PACK
curl -XPOST "$baseURL/crimes/_unfreeze" // X-PACK
curl "$baseURL/crimes/_settings?pretty"
curl "$baseURL/_search?q=robbery&pretty&size=5"
curl "$baseURL/_cluster/allocation/explain?pretty" -d '{ "index":"crimes","shard":0,"primary":true}' -H 'Content-Type: application/json'
curl -XPUT "$baseURL/crimes/_settings" -d '{"index.number_of_replicas" : 0 }' -H 'Content-Type: application/json'
curl -XDELETE "$baseURL/crimes"
curl "$baseURL"
curl "$baseURL/_cat/indices"
curl "$baseURL/_all/_settings?pretty"
curl "$baseURL/_cluster/health?pretty"
curl "$baseURL/_cluster/health?pretty&wait_for_status=yellow&timeout=50s"
curl "$baseURL/_cluster/state"
curl "$baseURL/_cluster/state?human&pretty"
curl "$baseURL/_cluster/pending_tasks"
curl "$baseURL/_nodes"
curl "$baseURL/_nodes?pretty"
curl "$baseURL/_nodes/stats"
curl "$baseURL/_nodes/stats?pretty&human" | jq
curl "$baseURL/_search" -d '{"query":{"type":{"value":"_doc"}}}' -H 'Content-Type: application/json'
*/
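// Reduce global logging to ERROR, but keep this script's own "esfill" logger at INFO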
LoggerFactory
.getLogger(Logger.ROOT_LOGGER_NAME)
.asInstanceOf[ch.qos.logback.classic.Logger]
.setLevel(ch.qos.logback.classic.Level.ERROR)
LoggerFactory
.getLogger("esfill")
.asInstanceOf[ch.qos.logback.classic.Logger]
.setLevel(ch.qos.logback.classic.Level.INFO)
object Feed {
val logger = org.slf4j.LoggerFactory.getLogger("esfill")
import scala.concurrent.ExecutionContext.Implicits.global
implicit val serialization = native.Serialization
implicit val formats = DefaultFormats.lossless ++ JavaTimeSerializers.all
val indexName = "crimes"
val mappingName = "testmapping"
// Customize the default configuration: we're going to insert a huge amount of data in an unclean but fast way
val client = ElasticClient( JavaClient(ElasticProperties("http://127.0.0.1:9201,127.0.0.1:9202,127.0.0.1:9203,127.0.0.1:9204")) )
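// Fire-and-forget clusterState request; the returned Future (mapped to the cluster name) is not awaited or used further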
client.execute {
clusterState()
}.map {
_.result.clusterName
}
def now() = System.currentTimeMillis()
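// Only "location" gets an explicit geo_point mapping; the other CSV columns are left to Elasticsearch dynamic mapping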
def doCreateTestIndex(name: String) = client.execute {
logger.info(s"doCreateTestIndex($name)")
createIndex(name)
.mappings {
mapping() as Seq(
geopointField("location")
)
}
}
def doUpdateIndexRefreshInterval(name: String, value: String) = {
logger.info(s"doUpdateIndexRefreshInterval($name, $value)")
client.execute {
updateIndexLevelSettings(name)
.refreshInterval(value)
}
}
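// Classic bulk-loading optimization: drop replicas and disable the periodic refresh while indexing (reverted by doLoadingOptimizationEnd)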
def doLoadingOptimizationsStart(name: String) = {
logger.info(s"doLoadingOptimizationsStart($name)")
client.execute {
updateIndexLevelSettings(name)
.numberOfReplicas(0)
.refreshInterval("-1")
}
}
def doLoadingOptimizationEnd(name: String) = {
logger.info(s"doLoadingOptimizationsEnd($name)")
client.execute {
updateIndexLevelSettings(name)
.numberOfReplicas(1)
.refreshInterval("10s")
}
}
def doRefreshIndex(name: String) = {
logger.info(s"doRefreshIndex($name)")
client.execute { refreshIndex(name) }
}
def doClean(name: String) = {
logger.info(s"doClean($name)")
client.execute { deleteIndex(name) }
}
def doCount(name: String) = {
logger.info(s"doCount($name)")
client.execute { count(name) }
}
def insertBulk(name: String, entries: Seq[Map[String, String]]) = client.execute {
bulk {
for {entry <- entries} yield {
indexInto(name).doc(entry)
}
}
}
val dateFormat = DateTimeFormatter.ofPattern("MM/d/yyyy hh:mm:ss a").withZone(ZoneId.of("America/Chicago"))
def normalizeDate(date: String): String = {
Instant.from(dateFormat.parse(date)).toString
}
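// Illustrative example: normalizeDate("01/01/2001 11:00:00 PM") == "2001-01-02T05:00:00Z" (CST is UTC-6)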
def normalizeHeaders(headers:List[String]):List[String] = {
headers.map(_.replaceAll("""\s+""", ""))
}
def lineToCell(line:String, limit:Int):Array[String] = {
line.split("""\s*,\s*""", limit)
}
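// lineToDocument turns one raw CSV line into a flat document keyed by the normalized headers,
// replacing the raw "Date"/"Location" columns by "timestamp"/"location".
// Illustrative example (made-up values, column names from the Chicago dataset):
//   Date "05/07/2023 10:30:00 AM" -> timestamp "2023-05-07T15:30:00Z" (CDT is UTC-5)
//   Location "(41.878, -87.629)"  -> location "41.878,-87.629"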
def lineToDocument(headers:List[String])(line: String): Map[String, String] = {
// Join headers and cells into a map
val cells = headers.zip(split(line)).toMap
// Convert date format and normalize Timezone
val foundTimestamp =
cells
.get("Date")
.map(normalizeDate)
// remove parenthesis and space from geopoint and filter out missing locations
val foundLocation =
cells
.get("Location")
.map(_.replaceAll("""[^-,.0-9]""", ""))
.filterNot(_.trim.size==0)
.filter(_.matches("""-?\d+[.]\d+,-?\d+[.]\d+"""))
// Build the final document map
(cells -- Set("Date", "Location")) ++
foundTimestamp.map(timestamp => "timestamp" -> timestamp) ++
foundLocation.map(location => "location" -> location)
}
def fill(): Unit = {
val linesIterator = scala.io.Source.fromFile("crimes.csv").getLines
val headers = normalizeHeaders(linesIterator.next.split("""\s*,\s*""").toList)
// consume input data
def writeData(indexName: String) = Future {
logger.info(s"writeData($indexName)")
val it = linesIterator.grouped(1000).toIterator // .take(500) // for test purposes, when you want to limit inputs
val parallelismLevel=20
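// Each iteration sends parallelismLevel bulk requests of up to 1000 documents each and waits for all of them before reading the next wave of lines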
while (it.hasNext) {
print("*" * parallelismLevel)
val groups = it.take(parallelismLevel).map{group =>
insertBulk(indexName, group.map(lineToDocument(headers)))
}
//blocking {
Future.sequence(groups).await // .await is slower but won't generate any timeouts with the default config
//}
}
}
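// Pipeline: delete any previous index, recreate it with the geo_point mapping, relax the index settings, bulk load, restore the settings, refresh, then count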
val startedAt = now()
val futureResponse = for {
cleaned <- doClean(indexName) // delete any existing indexName
created <- doCreateTestIndex(indexName) // create the index, required for the geo_point mapping
refreshDisabledResponse <- doLoadingOptimizationsStart(indexName) // To accelerate insertion
responses <- writeData(indexName) // bulk operation insert all events
refreshEnabledResponse <- doLoadingOptimizationEnd(indexName) // revert back to a normal behavior
refreshed <- doRefreshIndex(indexName) // to wait for every document to be available for search
count <- doCount(indexName)
} yield {
count
}
Await.result(futureResponse, 30.minutes) // because we don't want to exit the script before the future has completed
futureResponse map { countResponse =>
val duration = (now() - startedAt) / 1000
println(s"$countResponse documents inserted in $duration seconds")
}
}
}
Feed.fill()