Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active September 1, 2024 10:15
Show Gist options
  • Save dacr/f25da8222b2ac644c3195c5982b7367e to your computer and use it in GitHub Desktop.
Save dacr/f25da8222b2ac644c3195c5982b7367e to your computer and use it in GitHub Desktop.
CEM tool - code examples content and execution results sent to a search engine, opensearch or elasticsearch / published by https://github.com/dacr/code-examples-manager #63541d6f-32ff-4c6d-be2e-d69a0358ae9d/94659875590027d3ab86592559c0e56dd0a42508
// summary : CEM tool - code examples content and execution results sent to a search engine, opensearch or elasticsearch
// keywords : scala, cem, examples, zio, search, opensearch, elasticsearch, lmdb
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 63541d6f-32ff-4c6d-be2e-d69a0358ae9d
// created-on : 2021-11-27T11:32:18+01:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// ---------------------
//> using scala "3.4.2"
//> using dep "dev.zio::zio-streams:2.1.5"
//> using dep "fr.janalyse::zio-lmdb:1.8.1"
//> using dep "com.sksamuel.elastic4s::elastic4s-effect-zio:8.11.5"
//> using dep "com.sksamuel.elastic4s::elastic4s-client-esjava:8.11.5"
//> using dep "com.sksamuel.elastic4s::elastic4s-json-zio:8.11.5"
//> using dep "org.slf4j:slf4j-api:2.0.13"
//> using dep "org.slf4j:slf4j-simple:2.0.13"
//> using javaOpt "--add-opens", "java.base/java.nio=ALL-UNNAMED", "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED"
// ---------------------
import zio.*
import zio.json.*
import zio.stream.*
import zio.lmdb.*
import java.util.UUID
import java.time.OffsetDateTime
// =====================================================================================================================
object ElasticOps {
  import com.sksamuel.elastic4s.zio.instances.*
  import com.sksamuel.elastic4s.ziojson.*
  import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, Index, Response}
  import com.sksamuel.elastic4s.http.JavaClient
  import com.sksamuel.elastic4s.ElasticDsl.*
  import com.sksamuel.elastic4s.requests.mappings.*
  import com.sksamuel.elastic4s.requests.bulk.BulkResponse
  import com.sksamuel.elastic4s.requests.searches.SearchResponse
  import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
  import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
  import org.apache.http.client.config.RequestConfig
  import org.apache.http.impl.client.BasicCredentialsProvider
  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
  import scala.concurrent.duration.FiniteDuration
  import scala.util.Properties.{envOrNone, envOrElse}

  // WARNING : ELASTIC_URL DEFAULT PORT IS 9200 !! (and not 80 or 443) SO BE EXPLICIT
  val elasticUrl      = envOrElse("CEM_ELASTIC_URL", "http://127.0.0.1:9200")
  // "true" (case-insensitive) makes the client trust self-signed server certificates
  val elasticUrlTrust = envOrNone("CEM_ELASTIC_URL_TRUST_SSL").getOrElse("false").trim.toLowerCase
  val elasticUsername = envOrNone("CEM_ELASTIC_USERNAME")
  val elasticPassword = envOrNone("CEM_ELASTIC_PASSWORD")

  /** Elasticsearch client built once from the environment.
    *
    * Basic authentication is configured only when both username and password are set.
    * The trust-self-signed SSL setting is honoured whether or not credentials are given.
    */
  private val client = { // TODO rewrite to be fully effect based
    import org.apache.http.ssl.SSLContexts
    import org.apache.http.conn.ssl.TrustSelfSignedStrategy
    val elasticProperties = ElasticProperties(elasticUrl)
    val commonRequestConfigBuilder: RequestConfigCallback = (requestConfigBuilder: RequestConfig.Builder) =>
      requestConfigBuilder
        .setConnectTimeout(10000)
        .setRedirectsEnabled(true)
        .setSocketTimeout(10000)
    val trustSelfSigned = elasticUrlTrust == "true"
    val credentialsProvider = for {
      username <- elasticUsername
      password <- elasticPassword
    } yield {
      val basicProvider = new BasicCredentialsProvider
      basicProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password))
      basicProvider
    }
    if (credentialsProvider.isEmpty && !trustSelfSigned)
      ElasticClient(JavaClient(elasticProperties, commonRequestConfigBuilder))
    else {
      val sslContext =
        if (trustSelfSigned) SSLContexts.custom().loadTrustMaterial(TrustSelfSignedStrategy()).build()
        else SSLContexts.createDefault()
      val httpClientConfigCallback: HttpClientConfigCallback =
        (httpClientBuilder: HttpAsyncClientBuilder) => {
          credentialsProvider.foreach(provider => httpClientBuilder.setDefaultCredentialsProvider(provider))
          httpClientBuilder.setSSLContext(sslContext)
          // .setSSLHostnameVerifier(org.apache.http.conn.ssl.NoopHostnameVerifier.INSTANCE)
        }
      ElasticClient(JavaClient(elasticProperties, commonRequestConfigBuilder, httpClientConfigCallback))
    }
  }

  private val scrollKeepAlive = FiniteDuration(30, "seconds")
  private val timeout         = 60.seconds

  // Jittered exponential backoff starting at 500ms, at most 5 retries, logging each decision
  private val retrySchedule =
    (Schedule.exponential(500.millis, 2).jittered && Schedule.recurs(5)).onDecision((_, _, decision) =>
      decision match {
        case Schedule.Decision.Done               => ZIO.logInfo("No more retry attempt !")
        case Schedule.Decision.Continue(interval) => ZIO.logInfo(s"Will retry at ${interval.start}")
      }
    )

  val upsertGrouping = 50
  val searchPageSize = 500

  // ------------------------------------------------------
  /** Index name "<prefix>-<year>-<month>".
    *
    * The month is not zero-padded. The format is kept unchanged so that documents keep
    * landing in the indices that already exist.
    */
  private def indexNameFromTimestamp(indexPrefix: String, timestamp: OffsetDateTime): String =
    s"$indexPrefix-${timestamp.getYear}-${timestamp.getMonthValue}"

  // ------------------------------------------------------
  /** Turns an elastic4s response into a typed ZIO failure instead of letting `.result` throw on error responses. */
  private def resultOf[U](response: Response[U]): IO[Exception, U] =
    if (response.isError) ZIO.fail(Exception(s"Elasticsearch request failed : ${response.error}"))
    else ZIO.succeed(response.result)

  /** Collects every failure of a bulk call: a request-level error or the per-item errors (reported with HTTP 200). */
  private def bulkFailures(response: Response[BulkResponse]): Seq[String] =
    if (response.isError) Seq(response.error.toString)
    else response.result.failures.flatMap(_.error).map(_.toString)

  // ------------------------------------------------------
  /** Streams the raw JSON sources of the pages that follow the initial scroll search.
    *
    * Stops at the first empty page, or when no next scroll id is returned.
    */
  private def streamFromScroll(scrollId: String) = {
    ZStream.paginateChunkZIO(scrollId) { currentScrollId =>
      for {
        response     <- client.execute(searchScroll(currentScrollId).keepAlive(scrollKeepAlive))
        scrollResult <- resultOf(response)
        results       = Chunk.fromArray(scrollResult.hits.hits.map(_.sourceAsString))
        _            <- ZIO.log(s"Got ${results.size} more documents")
      } yield results -> (if (results.nonEmpty) scrollResult.scrollId else None)
    }
  }

  /** Streams every document of `indexName`, decoded as `T`, using the scroll API.
    *
    * @param indexName the index (or index pattern) to read
    * @return a stream of decoded documents; it fails with an Exception on request, scroll or JSON decoding errors
    */
  def fetchAll[T](indexName: String)(implicit decoder: JsonDecoder[T]): ZStream[Any, Exception, T] = {
    // TODO something is going wrong here, sometimes not all results are returned without error being returned
    // TODO deep pagination issue see https://www.elastic.co/guide/en/elasticsearch/reference/current/scroll-api.html
    val result = for {
      response     <- client.execute(search(Index(indexName)).size(searchPageSize).scroll(scrollKeepAlive))
      searchResult <- resultOf(response)
      scrollId     <- ZIO
                        .fromOption(searchResult.scrollId)
                        .orElseFail(Exception(s"No scroll id returned while searching index $indexName"))
      firstResults  = Chunk.fromArray(searchResult.hits.hits.map(_.sourceAsString))
      _            <- ZIO.log(s"Got ${firstResults.size} first documents")
    } yield ZStream.fromChunk(firstResults) ++ streamFromScroll(scrollId)
    ZStream
      .unwrap(result)
      .map(json => json.fromJson[T].left.map(err => Exception(err)))
      .absolve
      .mapError {
        case exception: Exception => exception
        case other                => Exception(other)
      }
  }

  // ------------------------------------------------------
  /** Indexes `documents` with a single bulk request.
    *
    * Each document goes into "<indexPrefix>-<year>-<month>", taken from `timestampExtractor`.
    * Its document id comes from `idExtractor`, so re-sending a document with the same id replaces it.
    *
    * An attempt fails, and is then retried according to `retrySchedule`, in any of these cases:
    *   - the request itself fails;
    *   - any bulk item reports an error;
    *   - the attempt does not complete within `timeout`.
    *
    * A failure while building the request (for example an extractor throwing) is reported as a failure and is not retried.
    * An empty chunk is a no-op.
    */
  def upsert[T](indexPrefix: String, documents: Chunk[T])(timestampExtractor: T => OffsetDateTime, idExtractor: T => String)(implicit encoder: JsonEncoder[T]): Task[Unit] = {
    if (documents.isEmpty) ZIO.unit
    else {
      val buildRequest = ZIO.attempt {
        bulk {
          documents.map { document =>
            val indexName = indexNameFromTimestamp(indexPrefix, timestampExtractor(document))
            indexInto(indexName).id(idExtractor(document)).doc(document)
          }
        }
      }
      buildRequest.flatMap { request =>
        val upsertEffect = for {
          // _ <- ZIO.log(s"Upserting : ${documents.map(_.toJson).mkString("\n-----------------------\n")}")
          response <- client.execute(request)
          failures  = bulkFailures(response)
          _        <- ZIO.log(s"${if (failures.isEmpty) "Upserted" else "Failed to upsert"} ${documents.size} into elasticsearch")
          _        <- ZIO.cond(failures.isEmpty, (), Exception(failures.mkString("\n")))
        } yield ()
        // timeoutFail (not timeout) so that a stuck attempt becomes a failure and is retried instead of silently dropped
        upsertEffect
          .timeoutFail(Exception(s"Upsert of ${documents.size} documents not completed within $timeout"))(timeout)
          .retry(retrySchedule)
      }
    }
  }
}
// =====================================================================================================================
// Documents sent to the search engine (opensearch / elasticsearch).
// Field names become the JSON keys produced by the derived zio-json codecs, so renaming a field changes the
// shape of the indexed documents.
object DTOs {
// Search-engine representation of one code example. CEMTools builds it from a CodeExampleDAO (convertCodeExample)
// and indexes it under the "cem" prefix; createdOn is used to choose the target index.
case class CodeExampleDTO(
filepath: Option[String],
filename: String,
content: String,
uuid: UUID,
category: Option[String],
createdOn: Option[OffsetDateTime],
lastUpdated: Option[OffsetDateTime],
summary: Option[String],
// A List here, while the DAO stores keywords as a Set
keywords: List[String],
publish: List[String],
authors: List[String],
runWith: Option[String],
testWith: Option[String],
managedBy: Option[String],
license: Option[String],
updatedCount: Option[Int],
// Attachment names only (the DAO's map keys); attachment contents are not indexed
attachments: List[String],
// The following values are derived from the DAO's computed methods; convertCodeExample always fills them with Some(...)
fileExtension: Option[String],
isTestable: Option[Boolean],
isExclusive: Option[Boolean],
shouldFail: Option[Boolean],
isPublishable: Option[Boolean]
) derives JsonCodec
// Search-engine representation of one example execution result, with the executed example embedded.
// CEMTools indexes it under the "exec" prefix; startedTimestamp is used to choose the target index.
case class RunStatusDTO(
example: CodeExampleDTO,
// NOTE(review): presumably None when no exit code was captured (e.g. on timeout) — confirm with the producer
exitCodeOption: Option[Int],
stdout: String,
startedTimestamp: OffsetDateTime,
// NOTE(review): unit is not visible in this file (likely milliseconds) — TODO confirm
duration: Long,
runSessionDate: OffsetDateTime,
runSessionUUID: UUID,
success: Boolean,
timeout: Boolean,
// Free-form state label; the possible values are not visible in this file
runState: String
) derives JsonCodec
}
// Shapes persisted in the local LMDB store and read back by CEMTools ("code-examples" / "run-statuses" collections).
object DAOs {
  /** A code example as stored in LMDB.
    *
    * Also exposes a few values derived from its file name, keywords and publish targets.
    */
  case class CodeExampleDAO(
    filepath: Option[String],
    filename: String,
    content: String,
    hash: String,
    uuid: UUID,
    category: Option[String],
    createdOn: Option[OffsetDateTime],
    lastUpdated: Option[OffsetDateTime],
    summary: Option[String],
    keywords: Set[String],
    publish: List[String],
    authors: List[String],
    runWith: Option[String],
    testWith: Option[String],
    managedBy: Option[String],
    license: Option[String],
    updatedCount: Option[Int],
    attachments: Map[String, String],
    lastSeen: Option[OffsetDateTime]
  ) derives JsonCodec {
    /** Everything after the FIRST dot of the file name ("a.tar.gz" -> "tar.gz"), or "" when there is no dot. */
    def fileExtension: String =
      filename.indexOf('.') match {
        case -1       => ""
        case dotIndex => filename.substring(dotIndex + 1)
      }

    /** True when tagged with the "@testable" keyword. */
    def isTestable: Boolean = keywords("@testable")

    /** True when tagged "@exclusive"; exclusive examples are executed sequentially. */
    def isExclusive: Boolean = keywords("@exclusive")

    /** True when tagged "@fail", i.e. the example is expected to fail. */
    def shouldFail: Boolean = keywords("@fail")

    /** True when at least one publish target is declared. */
    def isPublishable: Boolean = publish.nonEmpty

    override def toString: String = List(category, filename, uuid, summary).mkString(" ")
  }

  /** One execution result of a code example, as stored in LMDB. */
  case class RunStatusDAO(
    example: CodeExampleDAO,
    exitCodeOption: Option[Int],
    stdout: String,
    startedTimestamp: OffsetDateTime,
    duration: Long,
    runSessionDate: OffsetDateTime,
    runSessionUUID: UUID,
    success: Boolean,
    timeout: Boolean,
    runState: String
  ) derives JsonCodec
}
// =====================================================================================================================
object CEMTools extends ZIOAppDefault {
  import DTOs.*
  import DAOs.*

  /** Maps a stored code example to its search-engine document.
    *
    * Keywords become a list, and attachments keep only their names.
    * The derived flags are always filled with Some(...).
    */
  def convertCodeExample(dao: CodeExampleDAO): CodeExampleDTO = {
    CodeExampleDTO(
      filepath = dao.filepath,
      filename = dao.filename,
      content = dao.content,
      uuid = dao.uuid,
      category = dao.category,
      createdOn = dao.createdOn,
      lastUpdated = dao.lastUpdated,
      summary = dao.summary,
      keywords = dao.keywords.toList,
      publish = dao.publish,
      authors = dao.authors,
      runWith = dao.runWith,
      testWith = dao.testWith,
      managedBy = dao.managedBy,
      license = dao.license,
      updatedCount = dao.updatedCount,
      attachments = dao.attachments.keys.toList,
      fileExtension = Some(dao.fileExtension),
      isTestable = Some(dao.isTestable),
      isExclusive = Some(dao.isExclusive),
      shouldFail = Some(dao.shouldFail),
      isPublishable = Some(dao.isPublishable)
    )
  }

  /** Maps a stored execution result (and its embedded example) to its search-engine document. */
  def convertRunResults(dao: RunStatusDAO): RunStatusDTO = {
    RunStatusDTO(
      example = convertCodeExample(dao.example),
      exitCodeOption = dao.exitCodeOption,
      stdout = dao.stdout,
      startedTimestamp = dao.startedTimestamp,
      duration = dao.duration,
      runSessionDate = dao.runSessionDate,
      runSessionUUID = dao.runSessionUUID,
      success = dao.success,
      timeout = dao.timeout,
      runState = dao.runState
    )
  }

  /** Reads every code example from the LMDB "code-examples" collection. */
  def fetchExamples() = {
    val collectionName = "code-examples"
    for {
      collection   <- LMDB.collectionGet[CodeExampleDAO](collectionName)
      examplesDAOs <- collection.collect()
      examples      = examplesDAOs.map(convertCodeExample)
    } yield examples
  }

  /** Reads every execution result from the LMDB "run-statuses" collection. */
  def fetchRunResults() = {
    val collectionName = "run-statuses"
    for {
      collection    <- LMDB.collectionGet[RunStatusDAO](collectionName)
      runResultsDAO <- collection.collect()
      runResults     = runResultsDAO.map(convertRunResults)
    } yield runResults
  }

  /** Indexes examples in batches, under the "cem" prefix and an index chosen from `createdOn`.
    *
    * Examples without a creation date cannot be assigned an index. They are skipped with a
    * warning instead of aborting the whole synchronization.
    */
  def updateCodeExamplesSearchStore(examples: List[CodeExampleDTO]) = {
    val examplesIndexPrefix               = "cem"
    val (datedExamples, undatedExamples) = examples.partition(_.createdOn.isDefined)
    val groups                           = Chunk.fromIterator(datedExamples.grouped(ElasticOps.upsertGrouping))
    for {
      _ <- ZIO.foreachDiscard(undatedExamples)(example =>
             ZIO.logWarning(s"Skipping example ${example.uuid} (${example.filename}) : no creation date to choose its index")
           )
      results <- ZIO.foreach(groups) { documents =>
                   // .get is safe here : undated examples were partitioned out above
                   ElasticOps.upsert[CodeExampleDTO](examplesIndexPrefix, Chunk.fromIterable(documents))(_.createdOn.get, ex => ex.uuid.toString)
                 }
    } yield results
  }

  /** Indexes execution results in batches, under the "exec" prefix and an index chosen from `startedTimestamp`. */
  def updateRunResultsSearchStore(examples: List[RunStatusDTO]) = {
    val executionResultsIndexPrefix = "exec"
    val groups = Chunk.fromIterator(examples.grouped(ElasticOps.upsertGrouping))
    ZIO.foreach(groups) { documents =>
      ElasticOps.upsert[RunStatusDTO](executionResultsIndexPrefix, Chunk.fromIterable(documents))(_.startedTimestamp, runResultId)
    }
  }

  /** Deterministic document id for an execution result.
    *
    * Derived from its session, example and start time, so re-synchronizing replaces the document.
    * A random id would add a duplicate on every run.
    */
  private def runResultId(result: RunStatusDTO): String = {
    val key = s"${result.runSessionUUID}/${result.example.uuid}/${result.startedTimestamp}"
    UUID.nameUUIDFromBytes(key.getBytes(java.nio.charset.StandardCharsets.UTF_8)).toString
  }

  /** Loads everything from LMDB, then pushes the publishable examples and their execution results to the search engine. */
  val synchronizeSearchStore = for {
    examples           <- fetchExamples()
    runResults         <- fetchRunResults()
    _                  <- Console.printLine(s"${examples.size} examples & ${runResults.size} RunResults collected")
    publishedExamples   = examples.filter(example => example.isPublishable.contains(true))
    publishedRunResults = runResults.filter(result => result.example.isPublishable.contains(true))
    elapsed            <- (updateCodeExamplesSearchStore(publishedExamples) *> updateRunResultsSearchStore(publishedRunResults)).timed.map(_._1)
    _                  <- Console.printLine(s"Search storage updated in ${elapsed.toSeconds} seconds")
  } yield ()

  def run = synchronizeSearchStore.provide(LMDB.liveWithDatabaseName("code-examples-manager-data"), Scope.default)
}
// Script entry point (the file is run with scala-cli, see the "run-with" header): explicitly starts the ZIO
// application with no command-line arguments.
CEMTools.main(Array.empty)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment