Last active
May 25, 2024 10:19
-
-
Save dacr/6ea121f251ad316a64657cbe78085ab7 to your computer and use it in GitHub Desktop.
Photos manager - extract photo records from elasticsearch/opensearch to lmdb / published by https://github.com/dacr/code-examples-manager #6dd8f9f8-9d33-43d5-8416-31f0ce6c71fb/deee4d4e9d8095625e5a668af0a5167f8a3ba5e6
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : Photos manager - extract photo records from elasticsearch/opensearch to lmdb
// keywords : scala, photos, memories, zio, stream, update, photo, elasticsearch, lmdb
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 6dd8f9f8-9d33-43d5-8416-31f0ce6c71fb
// created-on : 2023-04-30T21:19:52+02:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// ---------------------
//> using scala "3.4.2"
//> using dep "dev.zio::zio:2.0.15"
//> using dep "dev.zio::zio-streams:2.0.15"
//> using dep "dev.zio::zio-json:0.6.0"
//> using dep "com.sksamuel.elastic4s::elastic4s-effect-zio:8.9.1"
//> using dep "com.sksamuel.elastic4s::elastic4s-client-esjava:8.9.1"
//> using dep "com.sksamuel.elastic4s::elastic4s-json-zio:8.9.1"
//> using dep "fr.janalyse::zio-lmdb:1.3.0"
//> using dep "dev.zio::zio-config:4.0.0-RC14"
//> using dep "dev.zio::zio-config-typesafe:4.0.0-RC14"
//> using dep "dev.zio::zio:2.0.14"
////> using dep "dev.zio::zio-logging:2.1.14"
////> using dep "dev.zio::zio-logging-slf4j2:2.1.14"
////> using dep "ch.qos.logback:logback-classic:1.4.11"
//> using javaOpt "--add-opens", "java.base/java.nio=ALL-UNNAMED", "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED"
// ---------------------
import zio.* | |
import zio.json.* | |
import zio.stream.* | |
import zio.stream.ZPipeline.{splitLines, utf8Decode} | |
import java.nio.file.Path | |
import java.time.OffsetDateTime | |
import java.util.UUID | |
import scala.util.Try | |
// ===================================================================================================================== | |
/** Elasticsearch / OpenSearch helpers built on elastic4s and ZIO : client setup, scroll based reads and bulk upserts.
  *
  * Connection settings are read from the `PHOTOS_ELASTIC_*` environment variables, falling back to the `CEM_ELASTIC_*`
  * ones for the url, username and password.
  */
object ElasticOps {
  import com.sksamuel.elastic4s.zio.instances.*
  import com.sksamuel.elastic4s.ziojson.*
  import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
  import com.sksamuel.elastic4s.ElasticDsl.*
  import com.sksamuel.elastic4s.Index
  import com.sksamuel.elastic4s.http.JavaClient
  import com.sksamuel.elastic4s.requests.mappings.*
  import com.sksamuel.elastic4s.Response
  import com.sksamuel.elastic4s.requests.bulk.BulkResponse
  import com.sksamuel.elastic4s.requests.searches.SearchResponse
  import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
  import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
  import org.apache.http.client.config.RequestConfig
  import org.apache.http.impl.client.BasicCredentialsProvider
  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
  import org.apache.http.ssl.SSLContexts
  import org.apache.http.conn.ssl.TrustSelfSignedStrategy
  import scala.concurrent.duration.FiniteDuration
  import java.time.temporal.ChronoField
  import java.util.concurrent.TimeUnit
  import scala.util.Properties.{envOrNone, envOrElse}

  // WARNING : ELASTIC_URL DEFAULT PORT IS 9200 !! (and not 80 or 443) SO BE EXPLICIT
  val elasticUrl: String              = envOrNone("PHOTOS_ELASTIC_URL").orElse(envOrNone("CEM_ELASTIC_URL")).getOrElse("http://127.0.0.1:9200")
  val elasticUrlTrust: String         = envOrNone("PHOTOS_ELASTIC_URL_TRUST_SSL").getOrElse("false").trim.toLowerCase
  val elasticUsername: Option[String] = envOrNone("PHOTOS_ELASTIC_USERNAME").orElse(envOrNone("CEM_ELASTIC_USERNAME"))
  val elasticPassword: Option[String] = envOrNone("PHOTOS_ELASTIC_PASSWORD").orElse(envOrNone("CEM_ELASTIC_PASSWORD"))

  /** Shared elastic4s client.
    *
    * Basic authentication is enabled only when both a username and a password are configured. The self-signed trust
    * strategy is applied whenever `PHOTOS_ELASTIC_URL_TRUST_SSL` is "true", with or without credentials (it used to be
    * silently ignored for unauthenticated connections).
    */
  private val client = { // TODO rewrite to be fully effect based
    val elasticProperties = ElasticProperties(elasticUrl)

    val commonRequestConfigBuilder: RequestConfigCallback = (requestConfigBuilder: RequestConfig.Builder) =>
      requestConfigBuilder
        .setConnectTimeout(10000)
        .setRedirectsEnabled(true)
        .setSocketTimeout(10000)

    // Defined only when both username and password are available
    val credentialsProvider = for {
      username <- elasticUsername
      password <- elasticPassword
    } yield {
      val basicProvider = new BasicCredentialsProvider
      basicProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password))
      basicProvider
    }

    val trustSelfSigned = elasticUrlTrust == "true"

    // Lazy because it is not needed for an unauthenticated connection using the default trust settings
    lazy val sslContext =
      if (trustSelfSigned) SSLContexts.custom().loadTrustMaterial(TrustSelfSignedStrategy()).build()
      else SSLContexts.createDefault()

    val httpClientConfigCallback: HttpClientConfigCallback =
      (httpClientBuilder: HttpAsyncClientBuilder) =>
        credentialsProvider match {
          case Some(provider) =>
            httpClientBuilder
              .setDefaultCredentialsProvider(provider)
              .setSSLContext(sslContext)
          // .setSSLHostnameVerifier(org.apache.http.conn.ssl.NoopHostnameVerifier.INSTANCE)
          case None if trustSelfSigned =>
            httpClientBuilder.setSSLContext(sslContext)
          case None =>
            httpClientBuilder // untouched : same behavior as the client built without http client callback
        }

    ElasticClient(JavaClient(elasticProperties, commonRequestConfigBuilder, httpClientConfigCallback))
  }

  private val scrollKeepAlive = FiniteDuration(30, "seconds")
  private val timeout         = 20.seconds

  // Exponential backoff (500ms base, factor 2, jittered) combined with recurs(5), each decision being logged
  private val retrySchedule = (Schedule.exponential(500.millis, 2).jittered && Schedule.recurs(5)).onDecision((_, _, decision) =>
    decision match {
      case Schedule.Decision.Done               => ZIO.logInfo("No more retry attempt !")
      case Schedule.Decision.Continue(interval) => ZIO.logInfo(s"Will retry at ${interval.start}")
    }
  )

  val upsertGrouping: Int = 50
  val searchPageSize: Int = 500

  // ------------------------------------------------------

  /** Builds the monthly index name `prefix-year-month` (the month is not zero padded). */
  private def indexNameFromTimestamp(indexPrefix: String, timestamp: OffsetDateTime): String = {
    val year  = timestamp.get(ChronoField.YEAR)
    val month = timestamp.get(ChronoField.MONTH_OF_YEAR)
    s"$indexPrefix-$year-$month"
  }

  /** Extracts the payload of a response, turning a failed request into a typed failure.
    *
    * Reading `result` on a failed response throws, which inside a `map` would surface as a defect that `retry` and
    * `mapError` cannot handle.
    */
  private def successOrFail[A](response: Response[A]): ZIO[Any, Throwable, A] =
    if (response.isError) ZIO.fail(Exception(s"Elasticsearch request failed : ${response.error}"))
    else ZIO.succeed(response.result)

  /** Describes everything that went wrong with a bulk request : the request level error, or each failed item.
    * An empty result means the whole bulk request succeeded.
    */
  private def bulkProblems(response: Response[BulkResponse]): Seq[String] =
    if (response.isError) Seq(response.error.toString)
    else response.result.failures.map(item => item.error.map(_.toString).getOrElse(item.toString))

  // ------------------------------------------------------

  /** Streams the raw JSON sources of the remaining scroll pages, stopping at the first empty page or when no scroll id
    * is returned.
    */
  private def streamFromScroll(scrollId: String) = {
    // NOTE(review): the scroll context is never cleared explicitly, it only expires after scrollKeepAlive
    ZStream.paginateChunkZIO(scrollId) { currentScrollId =>
      for {
        response <- client.execute(searchScroll(currentScrollId).keepAlive(scrollKeepAlive))
        page     <- successOrFail(response)
        results   = Chunk.fromArray(page.hits.hits.map(_.sourceAsString))
        _        <- ZIO.log(s"Got ${results.size} more documents")
      } yield results -> (if (results.nonEmpty) page.scrollId else None)
    }
  }

  /** Streams all the documents of the given index (or index pattern), decoded as `T`.
    *
    * @param indexName index name or pattern to read
    * @param decoder   JSON decoder applied to each document source
    * @return a stream failing with an `Exception` on request failure, missing scroll id or JSON decoding error
    */
  def fetchAll[T](indexName: String)(implicit decoder: JsonDecoder[T]) = {
    // TODO something is going wrong here, sometimes not all results are returned without error being returned
    // TODO deep pagination issue see https://www.elastic.co/guide/en/elasticsearch/reference/current/scroll-api.html
    val result = for {
      response         <- client.execute(search(Index(indexName)).size(searchPageSize).scroll(scrollKeepAlive))
      firstPage        <- successOrFail(response)
      scrollId         <- ZIO.fromOption(firstPage.scrollId)
      firstResults      = Chunk.fromArray(firstPage.hits.hits.map(_.sourceAsString))
      _                <- ZIO.log(s"Got ${firstResults.size} first documents")
      nextResultsStream = streamFromScroll(scrollId)
    } yield ZStream.fromChunk(firstResults) ++ nextResultsStream
    ZStream.unwrap(result).map(_.fromJson[T]).absolve.mapError(err => Exception(err.toString))
  }

  // ------------------------------------------------------

  /** Bulk indexes the given documents, each one going to the monthly index derived from its timestamp.
    *
    * The request fails (and is retried following `retrySchedule`) when the request itself fails, when at least one
    * item of the bulk is rejected, or when no response is received within `timeout`.
    *
    * @param indexPrefix        prefix of the target index names
    * @param documents          documents to index
    * @param timestampExtractor gives the timestamp used to select the target index
    * @param idExtractor        gives the document identifier
    * @param encoder            JSON encoder for the documents
    * @return `Some(())` on success ; the `Option` is only kept for source compatibility with the previous signature
    */
  def upsert[T](indexPrefix: String, documents: Chunk[T])(timestampExtractor: T => OffsetDateTime, idExtractor: T => String)(implicit encoder: JsonEncoder[T]) = {
    val responseEffect = client.execute {
      bulk {
        documents.map { document =>
          val indexName = indexNameFromTimestamp(indexPrefix, timestampExtractor(document))
          val id        = idExtractor(document)
          indexInto(indexName).id(id).doc(document)
        }
      }
    }
    val upsertEffect = for {
      response <- responseEffect
      problems  = bulkProblems(response)
      _        <- ZIO.log(s"${if (problems.isEmpty) "Upserted" else "Failed to upsert"} ${documents.size} into elasticsearch")
      _        <- ZIO.cond(problems.isEmpty, (), problems.mkString("\n"))
    } yield ()
    upsertEffect
      // timeoutFail (and not timeout) so that a timeout is a failure which gets retried instead of a silent None
      .timeoutFail(new java.util.concurrent.TimeoutException(s"Elasticsearch upsert of ${documents.size} documents timed out after ${timeout.getSeconds} seconds"))(timeout)
      .retry(retrySchedule)
      .asSome
  }
}
// ===================================================================================================================== | |
/** Application copying the photo records found in the elasticsearch `photos-*` indices into a local LMDB collection. */
object Photos extends ZIOAppDefault {
  import zio.lmdb.*

  /** Latitude / longitude pair. */
  case class GeoPoint(lat: Double, lon: Double) derives JsonCodec

  /** Photo record as stored in elasticsearch and in LMDB. */
  case class Photo(
    uuid: UUID,
    timestamp: OffsetDateTime,
    filePath: String,
    fileSize: Long,
    fileHash: String,
    fileLastUpdated: OffsetDateTime,
    category: Option[String],
    shootDateTime: Option[OffsetDateTime],
    camera: Option[String],
    tags: Map[String, String],
    keywords: List[String],        // Extracted from category
    classifications: List[String], // Extracted from AI DJL
    detectedObjects: List[String], // Extracted from AI DJL
    place: Option[GeoPoint]
  ) derives JsonCodec

  val databaseName   = "memories"
  val collectionName = "photos"

  def run = logic.provide(LMDB.liveWithDatabaseName(databaseName), Scope.default)

  /** Whole program : import the photos, print some LMDB tooling hints, then report the elapsed time. */
  val logic = for {
    begin <- Clock.instant
    _     <- replicatePhotos
    end   <- Clock.instant
    _     <- describeLmdbTooling
    _     <- Console.printLine(s"Run operations done in ${end.getEpochSecond - begin.getEpochSecond} seconds")
  } yield ()

  /** Writes every photo coming from elasticsearch into the LMDB collection, keyed by the photo uuid. */
  private def replicatePhotos = {
    // The elasticsearch stream is built before any LMDB operation
    val photosFromElastic = ElasticOps.fetchAll[Photo]("photos-*")
    for {
      _          <- LMDB.collectionCreate[Photo](collectionName).ignore // creation failures are deliberately ignored
      collection <- LMDB.collectionGet[Photo](collectionName)
      _          <- photosFromElastic
                      .tap(photo => Console.printLine(s"Updating ${photo.filePath} : ${photo.timestamp}"))
                      .foreach(photo => collection.upsertOverwrite(photo.uuid.toString, photo))
                      .tapError(failure => Console.printLine(failure))
    } yield ()
  }

  /** Prints how to inspect the database content using the LMDB command line tools. */
  private def describeLmdbTooling = for {
    _       <- Console.printLine("""LMDB standard tools can be used to manage the database content : sudo apt-get install lmdb-utils""")
    storage <- ZIO.service[LMDB]
    _       <- Console.printLine(s"""To get some statistics : mdb_stat -s $collectionName ${storage.databasePath}/""")
    // -p to make output printable and easily editable (text)
    _       <- Console.printLine(s"""To dump collection content : mdb_dump -p -s $collectionName ${storage.databasePath}/""")
  } yield ()
}
// Script entry point : runs the Photos application without any command line argument
Photos.main(Array.empty[String])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment