Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active April 30, 2023 20:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/6d7ba9b207013d8138deda28e824490a to your computer and use it in GitHub Desktop.
Save dacr/6d7ba9b207013d8138deda28e824490a to your computer and use it in GitHub Desktop.
Photos manager - update photo document field within elasticsearch / published by https://github.com/dacr/code-examples-manager #7cf62cfb-9643-4af3-afa4-54deaedf4140/3370b4d7b3b48077e05d643ca6b7b4c0761f9f03
// summary : Photos manager - update photo document field within elasticsearch
// keywords : scala, photos, memories, zio, stream, update, photo, elasticsearch
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 7cf62cfb-9643-4af3-afa4-54deaedf4140
// created-on : 2022-03-08T07:31:17+01:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// ---------------------
//> using scala "3.2.2"
//> using dep "dev.zio::zio:2.0.13"
//> using dep "dev.zio::zio-streams:2.0.13"
//> using dep "dev.zio::zio-json:0.5.0"
//> using dep "com.drewnoakes:metadata-extractor:2.18.0"
//> using dep "com.fasterxml.uuid:java-uuid-generator:4.1.0"
//> using dep "com.sksamuel.elastic4s::elastic4s-effect-zio:8.7.0"
//> using dep "com.sksamuel.elastic4s::elastic4s-client-esjava:8.7.0"
//> using dep "com.sksamuel.elastic4s::elastic4s-json-zio:8.7.0"
// ---------------------
import zio.*
import zio.json.*
import zio.stream.*
import zio.stream.ZPipeline.{splitLines, utf8Decode}
import java.nio.file.Path
import java.time.OffsetDateTime
import java.util.UUID
import scala.util.Try
// =====================================================================================================================
object ElasticOps {
import com.sksamuel.elastic4s.zio.instances.*
import com.sksamuel.elastic4s.ziojson.*
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.ElasticDsl.*
import com.sksamuel.elastic4s.Index
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.ElasticDsl.*
import com.sksamuel.elastic4s.requests.mappings.*
import com.sksamuel.elastic4s.Response
import com.sksamuel.elastic4s.requests.bulk.BulkResponse
import com.sksamuel.elastic4s.requests.searches.SearchResponse
import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import scala.concurrent.duration.FiniteDuration
import java.time.temporal.ChronoField
import java.util.concurrent.TimeUnit
import scala.util.Properties.{envOrNone, envOrElse}
// Connection settings come from the environment; PHOTOS_* variables win over their CEM_* counterparts.
// Endpoint URL, falling back to a local node when neither variable is defined.
val elasticUrl: String =
  envOrNone("PHOTOS_ELASTIC_URL").getOrElse(envOrElse("CEM_ELASTIC_URL", "http://127.0.0.1:9200"))
// Trust flag, trimmed and lower-cased; the client only reacts to the exact value "true".
val elasticUrlTrust: String = envOrElse("PHOTOS_ELASTIC_URL_TRUST_SSL", "false").trim.toLowerCase
// Optional credentials: first defined variable wins, None when neither is set.
val elasticUsername: Option[String] =
  Seq("PHOTOS_ELASTIC_USERNAME", "CEM_ELASTIC_USERNAME").flatMap(name => envOrNone(name)).headOption
val elasticPassword: Option[String] =
  Seq("PHOTOS_ELASTIC_PASSWORD", "CEM_ELASTIC_PASSWORD").flatMap(name => envOrNone(name)).headOption
// Shared Elasticsearch client, created when ElasticOps is initialised.
// Without a complete username/password pair a plain client is built; otherwise basic
// authentication and an SSL context are installed on the underlying HTTP client.
private val client = (elasticUsername, elasticPassword) match { // TODO rewrite to be fully effect based
  case (Some(username), Some(password)) =>
    import org.apache.http.ssl.SSLContexts
    import org.apache.http.conn.ssl.TrustSelfSignedStrategy
    val credentialsProvider = new BasicCredentialsProvider
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password))
    // Self-signed certificates are accepted only when the trust flag is exactly "true".
    val sslContext =
      if (elasticUrlTrust == "true") SSLContexts.custom().loadTrustMaterial(TrustSelfSignedStrategy()).build()
      else SSLContexts.createDefault()
    ElasticClient(
      JavaClient(
        ElasticProperties(elasticUrl),
        // the request configuration is passed through unchanged
        (requestConfigBuilder: RequestConfig.Builder) => requestConfigBuilder,
        (httpClientBuilder: HttpAsyncClientBuilder) =>
          httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setSSLContext(sslContext)
      )
    )
  case _ =>
    // NOTE(review): the trust flag is not consulted on this unauthenticated path — confirm this is intended
    ElasticClient(JavaClient(ElasticProperties(elasticUrl)))
}
// Tuning values for scrolling and bulk indexing.
// Keep-alive sent with the initial search and with every scroll request.
private val scrollKeepAlive = FiniteDuration(30, TimeUnit.SECONDS)
// Time budget applied to a single upsert attempt.
private val timeout = 20.seconds
// Exponential back-off starting at 100 ms (factor 2, jittered), limited to 5 recurrences.
private val retrySchedule = Schedule.exponential(100.millis, 2).jittered && Schedule.recurs(5)
// Number of documents sent per bulk request by the caller.
val upsertGrouping = 50
// Number of hits requested per search page.
val searchPageSize = 500
// ------------------------------------------------------
/** Builds the name of the monthly index a document belongs to.
  *
  * @param indexPrefix leading part of the index name, e.g. `photos`
  * @param timestamp   document timestamp; only its year and month are used
  * @return `prefix-year-month`; the month is not zero-padded, so March 2022 gives `photos-2022-3`
  */
private def indexNameFromTimestamp(indexPrefix: String, timestamp: OffsetDateTime): String =
  s"$indexPrefix-${timestamp.getYear}-${timestamp.getMonthValue}"
// ------------------------------------------------------
// Streams the raw JSON source of the remaining hits of an already opened scroll.
// Every step requests the next page for the current scroll id; pagination stops after the
// first empty page or when the response carries no scroll id.
private def streamFromScroll(scrollId: String) =
  ZStream.paginateChunkZIO(scrollId) { currentScrollId =>
    client
      .execute(searchScroll(currentScrollId).keepAlive(scrollKeepAlive))
      .flatMap { response =>
        val page = Chunk.fromArray(response.result.hits.hits.map(_.sourceAsString))
        // only keep scrolling while pages still bring documents
        val continuation = response.result.scrollId.filter(_ => page.nonEmpty)
        ZIO.log(s"Got ${page.size} more documents").as(page -> continuation)
      }
  }
/** Streams every document found in `indexName`, decoded as `T`.
  *
  * A first search opens a scroll and yields the first page; the remaining pages are pulled
  * through `streamFromScroll`. Each hit source is decoded with the implicit `decoder`.
  *
  * @param indexName index name or pattern to search (the caller passes `photos-*`)
  * @param decoder   zio-json decoder applied to each hit source
  * @return a stream failing with an `Exception` on request failure, missing scroll id or
  *         undecodable document
  */
def fetchAll[T](indexName: String)(implicit decoder: JsonDecoder[T]) = {
  val result = for {
    response <- client.execute(search(Index(indexName)).size(searchPageSize).scroll(scrollKeepAlive))
    scrollId <- ZIO.fromOption(response.result.scrollId)
    firstResults = Chunk.fromArray(response.result.hits.hits.map(_.sourceAsString))
    _ <- ZIO.log(s"Got ${firstResults.size} first documents")
  } yield ZStream.fromChunk(firstResults) ++ streamFromScroll(scrollId)
  ZStream
    .unwrap(result)
    .map(_.fromJson[T])
    .absolve
    .mapError {
      // keep genuine exceptions intact so their stack trace and cause stay available
      case failure: Exception => failure
      case failure: Throwable => Exception(failure.toString, failure)
      // failure of ZIO.fromOption above: the search response carried no scroll id
      case None => Exception(s"No scroll id returned when searching $indexName")
      // remaining case: the decoding error message produced by fromJson
      case other => Exception(other.toString)
    }
}
// ------------------------------------------------------
/** Indexes (or overwrites) `documents` with a single bulk request.
  *
  * Each document goes to the monthly index derived from `indexPrefix` and its timestamp, under
  * the identifier given by `idExtractor`. An attempt fails when the request is rejected, when
  * any bulk item is reported as failed, or when it exceeds `timeout`; failed attempts are
  * retried following `retrySchedule`.
  *
  * @param indexPrefix        prefix of the target index names
  * @param documents          documents to send in one bulk request
  * @param timestampExtractor timestamp used to select the monthly index
  * @param idExtractor        document identifier
  * @param encoder            zio-json encoder used to serialise each document
  * @return an effect yielding `Some(())` on success; the `Option` is kept for compatibility
  *         with earlier versions, where `None` signalled a timeout
  */
def upsert[T](indexPrefix: String, documents: Chunk[T])(timestampExtractor: T => OffsetDateTime, idExtractor: T => String)(implicit encoder: JsonEncoder[T]) = {
  val responseEffect = client.execute {
    bulk {
      documents.map { document =>
        val indexName = indexNameFromTimestamp(indexPrefix, timestampExtractor(document))
        indexInto(indexName).id(idExtractor(document)).doc(document)
      }
    }
  }
  // `result` is only read on a successful response (it is not available on a failed one);
  // a successful bulk request may still report individual items as failed.
  def failureMessages(response: Response[BulkResponse]): Seq[String] =
    if (response.isSuccess) response.result.failures.map(item => item.error.fold(item.toString)(_.toString))
    else Seq(response.error.toString)
  val upsertEffect = for {
    response <- responseEffect
    failures = failureMessages(response)
    succeeded = failures.isEmpty
    _ <- ZIO.log(s"${if (succeeded) "Upserted" else "Failed to upsert"} ${documents.size} into elasticsearch")
    _ <- ZIO.cond(succeeded, (), failures.mkString("\n"))
  } yield ()
  upsertEffect
    // a timeout must be a failure, otherwise it is reported as success and never retried
    .timeoutFail(s"Upsert of ${documents.size} documents timed out after $timeout")(timeout)
    .retry(retrySchedule)
    .asSome
}
}
// =====================================================================================================================
object Photos extends ZIOAppDefault {
// Geographic position; this file fills it with decimal-degree latitude and longitude.
case class GeoPoint(lat: Double, lon: Double)
object GeoPoint {
  // zio-json codecs derived from the case class fields
  implicit val decoder: JsonDecoder[GeoPoint] = DeriveJsonDecoder.gen[GeoPoint]
  implicit val encoder: JsonEncoder[GeoPoint] = DeriveJsonEncoder.gen[GeoPoint]
}
// Photo document as read from and written back to elasticsearch.
case class Photo(
  uuid: UUID, // used as the elasticsearch document id when upserting
  timestamp: OffsetDateTime, // selects the monthly index when upserting
  filePath: Path,
  fileSize: Long,
  fileHash: String,
  fileLastUpdated: OffsetDateTime,
  category: Option[String],
  shootDateTime: Option[OffsetDateTime],
  camera: Option[String],
  tags: Map[String, String], // GPS coordinates are looked up here (gPSGPS* keys)
  keywords: List[String], // Extracted from category
  classifications: List[String], // Extracted from AI DJL
  detectedObjects: List[String], // Extracted from AI DJL
  place: Option[GeoPoint]
)
object Photo {
  // file paths travel as plain JSON strings
  implicit val pathEncoder: JsonEncoder[Path] = JsonEncoder[String].contramap(_.toString)
  implicit val pathDecoder: JsonDecoder[Path] = JsonDecoder[String].map(Path.of(_))
  // codecs derived from the case class fields; they rely on the path codecs declared above
  implicit val decoder: JsonDecoder[Photo] = DeriveJsonDecoder.gen[Photo]
  implicit val encoder: JsonEncoder[Photo] = DeriveJsonEncoder.gen[Photo]
}
// Strips every comma from the photo keywords.
// The very same instance is returned when no keyword contained a comma.
def fixKeywords(photo: Photo): Photo = {
  val cleaned = photo.keywords.map(_.replaceAll(",", ""))
  if (cleaned != photo.keywords) photo.copy(keywords = cleaned) else photo
}
/*
tags.gPSGPSLatitude : 45° 19' 19,29"
tags.gPSGPSLatitudeRef : N
tags.gPSGPSLongitude : 6° 32' 39,47"
tags.gPSGPSLongitudeRef : E
*/
// Degrees / minutes / seconds notation such as 45° 19' 19,29" with three capture groups:
// degrees, minutes, seconds. Seconds accept '.' or ',' as decimal separator.
// A leading sign is accepted but not captured: the hemisphere is decided by the reference letter.
val dmsRE = """[-+]?(\d+)[°]\s*(\d+)['′]\s*(\d+(?:[.,]\d+)?)(?:(?:")|(?:'')|(?:′′)|(?:″))""".r

/** Combines degrees, minutes and seconds into decimal degrees. */
def convert(d: Double, m: Double, s: Double): Double = d + m / 60d + s / 3600d

/** Converts a degrees/minutes/seconds coordinate into signed decimal degrees.
  *
  * @param dms coordinate text matching `dmsRE`, e.g. `45° 19' 19,29"`; any leading sign is ignored
  * @param ref hemisphere reference; only its first letter is used, case-insensitively:
  *            N or E give a positive value, S or W a negative one
  * @return the decimal degrees, or a `Failure` when `dms` cannot be parsed or `ref` is not
  *         a known hemisphere
  */
def degreesMinuteSecondsToDecimalDegrees(
  dms: String,
  ref: String
): Try[Double] = Try {
  val magnitude = dms.trim match {
    case dmsRE(d, m, s) => convert(d.toDouble, m.toDouble, s.replace(',', '.').toDouble)
    case other          => throw new IllegalArgumentException(s"Unsupported DMS coordinate '$other'")
  }
  ref.trim.toUpperCase.headOption match {
    case Some('N' | 'E') => magnitude
    case Some('S' | 'W') => -magnitude
    // an unknown reference used to negate the value silently; refuse it instead
    case _ => throw new IllegalArgumentException(s"Unsupported hemisphere reference '$ref'")
  }
}
// Derives a decimal-degree position from the GPS tags of a photo.
// Yields None when one of the four tags is missing or when a coordinate cannot be converted.
def computeGeoPoint(photoTags: Map[String, String]): Option[GeoPoint] = {
  // Degrees Minutes Seconds to Decimal Degrees, for one coordinate and its hemisphere reference
  def decimal(dms: String, ref: String): Option[Double] =
    degreesMinuteSecondsToDecimalDegrees(dms, ref).toOption // TODO enhance error processing
  val rawCoordinates = for {
    latitude     <- photoTags.get("gPSGPSLatitude")
    latitudeRef  <- photoTags.get("gPSGPSLatitudeRef")
    longitude    <- photoTags.get("gPSGPSLongitude")
    longitudeRef <- photoTags.get("gPSGPSLongitudeRef")
  } yield (latitude, latitudeRef, longitude, longitudeRef)
  rawCoordinates.flatMap { case (latitude, latitudeRef, longitude, longitudeRef) =>
    decimal(latitude, latitudeRef).flatMap { lat =>
      decimal(longitude, longitudeRef).map(lon => GeoPoint(lat, lon))
    }
  }
}
// Sets the photo place from its GPS tags.
// NOTE(review): the place is always overwritten, so an existing place becomes None when the
// tags yield no position — confirm this is intended.
def addGeoPoint(photo: Photo): Photo = {
  val computedPlace = computeGeoPoint(photo.tags)
  photo.copy(place = computedPlace)
}
// Application logic: reads every photo document from the photos-* indices, applies the keyword
// and geo point fixes, and writes back in groups only the photos that actually changed.
// The elapsed time (whole seconds) is printed at the end.
def run = for {
  started <- Clock.instant
  _ <- ElasticOps
         .fetchAll[Photo]("photos-*")
         // pair each stored photo with its fixed version so unchanged ones can be dropped
         .map(original => original -> addGeoPoint(fixKeywords(original)))
         .collect { case (original, updated) if updated != original => updated }
         .tap(photo => Console.printLine(s"Updating ${photo.filePath} : ${photo.timestamp}"))
         .grouped(ElasticOps.upsertGrouping)
         .foreach(group => ElasticOps.upsert[Photo]("photos", group)(_.timestamp, _.uuid.toString))
         .tapError(err => Console.printLine(err))
  finished <- Clock.instant
  duration = finished.getEpochSecond - started.getEpochSecond
  _ <- Console.printLine(s"Run operations done in $duration seconds")
} yield ()
}
// Script entry point: launches the Photos ZIO application without command-line arguments.
Photos.main(Array.empty)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment