Last active
May 25, 2024 10:20
-
-
Save dacr/6d7ba9b207013d8138deda28e824490a to your computer and use it in GitHub Desktop.
Photos manager - update photo document field within elasticsearch / published by https://github.com/dacr/code-examples-manager #7cf62cfb-9643-4af3-afa4-54deaedf4140/15a56f4ffd30f6d69de25e0b24bc61e55656ff06
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : Photos manager - update photo document field within elasticsearch | |
// keywords : scala, photos, memories, zio, stream, update, photo, elasticsearch | |
// publish : gist | |
// authors : David Crosson | |
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2) | |
// id : 7cf62cfb-9643-4af3-afa4-54deaedf4140 | |
// created-on : 2022-03-08T07:31:17+01:00 | |
// managed-by : https://github.com/dacr/code-examples-manager | |
// run-with : scala-cli $file | |
// --------------------- | |
//> using scala "3.4.2" | |
//> using dep "dev.zio::zio:2.0.13" | |
//> using dep "dev.zio::zio-streams:2.0.13" | |
//> using dep "dev.zio::zio-json:0.5.0" | |
//> using dep "com.drewnoakes:metadata-extractor:2.18.0" | |
//> using dep "com.fasterxml.uuid:java-uuid-generator:4.1.0" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-effect-zio:8.7.0" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-client-esjava:8.7.0" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-json-zio:8.7.0" | |
// --------------------- | |
import zio.* | |
import zio.json.* | |
import zio.stream.* | |
import zio.stream.ZPipeline.{splitLines, utf8Decode} | |
import java.nio.file.Path | |
import java.time.OffsetDateTime | |
import java.util.UUID | |
import scala.util.Try | |
// ===================================================================================================================== | |
/** Thin elasticsearch access layer: client construction from environment variables,
  * scroll-based bulk reads and bulk upserts, all returning ZIO effects/streams.
  */
object ElasticOps {
  import com.sksamuel.elastic4s.zio.instances.*
  import com.sksamuel.elastic4s.ziojson.*
  import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, Index, Response}
  import com.sksamuel.elastic4s.ElasticDsl.*
  import com.sksamuel.elastic4s.http.JavaClient
  import com.sksamuel.elastic4s.requests.mappings.*
  import com.sksamuel.elastic4s.requests.bulk.BulkResponse
  import com.sksamuel.elastic4s.requests.searches.SearchResponse
  import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
  import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
  import org.apache.http.client.config.RequestConfig
  import org.apache.http.impl.client.BasicCredentialsProvider
  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
  import scala.concurrent.duration.FiniteDuration
  import java.time.temporal.ChronoField
  import java.util.concurrent.TimeUnit
  import scala.util.Properties.{envOrNone, envOrElse}

  // Connection settings, photo-specific variables first, then the generic CEM_* fallbacks.
  val elasticUrl      = envOrNone("PHOTOS_ELASTIC_URL").orElse(envOrNone("CEM_ELASTIC_URL")).getOrElse("http://127.0.0.1:9200")
  val elasticUrlTrust = envOrNone("PHOTOS_ELASTIC_URL_TRUST_SSL").getOrElse("false").trim.toLowerCase
  val elasticUsername = envOrNone("PHOTOS_ELASTIC_USERNAME").orElse(envOrNone("CEM_ELASTIC_USERNAME"))
  val elasticPassword = envOrNone("PHOTOS_ELASTIC_PASSWORD").orElse(envOrNone("CEM_ELASTIC_PASSWORD"))

  // Shared client, built once at object initialization.
  private val client = { // TODO rewrite to be fully effect based
    if (elasticPassword.isEmpty || elasticUsername.isEmpty) ElasticClient(JavaClient(ElasticProperties(elasticUrl)))
    else {
      // Basic-auth credentials are only wired in when both username and password are provided.
      lazy val provider = {
        val basicProvider = new BasicCredentialsProvider
        val credentials   = new UsernamePasswordCredentials(elasticUsername.get, elasticPassword.get)
        basicProvider.setCredentials(AuthScope.ANY, credentials)
        basicProvider
      }
      import org.apache.http.ssl.SSLContexts
      import org.apache.http.conn.ssl.TrustSelfSignedStrategy
      // When PHOTOS_ELASTIC_URL_TRUST_SSL=true, accept self-signed certificates (dev/test setups).
      val sslContext = elasticUrlTrust match {
        case "true" => SSLContexts.custom().loadTrustMaterial(TrustSelfSignedStrategy()).build()
        case _      => SSLContexts.createDefault()
      }
      val customElasticClient = ElasticClient(
        JavaClient(
          ElasticProperties(elasticUrl),
          (requestConfigBuilder: RequestConfig.Builder) => requestConfigBuilder,
          (httpClientBuilder: HttpAsyncClientBuilder) => httpClientBuilder.setDefaultCredentialsProvider(provider).setSSLContext(sslContext)
        )
      )
      customElasticClient
    }
  }

  private val scrollKeepAlive = FiniteDuration(30, "seconds")
  private val timeout         = 20.seconds
  // Exponential backoff with jitter, at most 5 retries.
  private val retrySchedule   = Schedule.exponential(100.millis, 2).jittered && Schedule.recurs(5)
  val upsertGrouping          = 50
  val searchPageSize          = 500

  // ------------------------------------------------------
  /** Monthly index name for a document, e.g. "photos-2022-3".
    * NOTE(review): month is not zero-padded, so "photos-2022-3" and "photos-2022-10" sort
    * oddly; kept as-is because existing indices were created with this naming scheme.
    */
  private def indexNameFromTimestamp(indexPrefix: String, timestamp: OffsetDateTime): String = {
    val year  = timestamp.get(ChronoField.YEAR)
    val month = timestamp.get(ChronoField.MONTH_OF_YEAR)
    s"$indexPrefix-$year-$month"
  }

  // ------------------------------------------------------
  /** Stream of raw JSON documents obtained by repeatedly pulling an elasticsearch scroll.
    * Pagination stops when a scroll page comes back empty.
    */
  private def streamFromScroll(scrollId: String) = {
    ZStream.paginateChunkZIO(scrollId) { currentScrollId =>
      for {
        response    <- client.execute(searchScroll(currentScrollId).keepAlive(scrollKeepAlive))
        nextScrollId = response.result.scrollId
        results      = Chunk.fromArray(response.result.hits.hits.map(_.sourceAsString))
        _           <- ZIO.log(s"Got ${results.size} more documents")
      } yield results -> (if (results.size > 0) nextScrollId else None)
    }
  }

  /** Stream every document of `indexName` (wildcards allowed), decoded as `T`.
    * The first page comes from the initial search, the rest from the scroll API;
    * JSON decode failures are surfaced as stream errors.
    */
  def fetchAll[T](indexName: String)(implicit decoder: JsonDecoder[T]) = {
    val result = for {
      response         <- client.execute(search(Index(indexName)).size(searchPageSize).scroll(scrollKeepAlive))
      scrollId         <- ZIO.fromOption(response.result.scrollId)
      firstResults      = Chunk.fromArray(response.result.hits.hits.map(_.sourceAsString))
      _                <- ZIO.log(s"Got ${firstResults.size} first documents")
      nextResultsStream = streamFromScroll(scrollId)
    } yield ZStream.fromChunk(firstResults) ++ nextResultsStream
    ZStream.unwrap(result).map(_.fromJson[T]).absolve.mapError(err => Exception(err.toString))
  }

  // ------------------------------------------------------
  /** Bulk-index `documents` into time-based indices derived from `indexPrefix` and each
    * document's timestamp; document ids come from `idExtractor` so re-runs overwrite in place.
    * The whole bulk call is bounded by `timeout` and retried with `retrySchedule`.
    */
  def upsert[T](indexPrefix: String, documents: Chunk[T])(timestampExtractor: T => OffsetDateTime, idExtractor: T => String)(implicit encoder: JsonEncoder[T]) = {
    val responseEffect = client.execute {
      bulk {
        for { document <- documents } yield {
          val indexName = indexNameFromTimestamp(indexPrefix, timestampExtractor(document))
          val id        = idExtractor(document)
          indexInto(indexName).id(id).doc(document)
        }
      }
    }
    val upsertEffect = for {
      response <- responseEffect
      failures  = response.result.failures.flatMap(_.error).map(_.toString)
      _        <- ZIO.log(s"${if (response.isSuccess) "Upserted" else "Failed to upsert"} ${documents.size} into elasticsearch")
      _        <- ZIO.cond(response.isSuccess, (), failures.mkString("\n"))
    } yield ()
    upsertEffect.timeout(timeout).retry(retrySchedule)
  }
}
// ===================================================================================================================== | |
/** Batch job: stream every photo document, fix its keywords and compute a GPS GeoPoint,
  * then bulk-upsert only the documents that actually changed.
  */
object Photos extends ZIOAppDefault {
  /** Elasticsearch geo_point compatible coordinates (decimal degrees). */
  case class GeoPoint(lat: Double, lon: Double)
  object GeoPoint:
    implicit val decoder: JsonDecoder[GeoPoint] = DeriveJsonDecoder.gen
    implicit val encoder: JsonEncoder[GeoPoint] = DeriveJsonEncoder.gen

  /** Photo document as stored in elasticsearch. */
  case class Photo(
    uuid: UUID,
    timestamp: OffsetDateTime,
    filePath: Path,
    fileSize: Long,
    fileHash: String,
    fileLastUpdated: OffsetDateTime,
    category: Option[String],
    shootDateTime: Option[OffsetDateTime],
    camera: Option[String],
    tags: Map[String, String],
    keywords: List[String],       // Extracted from category
    classifications: List[String], // Extracted from AI DJL
    detectedObjects: List[String], // Extracted from AI DJL
    place: Option[GeoPoint]
  )
  object Photo {
    // java.nio.file.Path has no derived codec: serialize it as its plain string form.
    implicit val pathEncoder: JsonEncoder[Path] = JsonEncoder[String].contramap(p => p.toString)
    implicit val pathDecoder: JsonDecoder[Path] = JsonDecoder[String].map(p => Path.of(p))
    implicit val decoder: JsonDecoder[Photo]    = DeriveJsonDecoder.gen
    implicit val encoder: JsonEncoder[Photo]    = DeriveJsonEncoder.gen
  }

  /** Strip commas from keywords; returns the same instance when nothing changed,
    * so the later `updated != original` check only flags real modifications.
    */
  def fixKeywords(photo: Photo): Photo =
    val keywords      = photo.keywords
    val fixedKeywords = keywords.map(_.replaceAll(",", ""))
    if (keywords == fixedKeywords) photo
    else photo.copy(keywords = fixedKeywords)

  /* Example of the EXIF tags being parsed:
     tags.gPSGPSLatitude     : 45° 19' 19,29"
     tags.gPSGPSLatitudeRef  : N
     tags.gPSGPSLongitude    : 6° 32' 39,47"
     tags.gPSGPSLongitudeRef : E
   */
  // Degrees°Minutes'Seconds" with several unicode variants for the minute/second marks
  // and either "." or "," as the seconds decimal separator.
  val dmsRE = """[-+]?(\d+)[°]\s*(\d+)['′]\s*(\d+(?:[.,]\d+)?)(?:(?:")|(?:'')|(?:′′)|(?:″))""".r

  /** Degrees/minutes/seconds to decimal degrees. */
  def convert(d: Double, m: Double, s: Double): Double = d + m / 60d + s / 3600d

  /** Parse a DMS coordinate string; the hemisphere `ref` (N/S/E/W) fixes the sign.
    * Returns Failure (with an explicit message instead of a bare MatchError) when the
    * string does not match the expected format, or when `ref` is empty.
    */
  def degreesMinuteSecondsToDecimalDegrees(
    dms: String,
    ref: String
  ): Try[Double] = Try {
    val dd = dms.trim match {
      case dmsRE(d, m, s) => convert(d.toDouble, m.toDouble, s.replaceAll(",", ".").toDouble)
      case other          => throw IllegalArgumentException(s"Unsupported DMS coordinate format : $other")
    }
    // North and East are positive; South and West negative.
    if ("NE".contains(ref.trim.toUpperCase.head)) dd else -dd
  }

  /** Build a GeoPoint from the photo EXIF tags; None when any GPS tag is missing
    * or unparsable.
    */
  def computeGeoPoint(photoTags: Map[String, String]): Option[GeoPoint] =
    // Degrees Minutes Seconds to Decimal Degrees
    for {
      latitude     <- photoTags.get("gPSGPSLatitude")
      latitudeRef  <- photoTags.get("gPSGPSLatitudeRef")
      longitude    <- photoTags.get("gPSGPSLongitude")
      longitudeRef <- photoTags.get("gPSGPSLongitudeRef")
      lat          <- degreesMinuteSecondsToDecimalDegrees(latitude, latitudeRef).toOption   // TODO enhance error processing
      lon          <- degreesMinuteSecondsToDecimalDegrees(longitude, longitudeRef).toOption // TODO enhance error processing
    } yield GeoPoint(lat, lon)

  /** Recompute the photo place from its tags (overwrites any previously stored place). */
  def addGeoPoint(photo: Photo): Photo =
    photo.copy(place = computeGeoPoint(photo.tags))

  /** Stream all photos, keep only those changed by the fixes, and upsert them in groups. */
  def run = for {
    started     <- Clock.instant
    photosStream = ElasticOps.fetchAll[Photo]("photos-*")
    _           <- photosStream
                     // Keep the original alongside the transformed document to detect real changes.
                     .map(original => original -> addGeoPoint(fixKeywords(original)))
                     .collect { case (original, updated) if updated != original => updated }
                     .tap(photo => Console.printLine(s"Updating ${photo.filePath} : ${photo.timestamp}"))
                     .grouped(ElasticOps.upsertGrouping)
                     .foreach { group =>
                       ElasticOps.upsert[Photo](
                         "photos",
                         group)(
                         photo => photo.timestamp,
                         photo => photo.uuid.toString
                       )
                     }
                     .tapError(err => Console.printLine(err))
    finished    <- Clock.instant
    duration     = finished.getEpochSecond - started.getEpochSecond
    _           <- Console.printLine(s"Run operations done in $duration seconds")
  } yield ()
}
// scala-cli script entry point: delegates to the main provided by ZIOAppDefault.
Photos.main(Array.empty)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment