Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active May 7, 2023 15:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/25779c454eceed7b7746e33b8039f30a to your computer and use it in GitHub Desktop.
Save dacr/25779c454eceed7b7746e33b8039f30a to your computer and use it in GitHub Desktop.
elasticsearch / opensearch basic read, update, insert, delete operations / published by https://github.com/dacr/code-examples-manager #ae29e73c-709c-4338-bca5-c522b72ed983/966a55d0f7a9c5e6b106ff5558cc0af3f15be58e
// summary : elasticsearch / opensearch basic read, update, insert, delete operations
// keywords : scala, cem, examples, zio, elasticsearch, opensearch, crud
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : ae29e73c-709c-4338-bca5-c522b72ed983
// created-on : 2023-05-07T08:50:35+02:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// ---------------------
//> using scala "3.2.2"
//> using dep "dev.zio::zio:2.0.13"
//> using dep "dev.zio::zio-streams:2.0.13"
//> using dep "dev.zio::zio-test:2.0.13"
//> using dep "dev.zio::zio-json:0.5.0"
//> using dep "com.sksamuel.elastic4s::elastic4s-effect-zio:8.7.0"
//> using dep "com.sksamuel.elastic4s::elastic4s-client-esjava:8.7.0"
//> using dep "com.sksamuel.elastic4s::elastic4s-json-zio:8.7.0"
//> using dep "org.slf4j:slf4j-api:2.0.7"
//> using dep "org.slf4j:slf4j-simple:2.0.7"
// ---------------------
import zio.*
import zio.json.*
import zio.stream.*
import zio.stream.ZPipeline.{splitLines, utf8Decode}
import zio.test.*
import zio.test.Assertion.*
import zio.test.TestAspect.*
import java.util.UUID
import scala.util.{Try, Success, Failure}
import scala.util.matching.Regex
import scala.util.{Either, Left, Properties, Right}
import java.time.format.DateTimeFormatter.ISO_DATE_TIME
import java.time.temporal.ChronoField
import java.time.{Instant, ZoneId, OffsetDateTime}
import java.util.concurrent.TimeUnit
// =====================================================================================================================
/** Minimal elasticsearch / opensearch CRUD helpers built on elastic4s with the ZIO effect module.
  *
  * Connection settings come from the environment:
  *  - `CEM_ELASTIC_URL` (default `http://127.0.0.1:9200`)
  *  - `CEM_ELASTIC_URL_TRUST_SSL` (`true` to trust self-signed certificates, only used with credentials)
  *  - `CEM_ELASTIC_USERNAME` / `CEM_ELASTIC_PASSWORD` (basic authentication, only when both are set)
  */
object ElasticOps {
  import com.sksamuel.elastic4s.zio.instances.*
  import com.sksamuel.elastic4s.ziojson.*
  import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
  import com.sksamuel.elastic4s.ElasticDsl.*
  import com.sksamuel.elastic4s.Index
  import com.sksamuel.elastic4s.http.JavaClient
  import com.sksamuel.elastic4s.requests.mappings.*
  import com.sksamuel.elastic4s.Response
  import com.sksamuel.elastic4s.requests.bulk.BulkResponse
  import com.sksamuel.elastic4s.requests.searches.SearchResponse
  import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
  import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
  import org.apache.http.client.config.RequestConfig
  import org.apache.http.impl.client.BasicCredentialsProvider
  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
  import scala.concurrent.duration.FiniteDuration
  import java.time.temporal.ChronoField
  import java.util.concurrent.TimeUnit
  import scala.util.Properties.{envOrNone, envOrElse}

  val elasticUrl      = envOrElse("CEM_ELASTIC_URL", "http://127.0.0.1:9200")
  val elasticUrlTrust = envOrNone("CEM_ELASTIC_URL_TRUST_SSL").getOrElse("false").trim.toLowerCase
  val elasticUsername = envOrNone("CEM_ELASTIC_USERNAME")
  val elasticPassword = envOrNone("CEM_ELASTIC_PASSWORD")

  // Credentials (and the optional self-signed certificate trust) are only configured when BOTH a username and a
  // password are provided; otherwise a plain client is built.
  private val client = (elasticUsername, elasticPassword) match { // TODO rewrite to be fully effect based
    case (Some(username), Some(password)) =>
      import org.apache.http.ssl.SSLContexts
      import org.apache.http.conn.ssl.TrustSelfSignedStrategy
      val provider = new BasicCredentialsProvider
      provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password))
      val sslContext =
        if (elasticUrlTrust == "true") SSLContexts.custom().loadTrustMaterial(TrustSelfSignedStrategy()).build()
        else SSLContexts.createDefault()
      ElasticClient(
        JavaClient(
          ElasticProperties(elasticUrl),
          (requestConfigBuilder: RequestConfig.Builder) => requestConfigBuilder,
          (httpClientBuilder: HttpAsyncClientBuilder) =>
            httpClientBuilder.setDefaultCredentialsProvider(provider).setSSLContext(sslContext)
        )
      )
    case _ =>
      ElasticClient(JavaClient(ElasticProperties(elasticUrl)))
  }

  private val scrollKeepAlive = FiniteDuration(30, "seconds")
  // Maximum duration of a single upsert attempt; a timed-out attempt fails and is retried
  private val timeout = 60.seconds
  private val retrySchedule = (Schedule.exponential(500.millis, 2).jittered && Schedule.recurs(5)).onDecision((state, out, decision) =>
    decision match {
      case Schedule.Decision.Done               => ZIO.logInfo("No more retry attempt !")
      case Schedule.Decision.Continue(interval) => ZIO.logInfo(s"Will retry at ${interval.start}")
    }
  )
  val upsertGrouping = 50
  val searchPageSize = 500

  // ------------------------------------------------------
  // Monthly index name "<prefix>-<year>-<month>". The month is NOT zero-padded (e.g. "...-2023-5"); this is kept
  // unchanged so that documents keep landing in already existing indices.
  private def indexNameFromTimestamp(indexPrefix: String, timestamp: OffsetDateTime): String = {
    val year  = timestamp.get(ChronoField.YEAR)
    val month = timestamp.get(ChronoField.MONTH_OF_YEAR)
    s"$indexPrefix-$year-$month"
  }

  // ------------------------------------------------------
  // Turns an elastic4s response into an effect: HTTP-level errors become failures instead of the exception that
  // `Response.result` throws (which, if thrown inside a pure `map`, would kill the fiber and bypass any retry).
  private def resultOrFail[U](response: Response[U]): Task[U] =
    if (response.isError) ZIO.fail(Exception(s"Elasticsearch request failed : ${response.error}"))
    else ZIO.attempt(response.result)

  // Decodes one JSON document; zio-json reports decoding problems as a String, wrapped here as an Exception.
  private def decodeDocument[T](json: String)(using JsonDecoder[T]): IO[Exception, T] =
    ZIO.fromEither(json.fromJson[T]).mapError(err => Exception(err))

  // Keeps existing exceptions (and their stack traces / causes) instead of flattening them to their toString.
  private def toException(error: Throwable): Exception = error match {
    case exception: Exception => exception
    case other                => Exception(other.toString, other)
  }

  // Collects error descriptions for either an HTTP-level failure or individual bulk items that were rejected.
  // A bulk call answers HTTP 200 even when some items fail, so the per-item errors must be inspected explicitly.
  private def bulkFailures(response: Response[BulkResponse]): Seq[String] =
    if (response.isError) Seq(response.error.toString)
    else response.result.failures.flatMap(_.error).map(_.toString)

  // ------------------------------------------------------
  // Follows a scroll cursor page by page, emitting raw JSON sources, until a page comes back empty.
  private def streamFromScroll(scrollId: String): ZStream[Any, Throwable, String] =
    ZStream.paginateChunkZIO(scrollId) { currentScrollId =>
      for {
        response <- client.execute(searchScroll(currentScrollId).keepAlive(scrollKeepAlive))
        page     <- resultOrFail(response)
        results   = Chunk.fromArray(page.hits.hits.map(_.sourceAsString)) // TODO find a better way than just sourceAsString !
        _        <- ZIO.log(s"Got ${results.size} more documents")
      } yield results -> (if (results.nonEmpty) page.scrollId else None)
    }

  /** Streams every document of the indices matching `indexPattern`, using a scroll search.
    *
    * @param indexPattern index name or wildcard pattern to search
    * @return a stream of decoded documents; fails with the underlying exception on request or decoding errors
    */
  def fetchAll[T](indexPattern: String)(using JsonDecoder[T]) = {
    val firstPage = for {
      response     <- client.execute(search(Index(indexPattern)).size(searchPageSize).scroll(scrollKeepAlive))
      result       <- resultOrFail(response)
      scrollId     <- ZIO.fromOption(result.scrollId).orElseFail(Exception(s"No scroll id returned while searching $indexPattern"))
      firstResults  = Chunk.fromArray(result.hits.hits.map(_.sourceAsString)) // TODO find a better way than just sourceAsString !
      _            <- ZIO.log(s"Got ${firstResults.size} first documents")
    } yield ZStream.fromChunk(firstResults) ++ streamFromScroll(scrollId)
    ZStream.unwrap(firstPage).mapZIO(json => decodeDocument[T](json)).mapError(toException)
  }

  // ------------------------------------------------------
  /** Fetches the first document whose `_id` matches `id` in the indices matching `indexPattern`.
    *
    * Fails with a descriptive exception when no document matches, or with the underlying exception on
    * request / decoding errors.
    */
  def fetch[T](indexPattern: String, id: String)(using JsonDecoder[T]) = {
    val found = for {
      response <- client.execute(search(indexPattern).matchQuery("_id", id))
      result   <- resultOrFail(response)
      json     <- ZIO
                    .fromOption(result.hits.hits.headOption.map(_.sourceAsString)) // TODO find a better way than just sourceAsString !
                    .orElseFail(Exception(s"No document with id $id found in $indexPattern"))
      document <- decodeDocument[T](json)
    } yield document
    found.mapError(toException)
  }

  // ------------------------------------------------------
  /** Indexes (insert or overwrite) `documents` in a single bulk request. Each document goes to a monthly index
    * derived from `indexPrefix` and its timestamp, and uses its extracted id.
    *
    * @param refreshImmediately when true, the bulk request asks for an immediate refresh so that the documents
    *                           become searchable as soon as the call returns
    * @return succeeds (always with `Some(())`, the Option being kept for backward compatibility) once every
    *         document is indexed; an attempt fails on HTTP errors, rejected items, or after `timeout`, and failed
    *         attempts are retried according to `retrySchedule`
    */
  def upsert[T](
    indexPrefix: String,
    documents: Chunk[T],
    refreshImmediately: Boolean = false
  )(
    timestampExtractor: T => OffsetDateTime,
    idExtractor: T => String
  )(using
    JsonEncoder[T]
  ) = {
    val operations = documents.map { document =>
      val indexName = indexNameFromTimestamp(indexPrefix, timestampExtractor(document))
      indexInto(indexName).id(idExtractor(document)).doc(document)
    }
    // `refresh` is a parameter of the bulk request itself: setting it on individual items has no effect
    val bulkRequest = if (refreshImmediately) bulk(operations).refreshImmediately else bulk(operations)
    val attempt = for {
      // _ <- ZIO.log(s"Upserting : ${documents.map(_.toJson).mkString("\n-----------------------\n")}")
      response <- client.execute(bulkRequest)
      failures  = bulkFailures(response)
      _        <- ZIO.log(s"${if (failures.isEmpty) "Upserted" else "Failed to upsert"} ${documents.size} into elasticsearch")
      _        <- ZIO.cond(failures.isEmpty, (), Exception(failures.mkString("\n")))
    } yield ()
    attempt
      .timeoutFail(Exception(s"Upsert of ${documents.size} documents timed out after $timeout"))(timeout) // a timeout is a failure, hence retried
      .retry(retrySchedule)
      .asSome
  }

  // ------------------------------------------------------
  /** Deletes the document `id` from `indexPattern` with a single-document delete request.
    *
    * NOTE(review): the raw elastic4s Response is returned unchecked, so an HTTP-level error (for instance if the
    * single-document delete API does not accept a wildcard pattern) does not fail this effect — confirm callers
    * inspect the response.
    */
  def delete(indexPattern: String, id: String) =
    client.execute(deleteById(indexPattern, id))
}
// =====================================================================================================================
/** End-to-end round trip against a live elasticsearch/opensearch cluster: upsert one document, read it back
  * by id and through a full scroll, then delete it.
  */
object SearchOperationsSpec extends ZIOSpecDefault {
  case class Something(when: OffsetDateTime, uuid: UUID, name: String, thatBoolean: Boolean, thatNumber: Double, mayBe: Option[String]) derives JsonCodec

  private val indexPrefix  = "test-elasticsearch-operations"
  private val indexPattern = s"$indexPrefix-*"

  // Writes `something`, then reads it back both by id and through fetchAll
  private def roundTrip(something: Something) =
    for {
      _   <- ElasticOps.upsert(indexPrefix, Chunk(something), refreshImmediately = true)(_.when, _.uuid.toString)
      // Because of the way elasticsearch/opensearch works...
      // previous upsert may not yet be available to fetch even with refreshImmediately
      // Remember that elasticsearch/opensearch is not a database but a search engine, it is also not ACID
      _   <- Clock.sleep(1.second)
      one <- ElasticOps.fetch[Something](indexPattern, something.uuid.toString)
      all <- ElasticOps.fetchAll[Something](indexPattern).runCollect
    } yield (one, all)

  override def spec = suite("Basic elasticsearch/opensearch operations specs")(
    test("upsert, fetch, fetchAll, delete") {
      for {
        when      <- Clock.currentDateTime
        uuid      <- Random.nextUUID
        something  = Something(when = when, uuid = uuid, name = "Joe", thatBoolean = true, thatNumber = 42.0d, mayBe = Some("nothing"))
        // The delete runs even when a previous step fails, so no test document is left behind to break the
        // `all.size == 1` expectation of later runs
        fetched   <- roundTrip(something).ensuring(ElasticOps.delete(indexPattern, something.uuid.toString).ignore)
      } yield {
        val (one, all) = fetched
        assertTrue(
          all.size == 1,
          all.head == something,
          one == something
        )
      }
    }
  ) @@ withLiveClock
}
// The script is run with scala-cli (see the header), so the spec is started explicitly, without any argument.
SearchOperationsSpec.main(Array.empty[String])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment