Last active
September 1, 2024 10:15
-
-
Save dacr/f25da8222b2ac644c3195c5982b7367e to your computer and use it in GitHub Desktop.
CEM tool - code examples content and execution results sent to a search engine, opensearch or elasticsearch / published by https://github.com/dacr/code-examples-manager #63541d6f-32ff-4c6d-be2e-d69a0358ae9d/94659875590027d3ab86592559c0e56dd0a42508
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : CEM tool - code examples content and execution results sent to a search engine, opensearch or elasticsearch
// keywords : scala, cem, examples, zio, search, opensearch, elasticsearch, lmdb
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 63541d6f-32ff-4c6d-be2e-d69a0358ae9d
// created-on : 2021-11-27T11:32:18+01:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// ---------------------
//> using scala "3.4.2" | |
//> using dep "dev.zio::zio-streams:2.1.5" | |
//> using dep "fr.janalyse::zio-lmdb:1.8.1" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-effect-zio:8.11.5" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-client-esjava:8.11.5" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-json-zio:8.11.5" | |
//> using dep "org.slf4j:slf4j-api:2.0.13" | |
//> using dep "org.slf4j:slf4j-simple:2.0.13" | |
//> using javaOpt "--add-opens", "java.base/java.nio=ALL-UNNAMED", "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED" | |
// --------------------- | |
import zio.* | |
import zio.json.* | |
import zio.stream.* | |
import zio.lmdb.* | |
import java.util.UUID | |
import java.time.OffsetDateTime | |
// ===================================================================================================================== | |
object ElasticOps { | |
import com.sksamuel.elastic4s.zio.instances.* | |
import com.sksamuel.elastic4s.ziojson.* | |
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties} | |
import com.sksamuel.elastic4s.ElasticDsl.* | |
import com.sksamuel.elastic4s.Index | |
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties} | |
import com.sksamuel.elastic4s.http.JavaClient | |
import com.sksamuel.elastic4s.ElasticDsl.* | |
import com.sksamuel.elastic4s.requests.mappings.* | |
import com.sksamuel.elastic4s.Response | |
import com.sksamuel.elastic4s.requests.bulk.BulkResponse | |
import com.sksamuel.elastic4s.requests.searches.SearchResponse | |
import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback} | |
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials} | |
import org.apache.http.client.config.RequestConfig | |
import org.apache.http.impl.client.BasicCredentialsProvider | |
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder | |
import scala.concurrent.duration.FiniteDuration | |
import java.time.temporal.ChronoField | |
import java.util.concurrent.TimeUnit | |
import scala.util.Properties.{envOrNone, envOrElse} | |
// Connection settings, all read from the environment.
// WARNING : the default port of an elasticsearch URL is 9200 (and not 80 or 443), so always be explicit.
val elasticUrl = envOrNone("CEM_ELASTIC_URL").getOrElse("http://127.0.0.1:9200")
// Normalized (trimmed, lower case) flag, compared against "true" when the client is built
val elasticUrlTrust = envOrElse("CEM_ELASTIC_URL_TRUST_SSL", "false").trim.toLowerCase
// Optional credentials, left undefined when the variables are not set
val elasticUsername = envOrNone("CEM_ELASTIC_USERNAME")
val elasticPassword = envOrNone("CEM_ELASTIC_PASSWORD")
// Search engine client shared by all the operations of this object.
//  - requests use 10s connect and socket timeouts and follow redirects,
//  - basic authentication is enabled only when both CEM_ELASTIC_USERNAME and CEM_ELASTIC_PASSWORD are set,
//  - self-signed certificates are trusted when CEM_ELASTIC_URL_TRUST_SSL is "true", with or without
//    credentials (the flag used to be silently ignored for unauthenticated connections).
private val client = { // TODO rewrite to be fully effect based
  import org.apache.http.ssl.SSLContexts
  import org.apache.http.conn.ssl.TrustSelfSignedStrategy

  val elasticProperties = ElasticProperties(elasticUrl)

  val commonRequestConfigBuilder: RequestConfigCallback = (requestConfigBuilder: RequestConfig.Builder) =>
    requestConfigBuilder
      .setConnectTimeout(10000)
      .setRedirectsEnabled(true)
      .setSocketTimeout(10000)

  // Defined only when both the username and the password are available
  val credentialsProvider =
    for {
      username <- elasticUsername
      password <- elasticPassword
    } yield {
      val basicProvider = new BasicCredentialsProvider
      basicProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password))
      basicProvider
    }

  // Authenticated clients keep the explicit default context they always had,
  // unauthenticated ones only get a custom context when self-signed certificates must be trusted.
  val sslContext =
    if (elasticUrlTrust == "true") Some(SSLContexts.custom().loadTrustMaterial(TrustSelfSignedStrategy()).build())
    else credentialsProvider.map(_ => SSLContexts.createDefault())

  val httpClientConfigCallback: HttpClientConfigCallback =
    (httpClientBuilder: HttpAsyncClientBuilder) => {
      credentialsProvider.foreach(provider => httpClientBuilder.setDefaultCredentialsProvider(provider))
      sslContext.foreach(context => httpClientBuilder.setSSLContext(context))
      // httpClientBuilder.setSSLHostnameVerifier(org.apache.http.conn.ssl.NoopHostnameVerifier.INSTANCE)
      httpClientBuilder
    }

  ElasticClient(JavaClient(elasticProperties, commonRequestConfigBuilder, httpClientConfigCallback))
}
// Keep-alive duration sent with the initial scrolled search and with every following scroll request
private val scrollKeepAlive = FiniteDuration(30, TimeUnit.SECONDS)
// Time budget given to a single bulk upsert attempt
private val timeout = 60.seconds
// Exponential backoff (500ms base, factor 2) with jitter, limited to 5 recurrences; every decision is logged
private val retrySchedule = {
  val backoff = Schedule.exponential(500.millis, 2).jittered
  val limit   = Schedule.recurs(5)
  (backoff && limit).onDecision { (_, _, decision) =>
    decision match {
      case Schedule.Decision.Continue(interval) => ZIO.logInfo(s"Will retry at ${interval.start}")
      case Schedule.Decision.Done               => ZIO.logInfo("No more retry attempt !")
    }
  }
}
// Number of documents sent within a single bulk request
val upsertGrouping = 50
// Number of documents requested per search / scroll page
val searchPageSize = 500
// ------------------------------------------------------ | |
/** Computes the name of the monthly index a document belongs to.
  *
  * The name has the form `<indexPrefix>-<year>-<month>`; the month is NOT zero padded
  * (March 2024 gives `prefix-2024-3`), which is kept as is because already existing
  * indices are named that way.
  *
  * @param indexPrefix prefix shared by all the indices of a given document kind
  * @param timestamp   date used to select the index, read in its own offset
  * @return the index name
  */
private def indexNameFromTimestamp(indexPrefix: String, timestamp: OffsetDateTime): String = {
  val year  = timestamp.get(ChronoField.YEAR)
  val month = timestamp.get(ChronoField.MONTH_OF_YEAR)
  s"$indexPrefix-$year-$month"
}
// ------------------------------------------------------ | |
/** Streams the JSON sources of all the pages remaining behind a scroll identifier.
  *
  * Each step requests the next page with the latest known scroll identifier; the stream ends
  * once a page comes back empty or without scroll identifier. A rejected scroll request fails
  * the stream with an exception carrying the reported error, instead of killing the fiber with
  * the `NoSuchElementException` thrown by `result` on a failed response.
  *
  * @param scrollId scroll identifier returned by the initial search
  * @return a stream of raw JSON documents
  */
private def streamFromScroll(scrollId: String) = {
  ZStream.paginateChunkZIO(scrollId) { currentScrollId =>
    for {
      response <- client.execute(searchScroll(currentScrollId).keepAlive(scrollKeepAlive))
      // `result` is only evaluated for successful responses (ZIO.cond arguments are by-name)
      page     <- ZIO.cond(response.isSuccess, response.result, Exception(s"Scroll request failed : ${response.error}"))
      results   = Chunk.fromArray(page.hits.hits.map(_.sourceAsString))
      _        <- ZIO.log(s"Got ${results.size} more documents")
    } yield results -> (if (results.nonEmpty) page.scrollId else None)
  }
}
/** Streams every document of an index, decoded from JSON.
  *
  * A first scrolled search returns the first page and a scroll identifier, the remaining pages
  * are then pulled through [[streamFromScroll]]. A rejected search request, a missing scroll
  * identifier and a JSON decoding error all fail the stream with an `Exception`.
  *
  * @param indexName name of the index to read
  * @param decoder   JSON decoder used for each document source
  * @tparam T type of the decoded documents
  * @return a stream of decoded documents
  */
def fetchAll[T](indexName: String)(implicit decoder: JsonDecoder[T]) = {
  // TODO something is going wrong here, sometimes not all results are returned without error being returned
  // TODO deep pagination issue see https://www.elastic.co/guide/en/elasticsearch/reference/current/scroll-api.html
  val result = for {
    response    <- client.execute(search(Index(indexName)).size(searchPageSize).scroll(scrollKeepAlive))
    // `result` throws on a failed response, so it is only evaluated once success has been checked
    firstPage   <- ZIO.cond(response.isSuccess, response.result, Exception(s"Search on index $indexName failed : ${response.error}"))
    scrollId    <- ZIO.fromOption(firstPage.scrollId).orElseFail(Exception(s"No scroll identifier returned for index $indexName"))
    firstResults = Chunk.fromArray(firstPage.hits.hits.map(_.sourceAsString))
    _           <- ZIO.log(s"Got ${firstResults.size} first documents")
    nextResultsStream = streamFromScroll(scrollId)
  } yield ZStream.fromChunk(firstResults) ++ nextResultsStream
  ZStream.unwrap(result).map(_.fromJson[T]).absolve.mapError(err => Exception(err.toString))
}
// ------------------------------------------------------ | |
/** Sends a batch of documents to the search engine within a single bulk request.
  *
  * Each document is indexed under the identifier given by `idExtractor`, in the monthly index
  * selected from `timestampExtractor`. The attempt fails, and is retried following
  * `retrySchedule`, when:
  *  - the request itself is rejected (`result` is never read on a failed response, it would throw),
  *  - at least one item of the bulk response reports an error, even if the request was accepted,
  *  - no response is received within `timeout` (this used to end as a silent success).
  *
  * @param indexPrefix        prefix of the target indices
  * @param documents          documents to index
  * @param timestampExtractor gives the date selecting the target index of a document
  * @param idExtractor        gives the identifier a document is stored under
  * @param encoder            JSON encoder of the documents
  * @tparam T type of the documents
  * @return an effect yielding unit once all the documents have been accepted
  */
def upsert[T](indexPrefix: String, documents: Chunk[T])(timestampExtractor: T => OffsetDateTime, idExtractor: T => String)(implicit encoder: JsonEncoder[T]) = {
  // The bulk request is built once, retries send the very same identifiers again
  val responseEffect = client.execute {
    bulk {
      documents.map { document =>
        val indexName = indexNameFromTimestamp(indexPrefix, timestampExtractor(document))
        indexInto(indexName).id(idExtractor(document)).doc(document)
      }
    }
  }
  val upsertEffect = for {
    // _ <- ZIO.log(s"Upserting : ${documents.map(_.toJson).mkString("\n-----------------------\n")}")
    response <- responseEffect
    failures  =
      if (response.isSuccess) response.result.failures.flatMap(_.error).map(_.toString)
      else List(response.error.toString)
    succeeded = failures.isEmpty
    _        <- ZIO.log(s"${if (succeeded) "Upserted" else "Failed to upsert"} ${documents.size} into elasticsearch")
    _        <- ZIO.cond(succeeded, (), failures.mkString("\n"))
  } yield ()
  upsertEffect
    .timeoutFail(Exception(s"Bulk upsert of ${documents.size} documents timed out after ${timeout.getSeconds} seconds"))(timeout)
    .retry(retrySchedule)
}
} | |
// ===================================================================================================================== | |
/** Documents sent to the search engine, built from the stored records by `CEMTools.convertCodeExample`
  * and `CEMTools.convertRunResults`.
  */
object DTOs {

  /** Search engine view of a code example.
    *
    * Compared to `DAOs.CodeExampleDAO`: `hash` and `lastSeen` are not published, `keywords` is a list,
    * `attachments` only keeps the attachment names, and the values computed by the DAO
    * (`fileExtension`, `isTestable`, `isExclusive`, `shouldFail`, `isPublishable`) become plain optional fields.
    */
  final case class CodeExampleDTO(
    filepath: Option[String],
    filename: String,
    content: String,
    uuid: UUID,
    category: Option[String],
    createdOn: Option[OffsetDateTime],
    lastUpdated: Option[OffsetDateTime],
    summary: Option[String],
    keywords: List[String],
    publish: List[String],
    authors: List[String],
    runWith: Option[String],
    testWith: Option[String],
    managedBy: Option[String],
    license: Option[String],
    updatedCount: Option[Int],
    attachments: List[String],
    fileExtension: Option[String],
    isTestable: Option[Boolean],
    isExclusive: Option[Boolean],
    shouldFail: Option[Boolean],
    isPublishable: Option[Boolean]
  ) derives JsonCodec

  /** Search engine view of the execution of one code example; same fields as `DAOs.RunStatusDAO`
    * with the example converted to its DTO form.
    */
  final case class RunStatusDTO(
    example: CodeExampleDTO,
    exitCodeOption: Option[Int],
    stdout: String,
    startedTimestamp: OffsetDateTime,
    duration: Long, // NOTE(review): unit not visible in this file, presumably milliseconds - confirm
    runSessionDate: OffsetDateTime,
    runSessionUUID: UUID,
    success: Boolean,
    timeout: Boolean,
    runState: String
  ) derives JsonCodec
}
/** Records as they are read from the LMDB collections ("code-examples" and "run-statuses"). */
object DAOs {

  /** A stored code example, with the flags derived from its keywords and publication targets. */
  final case class CodeExampleDAO(
    filepath: Option[String],
    filename: String,
    content: String,
    hash: String,
    uuid: UUID,
    category: Option[String],
    createdOn: Option[OffsetDateTime],
    lastUpdated: Option[OffsetDateTime],
    summary: Option[String],
    keywords: Set[String],
    publish: List[String],
    authors: List[String],
    runWith: Option[String],
    testWith: Option[String],
    managedBy: Option[String],
    license: Option[String],
    updatedCount: Option[Int],
    attachments: Map[String, String],
    lastSeen: Option[OffsetDateTime]
  ) derives JsonCodec {

    /** Everything following the FIRST dot of the file name ("a.tar.gz" gives "tar.gz"), empty without any dot. */
    def fileExtension: String = filename.split("[.]", 2).drop(1).headOption.getOrElse("")

    /** True when the example carries the `@testable` keyword. */
    def isTestable: Boolean = keywords.contains("@testable")

    /** True when the example carries the `@exclusive` keyword; exclusive examples are executed sequentially. */
    def isExclusive: Boolean = keywords.contains("@exclusive")

    /** True when the example carries the `@fail` keyword. */
    def shouldFail: Boolean = keywords.contains("@fail")

    /** True when at least one publication target is declared. */
    def isPublishable: Boolean = publish.nonEmpty

    override def toString: String = s"$category $filename $uuid $summary"
  }

  /** The stored outcome of the execution of one code example. */
  final case class RunStatusDAO(
    example: CodeExampleDAO,
    exitCodeOption: Option[Int],
    stdout: String,
    startedTimestamp: OffsetDateTime,
    duration: Long, // NOTE(review): unit not visible in this file, presumably milliseconds - confirm
    runSessionDate: OffsetDateTime,
    runSessionUUID: UUID,
    success: Boolean,
    timeout: Boolean,
    runState: String
  ) derives JsonCodec
}
// ===================================================================================================================== | |
object CEMTools extends ZIOAppDefault { | |
import DTOs.* | |
import DAOs.* | |
/** Turns a stored code example into the document sent to the search engine.
  *
  * `hash` and `lastSeen` are dropped, collections are flattened to lists (only the keys of the
  * attachments are kept) and the flags computed by the DAO are materialized as fields.
  */
def convertCodeExample(dao: CodeExampleDAO): CodeExampleDTO = {
  val keywordList     = dao.keywords.toList
  val attachmentNames = dao.attachments.keys.toList
  CodeExampleDTO(
    // identification and content
    filepath = dao.filepath,
    filename = dao.filename,
    content = dao.content,
    uuid = dao.uuid,
    // descriptive metadata
    category = dao.category,
    createdOn = dao.createdOn,
    lastUpdated = dao.lastUpdated,
    summary = dao.summary,
    keywords = keywordList,
    publish = dao.publish,
    authors = dao.authors,
    runWith = dao.runWith,
    testWith = dao.testWith,
    managedBy = dao.managedBy,
    license = dao.license,
    updatedCount = dao.updatedCount,
    attachments = attachmentNames,
    // values computed by the DAO
    fileExtension = Some(dao.fileExtension),
    isTestable = Some(dao.isTestable),
    isExclusive = Some(dao.isExclusive),
    shouldFail = Some(dao.shouldFail),
    isPublishable = Some(dao.isPublishable)
  )
}
/** Turns a stored run status into the document sent to the search engine:
  * the embedded example is converted, every other field is copied as is.
  */
def convertRunResults(dao: RunStatusDAO): RunStatusDTO = {
  val exampleDocument = convertCodeExample(dao.example)
  RunStatusDTO(
    example = exampleDocument,
    exitCodeOption = dao.exitCodeOption,
    stdout = dao.stdout,
    startedTimestamp = dao.startedTimestamp,
    duration = dao.duration,
    runSessionDate = dao.runSessionDate,
    runSessionUUID = dao.runSessionUUID,
    success = dao.success,
    timeout = dao.timeout,
    runState = dao.runState
  )
}
/** Reads all the records of the "code-examples" LMDB collection and converts them to search documents. */
def fetchExamples() =
  LMDB
    .collectionGet[CodeExampleDAO]("code-examples")
    .flatMap(_.collect())
    .map(_.map(convertCodeExample))
/** Reads all the records of the "run-statuses" LMDB collection and converts them to search documents. */
def fetchRunResults() =
  LMDB
    .collectionGet[RunStatusDAO]("run-statuses")
    .flatMap(_.collect())
    .map(_.map(convertRunResults))
/** Upserts code examples into the monthly "cem" indices, by batches of `ElasticOps.upsertGrouping`.
  *
  * The target index is selected from the creation date and the document identifier is the
  * example uuid. Examples without creation date cannot be assigned to an index: they are
  * reported with a warning and skipped, instead of crashing the whole synchronization.
  *
  * @param examples the examples to publish
  * @return an effect yielding one result per batch
  */
def updateCodeExamplesSearchStore(examples: List[CodeExampleDTO]) = {
  val examplesIndexPrefix = "cem"
  val (dated, undated)    = examples.partition(_.createdOn.isDefined)
  val groups              = Chunk.fromIterator(dated.grouped(ElasticOps.upsertGrouping))
  val warnAboutSkipped    = ZIO
    .logWarning(s"Skipping ${undated.size} examples without creation date : ${undated.map(_.filename).mkString(", ")}")
    .when(undated.nonEmpty)
  warnAboutSkipped *> ZIO.foreach(groups) { documents =>
    ElasticOps.upsert[CodeExampleDTO](examplesIndexPrefix, Chunk.fromIterable(documents))(
      // cannot be empty here, undated examples have been filtered out above
      example => example.createdOn.getOrElse(throw IllegalStateException(s"${example.filename} has no creation date")),
      example => example.uuid.toString
    )
  }
}
/** Upserts run results into the monthly "exec" indices, by batches of `ElasticOps.upsertGrouping`.
  *
  * The target index is selected from the start date of the run. The document identifier is a
  * name-based UUID derived from the example uuid, the run session uuid and the start timestamp,
  * so that synchronizing the same run result again overwrites its document; the random
  * identifier previously used created a new copy of every run result at each synchronization.
  * NOTE(review): assumes that this triple identifies a run - confirm against the run-statuses producer.
  *
  * @param examples the run results to publish
  * @return an effect yielding one result per batch
  */
def updateRunResultsSearchStore(examples: List[RunStatusDTO]) = {
  val executionResultsIndexPrefix = "exec"
  def documentIdOf(runStatus: RunStatusDTO): String = {
    val naturalKey = s"${runStatus.example.uuid}/${runStatus.runSessionUUID}/${runStatus.startedTimestamp}"
    UUID.nameUUIDFromBytes(naturalKey.getBytes("UTF-8")).toString
  }
  val groups = Chunk.fromIterator(examples.grouped(ElasticOps.upsertGrouping))
  ZIO.foreach(groups) { documents =>
    ElasticOps.upsert[RunStatusDTO](executionResultsIndexPrefix, Chunk.fromIterable(documents))(_.startedTimestamp, documentIdOf)
  }
}
// Whole synchronization : read examples and run results from LMDB, push the publishable ones
// to the search engine, and print how many whole seconds the push took.
val synchronizeSearchStore = for {
  examples   <- fetchExamples()
  runResults <- fetchRunResults()
  _          <- Console.printLine(s"${examples.size} examples & ${runResults.size} RunResults collected")
  started    <- Clock.instant
  // only what is flagged as publishable is sent
  publishableExamples   = examples.filter(_.isPublishable.contains(true))
  publishableRunResults = runResults.filter(_.example.isPublishable.contains(true))
  _          <- updateCodeExamplesSearchStore(publishableExamples)
  _          <- updateRunResultsSearchStore(publishableRunResults)
  finished   <- Clock.instant
  duration    = finished.getEpochSecond - started.getEpochSecond
  _          <- Console.printLine("Search storage updated in %d seconds".formatted(duration))
} yield ()
// Application entry point : the synchronization, provided with the LMDB database and a default scope
def run = {
  val lmdbLayer = LMDB.liveWithDatabaseName("code-examples-manager-data")
  synchronizeSearchStore.provide(lmdbLayer, Scope.default)
}
} | |
// Script entry point : starts the ZIO application without any command line argument
CEMTools.main(Array.empty[String])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment