Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active April 2, 2023 10:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/026c9207894c00871c19c872fd78d821 to your computer and use it in GitHub Desktop.
Save dacr/026c9207894c00871c19c872fd78d821 to your computer and use it in GitHub Desktop.
feed elasticsearch with code examples content, generates ready to insert json files with feeding scripts / published by https://github.com/dacr/code-examples-manager #b82d3ba4-aa7e-436b-a480-c46a4e011a30/6e8dc2c2385fa345f2413e0601cc2fc417d31730
// summary : feed elasticsearch with code examples content, generates ready to insert json files with feeding scripts
// keywords : scala, elasticsearch, cem, code-examples-manager, search, generator, elastic4s, json4s, basic-auth, credential
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : b82d3ba4-aa7e-436b-a480-c46a4e011a30
// created-on : 2021-04-29T16:52:54Z
// managed-by : https://github.com/dacr/code-examples-manager
// execution : scala ammonite script (http://ammonite.io/) - run as follow 'amm scriptname.sc'
// run-with : cs launch com.lihaoyi:::ammonite:2.3.8 -M ammonite.Main -- $file
import $ivy.`com.github.pathikrit::better-files:3.9.1`
import $ivy.`com.sksamuel.elastic4s::elastic4s-core:7.12.0`
import $ivy.`com.sksamuel.elastic4s::elastic4s-client-esjava:7.12.0`
import $ivy.`com.sksamuel.elastic4s::elastic4s-json-json4s:7.12.0`
import $ivy.`org.json4s::json4s-jackson:3.6.11`
import $ivy.`org.json4s::json4s-ext:3.6.11`
import $ivy.`org.slf4j:slf4j-nop:1.7.30`
import better.files._
import java.nio.file.attribute.PosixFilePermission
import java.time.format.DateTimeFormatter.ISO_DATE_TIME
import java.time.{Instant, ZoneId, ZonedDateTime}
import java.time.temporal.ChronoField
import scala.concurrent.duration.DurationInt
import scala.util.{Properties, Try, Success, Failure}
// =====================================================================================================================
case class Document(
id: String, // important
file: String,
filename: String,
summary: String,
category: String,
content: String,
license: String,
keywords: List[String],
publish: List[String],
execution: Option[String],
run_with: Option[String],
authors: List[String],
created_on: ZonedDateTime,
last_updated: ZonedDateTime,
updated_count: Int,
)
// =====================================================================================================================
val from = Properties.envOrElse("CEM_ELK_FEED_FROM_PATH", ".")
val glob = Properties.envOrElse("CEM_ELK_FEED_GLOB", "**/*.*")
val ignoreRE = Properties.envOrElse("CEM_SEARCH_IGNORE_MASK", "(/[.]bsp)|(/[.]scala)|([.png]$)").r
val gitCreatedByCacheDir = Properties.envOrElse("CEM_ELK_FEED_GIT_CREATED_BY_CACHE_DIR", "/tmp/cem/git-created-by-cache")
val insertedJsonDocumentsCacheDir = Properties.envOrElse("CEM_ELK_FEED_INSERTED_JSON_DOCUMENTS", "/tmp/cem/inserted-documents")
val elasticUrl = Properties.envOrElse("CEM_ELASTIC_URL", "http://127.0.0.1:9200") // CEM_ELASTIC_URL MUST CONTAIN the port explicitly !
val elasticUsername = Properties.envOrNone("CEM_ELASTIC_USERNAME")
val elasticPassword = Properties.envOrNone("CEM_ELASTIC_PASSWORD")
// =====================================================================================================================
def indexNameFromDocument(doc: Document): String = {
val year = doc.created_on.get(ChronoField.YEAR)
val month = doc.created_on.get(ChronoField.MONTH_OF_YEAR)
val day = doc.created_on.get(ChronoField.DAY_OF_MONTH)
val week = doc.created_on.get(ChronoField.ALIGNED_WEEK_OF_YEAR)
//s"cem-$year-$month-$day"
//s"cem-$year-$week"
s"cem-$year-$month"
//s"cem-default"
}
// =====================================================================================================================
def eligible(file: File): Boolean = {
val firstLines = file.lineIterator.take(20).toList
firstLines.exists(_.matches(".* summary : .+")) &&
firstLines.exists(_.matches(".* publish :.*")) &&
firstLines.exists(_.matches(".* id : [-a-f0-9]+$"))
}
val fromFile = File(from)
val files =
fromFile
.glob(glob, includePath = false)
.filterNot(_.isDirectory)
.filterNot(file => ignoreRE.findFirstIn(file.path.toString).isDefined)
.filter(eligible)
.toList
println(s"Found ${files.size} eligible files for indexation")
def extractValue(from: String, key: String): Option[String] = {
val RE = ("""(?m)(?i)^(?:(?:// )|(?:## )|(?:- )|(?:-- )) *""" + key + """ *: *(.*)$""").r
RE.findFirstIn(from).collect { case RE(value) => value.trim }.filter(_.size > 0)
}
def extractValueList(from: String, key: String): List[String] = {
extractValue(from, key).map(_.split("""\s*[,;]\s*""").toList).getOrElse(Nil)
}
def getGitCommandResults(command:String, cachedResultKey:String, checkUpdatedDate:Option[Instant] = None):Try[String] = Try {
val cacheDirFile = gitCreatedByCacheDir.toFile
if (!cacheDirFile.exists()) cacheDirFile.createDirectoryIfNotExists(true)
val cached = file"$cacheDirFile/$cachedResultKey"
val result =
if (
cached.exists && cached.contentAsString.trim.size>0 && (
checkUpdatedDate.isEmpty || checkUpdatedDate.get.toEpochMilli <= cached.lastModifiedTime.toEpochMilli
)
) cached.contentAsString.trim
else {
import scala.sys.process._
val result = command.!!
cached.writeText(result)
result
}
result
}
def getCreatedOnFromGit(file: File, id: String): Try[ZonedDateTime] = {
val filestr = file.pathAsString
val command = s"""git log --diff-filter=A --follow --format=%aI -1 -- $filestr"""
val createdOnStrTried = getGitCommandResults(command, id)
val createdOn = createdOnStrTried.flatMap(createdByStr =>
Try(ZonedDateTime.from(ISO_DATE_TIME.parse(createdByStr.trim)))
)
createdOn
}
def getUpdatedCountFromGit(file: File, id: String): Try[Int] = {
val filestr = file.pathAsString
val command = s"""git log --oneline $filestr"""
val resultTried = getGitCommandResults(command, s"$id-history", Some(file.lastModifiedTime))
resultTried.map(historyContent => historyContent.split("""\R""").size)
}
def getCleanedExampleContentForIndexing(file: File) = {
file
.lineIterator
.dropWhile(line => line.trim.startsWith("<!--"))
.dropWhile(line => line.matches("""(?m)(?i)^(?:(?:// )|(?:## )|(?:- )|(?:-- ))[-a-zA-Z]+ :.*"""))
.dropWhile(line => line.trim.startsWith("-->"))
.dropWhile(line => line.trim.isEmpty)
.mkString("\n")
}
def convertDateTime(that:String):ZonedDateTime = {
ZonedDateTime.from(ISO_DATE_TIME.parse(that))
}
// ----------------------------------------------------------------
// prepare input documents
val documents = for {
file <- files
_ = println(s"reading $file")
contentHeader = file.lineIterator.take(20).mkString("\n")
id <- extractValue(contentHeader, "id")
createdOn <- extractValue(contentHeader, "created-on").map(convertDateTime).orElse(getCreatedOnFromGit(file, id).toOption)
lastUpdated = file.lastModifiedTime
updatedCount <- getUpdatedCountFromGit(file, id).toOption
summary <- extractValue(contentHeader, "summary")
license <- extractValue(contentHeader, "license")
keywords = extractValueList(contentHeader, "keywords")
publish = extractValueList(contentHeader, "publish")
execution = extractValue(contentHeader, "execution")
run_with = extractValue(contentHeader, "run_with")
authors = extractValueList(contentHeader, "authors")
} yield {
Document(
file = file.pathAsString,
id = id,
created_on = createdOn,
last_updated = lastUpdated.atZone(ZoneId.systemDefault()),
filename = file.name,
category = file.parent.pathAsString.replaceAll(fromFile.pathAsString + "/", ""),
content = getCleanedExampleContentForIndexing(file),
summary = summary,
keywords = keywords,
publish = publish,
license = license,
execution = execution,
run_with = run_with,
authors = authors,
updated_count = updatedCount
)
}
// ----------------------------------------------------------------
// id issues - all distincts
val idIssues = documents.groupBy(_.id).filter { case (id, docs) => docs.size > 1 }
assert(idIssues.size == 0,
"Found duplicates in documents identifiers :\n" +
idIssues.map { case (id, docs) => " " + id + " : " + docs.map(_.filename).mkString(" ") }.mkString("\n")
)
// ----------------------------------------------------------------
// summary issues - all distincts
val summaryIssues = documents.groupBy(_.summary.trim.toLowerCase).filter { case (summary, docs) => docs.size > 1 }
assert(summaryIssues.size == 0,
"Found duplicates in documents summaries :\n" +
summaryIssues.map { case (_, docs) => " " + docs.map(_.filename).mkString(" ") }.toList.sorted.mkString("\n")
)
// ----------------------------------------------------------------
// check if some files have been ignored because of missing information
val ignoredFilesIssues = files.filterNot(file => documents.exists(d => d.file == file.pathAsString))
assert(ignoredFilesIssues.size == 0,
"Found ignored files :\n" +
ignoredFilesIssues.map(f => " " + f.path.toString).mkString("\n")
)
// ----------------------------------------------------------------
//documents.sortBy(d => d.category -> d.filename).foreach(doc =>
// println(doc.id, doc.category, doc.filename, doc.summary)
//)
println(s"Found ${documents.size} documents ready for indexation")
// =====================================================================================================================
def now(): Long = System.currentTimeMillis()
// =====================================================================================================================
object ElasticFeed {
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization
import org.json4s.ext.JavaTimeSerializers
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.json4s.ElasticJson4s.Implicits._
import com.sksamuel.elastic4s.requests.mappings._
import com.sksamuel.elastic4s.requests.mappings.FieldType._
import com.sksamuel.elastic4s.Response
import com.sksamuel.elastic4s.requests.bulk.BulkResponse
import java.time.{Instant, OffsetDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
implicit val serialization = Serialization
implicit val formats = DefaultFormats.lossless ++ JavaTimeSerializers.all
val client = {
if (elasticPassword.isEmpty || elasticUsername.isEmpty) ElasticClient(JavaClient(ElasticProperties(elasticUrl)))
else {
val username = elasticUsername.get
val password = elasticPassword.get
lazy val provider = {
val provider = new BasicCredentialsProvider
val credentials = new UsernamePasswordCredentials(username, password)
provider.setCredentials(AuthScope.ANY, credentials)
provider
}
val client = ElasticClient(JavaClient(ElasticProperties(elasticUrl), new RequestConfigCallback {
override def customizeRequestConfig(requestConfigBuilder: RequestConfig.Builder) = {
requestConfigBuilder
}
}, new HttpClientConfigCallback {
override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder) = {
httpClientBuilder.setDefaultCredentialsProvider(provider)
}
}))
client
}
}
// IMPORTANT : GROUPED BULK INSERTION FOR BEST PERFORMANCE
def insertBulk(documents: Iterable[Document]): Future[Response[BulkResponse]] = client.execute {
bulk {
for {document <- documents} yield {
val indexName = indexNameFromDocument(document)
indexInto(indexName).id(document.id).doc(document).timeout(2.minutes)
}
}
}
// IMPORTANT : SIZED YOUR GROUPS DEPENDING YOUR DOCUMENT SIZE
def sendToElasticsearch(documents: Iterable[Document]): Future[Response[BulkResponse]] = {
val results: Iterator[Future[Response[BulkResponse]]] = for {
group <- documents.grouped(42).to(Iterator)
} yield {
insertBulk(group).andThen{
case Success(value) => println("processed those documents "+group.map(_.filename).mkString(","))
case Failure(exception: Exception) => println("Couldn't process one of this document "+group.map(_.filename).mkString(",")+" "+exception.getMessage())
}
}
val folded = results.reduce( (fa,fb) => fa.flatMap(_ => fb))
folded
}
def fill(documents: Iterable[Document]): Unit = {
val started = now()
val r = for {
results <- sendToElasticsearch(documents)
} yield results
r.await(300.seconds)
println(s"${documents.size} json documents sent to $elasticUrl in ${now - started}ms")
}
}
// =====================================================================================================================
object FeedUsingCurlGenerator {
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization
import org.json4s.ext.JavaTimeSerializers
import org.json4s.jackson.JsonMethods.{pretty, render}
import org.json4s.Extraction.decompose
implicit val serialization = Serialization
implicit val formats = DefaultFormats.lossless ++ JavaTimeSerializers.all
def localGeneratedJsonFilename(document:Document):String = {
val id = document.id
val filename = document.filename
s"$id-$filename.json"
}
def fill(documents: Iterable[Document]): Unit = {
val started = now()
val destDir = insertedJsonDocumentsCacheDir.toFile
if (!destDir.exists) destDir.createDirectoryIfNotExists(true)
destDir.list.foreach(file => file.delete())
documents.foreach { document =>
val generatedFileName = localGeneratedJsonFilename(document)
val dest = file"$destDir/$generatedFileName"
dest.writeText(pretty(decompose(document)))
}
val scriptFile = file"$destDir/feed.sh"
scriptFile.appendLine("""TARGET=${FEED_ELASTIC:-http://127.0.0.1:9200}""")
scriptFile.appendLine("""SRC=$(dirname "$0")""")
scriptFile.appendText(
documents.map { document =>
val index = indexNameFromDocument(document)
val id = document.id
val generatedFileName = localGeneratedJsonFilename(document)
s"""curl -d @$$SRC/$generatedFileName -H "Content-Type: application/json" -XPOST "$$TARGET/$index/_doc/$id?pretty" """
}.mkString("\n")
)
import PosixFilePermission._
scriptFile.setPermissions(Set(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE, GROUP_READ, OTHERS_READ))
println(s"${documents.size} json documents generated to $insertedJsonDocumentsCacheDir ${now - started}ms")
println(s"run $scriptFile to feed again using a shell command (slower)")
}
}
// =====================================================================================================================
val selectedDocuments = documents.filter(_.publish.nonEmpty)
ElasticFeed.fill(selectedDocuments)
//FeedUsingCurlGenerator.fill(selectedDocuments)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment