Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active May 25, 2024 10:19
Show Gist options
  • Save dacr/6ea121f251ad316a64657cbe78085ab7 to your computer and use it in GitHub Desktop.
Save dacr/6ea121f251ad316a64657cbe78085ab7 to your computer and use it in GitHub Desktop.
Photos manager - extract photo records from elasticsearch/opensearch to lmdb / published by https://github.com/dacr/code-examples-manager #6dd8f9f8-9d33-43d5-8416-31f0ce6c71fb/deee4d4e9d8095625e5a668af0a5167f8a3ba5e6
// summary : Photos manager - extract photo records from elasticsearch/opensearch to lmdb
// keywords : scala, photos, memories, zio, stream, update, photo, elasticsearch, lmdb
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 6dd8f9f8-9d33-43d5-8416-31f0ce6c71fb
// created-on : 2023-04-30T21:19:52+02:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// ---------------------
//> using scala "3.4.2"
//> using dep "dev.zio::zio:2.0.15"
//> using dep "dev.zio::zio-streams:2.0.15"
//> using dep "dev.zio::zio-json:0.6.0"
//> using dep "com.sksamuel.elastic4s::elastic4s-effect-zio:8.9.1"
//> using dep "com.sksamuel.elastic4s::elastic4s-client-esjava:8.9.1"
//> using dep "com.sksamuel.elastic4s::elastic4s-json-zio:8.9.1"
//> using dep "fr.janalyse::zio-lmdb:1.3.0"
//> using dep "dev.zio::zio-config:4.0.0-RC14"
//> using dep "dev.zio::zio-config-typesafe:4.0.0-RC14"
//> using dep "dev.zio::zio:2.0.14"
////> using dep "dev.zio::zio-logging:2.1.14"
////> using dep "dev.zio::zio-logging-slf4j2:2.1.14"
////> using dep "ch.qos.logback:logback-classic:1.4.11"
//> using javaOpt "--add-opens", "java.base/java.nio=ALL-UNNAMED", "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED"
// ---------------------
import zio.*
import zio.json.*
import zio.stream.*
import zio.stream.ZPipeline.{splitLines, utf8Decode}
import java.nio.file.Path
import java.time.OffsetDateTime
import java.util.UUID
import scala.util.Try
// =====================================================================================================================
object ElasticOps {
import com.sksamuel.elastic4s.zio.instances.*
import com.sksamuel.elastic4s.ziojson.*
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.ElasticDsl.*
import com.sksamuel.elastic4s.Index
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.ElasticDsl.*
import com.sksamuel.elastic4s.requests.mappings.*
import com.sksamuel.elastic4s.Response
import com.sksamuel.elastic4s.requests.bulk.BulkResponse
import com.sksamuel.elastic4s.requests.searches.SearchResponse
import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import scala.concurrent.duration.FiniteDuration
import java.time.temporal.ChronoField
import java.util.concurrent.TimeUnit
import scala.util.Properties.{envOrNone, envOrElse}
// WARNING : ELASTIC_URL DEFAULT PORT IS 9200 !! (and not 80 or 443) SO BE EXPLICIT
// Connection settings are read from the environment. Where two variables are listed,
// the PHOTOS_* one wins and the CEM_* one is only consulted as a fallback.
val elasticUrl: String =
  envOrNone("PHOTOS_ELASTIC_URL")
    .orElse(envOrNone("CEM_ELASTIC_URL"))
    .getOrElse("http://127.0.0.1:9200")
// Trimmed, lower-cased text of the flag; "false" when the variable is not set.
val elasticUrlTrust: String =
  envOrElse("PHOTOS_ELASTIC_URL_TRUST_SSL", "false").trim.toLowerCase
// Optional basic-auth credentials.
val elasticUsername: Option[String] =
  envOrNone("PHOTOS_ELASTIC_USERNAME")
    .orElse(envOrNone("CEM_ELASTIC_USERNAME"))
val elasticPassword: Option[String] =
  envOrNone("PHOTOS_ELASTIC_PASSWORD")
    .orElse(envOrNone("CEM_ELASTIC_PASSWORD"))
// Shared elastic4s client, built once from the environment-derived settings above.
//  - Every request uses 10s connect / socket timeouts and follows redirects.
//  - Basic authentication is enabled only when BOTH username and password are defined.
//  - When elasticUrlTrust is "true", self-signed certificates are accepted. This now also
//    applies when no credentials are configured (the flag used to be silently ignored
//    in that case).
private val client = { // TODO rewrite to be fully effect based
  import org.apache.http.ssl.SSLContexts
  import org.apache.http.conn.ssl.TrustSelfSignedStrategy

  val elasticProperties = ElasticProperties(elasticUrl)

  val commonRequestConfigBuilder: RequestConfigCallback = (requestConfigBuilder: RequestConfig.Builder) =>
    requestConfigBuilder
      .setConnectTimeout(10000)
      .setRedirectsEnabled(true)
      .setSocketTimeout(10000)

  val trustSelfSigned = elasticUrlTrust == "true"

  // SSL context accepting self-signed certificates.
  def buildSelfSignedContext() = SSLContexts.custom().loadTrustMaterial(TrustSelfSignedStrategy()).build()

  (elasticUsername, elasticPassword) match {
    case (Some(username), Some(password)) =>
      val provider = new BasicCredentialsProvider
      provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password))
      val sslContext = if (trustSelfSigned) buildSelfSignedContext() else SSLContexts.createDefault()
      val httpClientConfigCallback: HttpClientConfigCallback =
        (httpClientBuilder: HttpAsyncClientBuilder) =>
          httpClientBuilder
            .setDefaultCredentialsProvider(provider)
            .setSSLContext(sslContext)
      // .setSSLHostnameVerifier(org.apache.http.conn.ssl.NoopHostnameVerifier.INSTANCE)
      ElasticClient(JavaClient(elasticProperties, commonRequestConfigBuilder, httpClientConfigCallback))

    case _ if trustSelfSigned =>
      // No credentials, but the trust flag is set: only the SSL context is customized.
      val sslContext = buildSelfSignedContext()
      val httpClientConfigCallback: HttpClientConfigCallback =
        (httpClientBuilder: HttpAsyncClientBuilder) => httpClientBuilder.setSSLContext(sslContext)
      ElasticClient(JavaClient(elasticProperties, commonRequestConfigBuilder, httpClientConfigCallback))

    case _ =>
      // No credentials and no trust override: keep the client defaults untouched.
      ElasticClient(JavaClient(elasticProperties, commonRequestConfigBuilder))
  }
}
// Keep-alive passed to the scroll requests.
private val scrollKeepAlive = FiniteDuration(30, TimeUnit.SECONDS)
// Time limit applied to an upsert attempt.
private val timeout = 20.seconds
// Retry policy: jittered exponential backoff starting at 500ms, combined with a cap of
// 5 recurrences; every scheduling decision is logged.
private val retrySchedule =
  (Schedule.exponential(500.millis, 2).jittered && Schedule.recurs(5))
    .onDecision { (_, _, decision) =>
      decision match {
        case Schedule.Decision.Continue(interval) => ZIO.logInfo(s"Will retry at ${interval.start}")
        case Schedule.Decision.Done               => ZIO.logInfo("No more retry attempt !")
      }
    }
// Batch size constants exposed to callers.
val upsertGrouping: Int = 50
val searchPageSize: Int = 500
// ------------------------------------------------------
// Builds the name of the monthly index a document belongs to: "<indexPrefix>-<year>-<month>".
// The month is NOT zero padded (April 2023 gives "<indexPrefix>-2023-4"); this is kept
// as is so that names stay identical to the ones previously generated.
private def indexNameFromTimestamp(indexPrefix: String, timestamp: OffsetDateTime): String =
  s"$indexPrefix-${timestamp.getYear}-${timestamp.getMonthValue}"
// ------------------------------------------------------
// Streams the raw JSON sources of the remaining pages of a scroll, starting from `scrollId`.
// Each step fetches one page with the scroll id returned by the previous one; the stream
// ends when a page is empty or when no further scroll id is returned.
private def streamFromScroll(scrollId: String) = {
  ZStream.paginateChunkZIO(scrollId) { currentScrollId =>
    for {
      response <- client.execute(searchScroll(currentScrollId).keepAlive(scrollKeepAlive))
      // `result` throws when the request failed: lift that into the typed error channel
      // instead of letting it escape as a defect from a pure binding.
      page     <- ZIO.attempt(response.result)
      results   = Chunk.fromArray(page.hits.hits.map(_.sourceAsString))
      _        <- ZIO.log(s"Got ${results.size} more documents")
    } yield results -> (if (results.nonEmpty) page.scrollId else None)
  }
}
// Streams every document of `indexName` (an index name or pattern), decoded as `T`.
// A first search opens a scroll and yields the first page; the following pages come from
// streamFromScroll. The stream fails with an Exception when a request fails, when no
// scroll id is returned, or when a document can not be decoded with `decoder`.
def fetchAll[T](indexName: String)(implicit decoder: JsonDecoder[T]) = {
  // TODO something is going wrong here, sometimes not all results are returned without error being returned
  // TODO deep pagination issue see https://www.elastic.co/guide/en/elasticsearch/reference/current/scroll-api.html
  val result = for {
    response <- client.execute(search(Index(indexName)).size(searchPageSize).scroll(scrollKeepAlive))
    // `result` throws when the request failed: surface it as a typed failure, not a defect.
    firstPage <- ZIO.attempt(response.result)
    scrollId <- ZIO
                  .fromOption(firstPage.scrollId)
                  .orElseFail(Exception(s"No scroll id returned by the initial search on $indexName"))
    firstResults = Chunk.fromArray(firstPage.hits.hits.map(_.sourceAsString))
    _ <- ZIO.log(s"Got ${firstResults.size} first documents")
    nextResultsStream = streamFromScroll(scrollId)
  } yield ZStream.fromChunk(firstResults) ++ nextResultsStream
  ZStream.unwrap(result).map(_.fromJson[T]).absolve.mapError {
    // Keep the original exception (or at least its cause) instead of flattening it to text.
    case failure: Exception => failure
    case failure: Throwable => Exception(failure)
    // JSON decoding errors are reported as plain messages.
    case other => Exception(other.toString)
  }
}
// ------------------------------------------------------
// Bulk-indexes `documents`, each one into the monthly index derived from `indexPrefix` and
// the timestamp given by `timestampExtractor`, under the id given by `idExtractor`.
// An attempt fails when the request itself fails, when at least one bulk item is reported
// as failed, or when it exceeds `timeout`; failed attempts are retried with `retrySchedule`.
// The result stays an Option for backward compatibility and is always Some(()) on success.
def upsert[T](indexPrefix: String, documents: Chunk[T])(timestampExtractor: T => OffsetDateTime, idExtractor: T => String)(implicit encoder: JsonEncoder[T]) = {
  val responseEffect = client.execute {
    bulk {
      for { document <- documents } yield {
        val indexName = indexNameFromTimestamp(indexPrefix, timestampExtractor(document))
        val id = idExtractor(document)
        indexInto(indexName).id(id).doc(document)
      }
    }
  }
  val upsertEffect = for {
    response <- responseEffect
    // `result` throws when the whole request failed: turn that into a typed (retryable) failure.
    bulkResult <- ZIO
                    .attempt(response.result)
                    .tapError(_ => ZIO.log(s"Failed to upsert ${documents.size} into elasticsearch"))
    // A successful bulk request can still carry failed items, so check the items themselves.
    failedItems = bulkResult.failures
    outcome = if (failedItems.isEmpty) "Upserted" else "Failed to upsert"
    _ <- ZIO.log(s"$outcome ${documents.size} into elasticsearch")
    _ <- ZIO.cond(failedItems.isEmpty, (), failedItems.flatMap(_.error).map(_.toString).mkString("\n"))
  } yield ()
  upsertEffect
    // Fail on timeout (instead of succeeding with None) so that the retry schedule applies.
    .timeoutFail(s"Upsert of ${documents.size} documents timed out after $timeout")(timeout)
    .retry(retrySchedule)
    .asSome
}
}
// =====================================================================================================================
// Application copying every photo record found in elasticsearch ("photos-*" indices)
// into the "photos" collection of the "memories" LMDB database.
object Photos extends ZIOAppDefault {
  import zio.lmdb.*

  // Latitude / longitude pair (JSON codec derived).
  case class GeoPoint(lat: Double, lon: Double) derives JsonCodec

  // Photo record as read from elasticsearch and stored in LMDB (JSON codec derived).
  case class Photo(
    uuid: UUID,
    timestamp: OffsetDateTime,
    filePath: String,
    fileSize: Long,
    fileHash: String,
    fileLastUpdated: OffsetDateTime,
    category: Option[String],
    shootDateTime: Option[OffsetDateTime],
    camera: Option[String],
    tags: Map[String, String],
    keywords: List[String],        // Extracted from category
    classifications: List[String], // Extracted from AI DJL
    detectedObjects: List[String], // Extracted from AI DJL
    place: Option[GeoPoint]
  ) derives JsonCodec

  val databaseName   = "memories"
  val collectionName = "photos"

  // Entry point: runs the logic with a live LMDB layer for the database and a default scope.
  def run = logic.provide(LMDB.liveWithDatabaseName(databaseName), Scope.default)

  // Copies every photo into the LMDB collection (keyed by the photo uuid), then prints
  // usage hints for the LMDB command line tools and the elapsed time in seconds.
  val logic = for {
    startedAt        <- Clock.instant
    photosFromElastic = ElasticOps.fetchAll[Photo]("photos-*")
    // Errors from the collection creation are ignored
    // (presumably because the collection may already exist - TODO confirm).
    _                <- LMDB.collectionCreate[Photo](collectionName).ignore
    photosCollection <- LMDB.collectionGet[Photo](collectionName)
    _                <- photosFromElastic
                          .tap { photo => Console.printLine(s"Updating ${photo.filePath} : ${photo.timestamp}") }
                          .foreach { photo => photosCollection.upsertOverwrite(photo.uuid.toString, photo) }
                          .tapError { failure => Console.printLine(failure) }
    finishedAt       <- Clock.instant
    elapsedSeconds    = finishedAt.getEpochSecond - startedAt.getEpochSecond
    _                <- Console.printLine("""LMDB standard tools can be used to manage the database content : sudo apt-get install lmdb-utils""")
    lmdbService      <- ZIO.service[LMDB]
    _                <- Console.printLine(s"""To get some statistics : mdb_stat -s $collectionName ${lmdbService.databasePath}/""")
    // -p to make output printable and easily editable (text)
    _                <- Console.printLine(s"""To dump collection content : mdb_dump -p -s $collectionName ${lmdbService.databasePath}/""")
    _                <- Console.printLine(s"Run operations done in $elapsedSeconds seconds")
  } yield ()
}
// Top-level statement of the script: starts the Photos application with an empty argument list.
Photos.main(Array.empty)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment