Last active
April 30, 2023 20:49
-
-
Save dacr/6d7ba9b207013d8138deda28e824490a to your computer and use it in GitHub Desktop.
Photos manager - update photo document field within elasticsearch / published by https://github.com/dacr/code-examples-manager #7cf62cfb-9643-4af3-afa4-54deaedf4140/3370b4d7b3b48077e05d643ca6b7b4c0761f9f03
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : Photos manager - update photo document field within elasticsearch | |
// keywords : scala, photos, memories, zio, stream, update, photo, elasticsearch | |
// publish : gist | |
// authors : David Crosson | |
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2) | |
// id : 7cf62cfb-9643-4af3-afa4-54deaedf4140 | |
// created-on : 2022-03-08T07:31:17+01:00 | |
// managed-by : https://github.com/dacr/code-examples-manager | |
// run-with : scala-cli $file | |
// --------------------- | |
//> using scala "3.2.2" | |
//> using dep "dev.zio::zio:2.0.13" | |
//> using dep "dev.zio::zio-streams:2.0.13" | |
//> using dep "dev.zio::zio-json:0.5.0" | |
//> using dep "com.drewnoakes:metadata-extractor:2.18.0" | |
//> using dep "com.fasterxml.uuid:java-uuid-generator:4.1.0" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-effect-zio:8.7.0" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-client-esjava:8.7.0" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-json-zio:8.7.0" | |
// --------------------- | |
import zio.* | |
import zio.json.* | |
import zio.stream.* | |
import zio.stream.ZPipeline.{splitLines, utf8Decode} | |
import java.nio.file.Path | |
import java.time.OffsetDateTime | |
import java.util.UUID | |
import scala.util.Try | |
// ===================================================================================================================== | |
// =====================================================================================================================
// Elasticsearch access layer: client construction from environment variables,
// scroll-based streaming reads (fetchAll) and time-partitioned bulk upserts (upsert).
object ElasticOps {
  import com.sksamuel.elastic4s.zio.instances.*
  import com.sksamuel.elastic4s.ziojson.*
  import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, Index, Response}
  import com.sksamuel.elastic4s.ElasticDsl.*
  import com.sksamuel.elastic4s.http.JavaClient
  import com.sksamuel.elastic4s.requests.mappings.*
  import com.sksamuel.elastic4s.requests.bulk.BulkResponse
  import com.sksamuel.elastic4s.requests.searches.SearchResponse
  import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
  import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
  import org.apache.http.client.config.RequestConfig
  import org.apache.http.impl.client.BasicCredentialsProvider
  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
  import scala.concurrent.duration.FiniteDuration
  import java.time.temporal.ChronoField
  import java.util.concurrent.TimeUnit
  import scala.util.Properties.{envOrNone, envOrElse}

  // Connection settings from the environment; PHOTOS_* variables take precedence over CEM_*.
  val elasticUrl      = envOrNone("PHOTOS_ELASTIC_URL").orElse(envOrNone("CEM_ELASTIC_URL")).getOrElse("http://127.0.0.1:9200")
  val elasticUrlTrust = envOrNone("PHOTOS_ELASTIC_URL_TRUST_SSL").getOrElse("false").trim.toLowerCase
  val elasticUsername = envOrNone("PHOTOS_ELASTIC_USERNAME").orElse(envOrNone("CEM_ELASTIC_USERNAME"))
  val elasticPassword = envOrNone("PHOTOS_ELASTIC_PASSWORD").orElse(envOrNone("CEM_ELASTIC_PASSWORD"))

  // Shared elasticsearch client. Anonymous when no credentials are configured, otherwise
  // basic-auth with optional trust of self-signed certificates (PHOTOS_ELASTIC_URL_TRUST_SSL=true).
  private val client = { // TODO rewrite to be fully effect based
    if (elasticPassword.isEmpty || elasticUsername.isEmpty) ElasticClient(JavaClient(ElasticProperties(elasticUrl)))
    else {
      lazy val provider = {
        val basicProvider = new BasicCredentialsProvider
        val credentials   = new UsernamePasswordCredentials(elasticUsername.get, elasticPassword.get)
        basicProvider.setCredentials(AuthScope.ANY, credentials)
        basicProvider
      }
      import org.apache.http.ssl.SSLContexts
      import org.apache.http.conn.ssl.TrustSelfSignedStrategy
      val sslContext = elasticUrlTrust match {
        case "true" => SSLContexts.custom().loadTrustMaterial(TrustSelfSignedStrategy()).build()
        case _      => SSLContexts.createDefault()
      }
      ElasticClient(
        JavaClient(
          ElasticProperties(elasticUrl),
          (requestConfigBuilder: RequestConfig.Builder) => requestConfigBuilder,
          (httpClientBuilder: HttpAsyncClientBuilder) => httpClientBuilder.setDefaultCredentialsProvider(provider).setSSLContext(sslContext)
        )
      )
    }
  }

  private val scrollKeepAlive = FiniteDuration(30, "seconds") // how long elasticsearch keeps a scroll context alive
  private val timeout         = 20.seconds                    // per bulk-upsert attempt
  private val retrySchedule   = Schedule.exponential(100.millis, 2).jittered && Schedule.recurs(5)
  val upsertGrouping = 50  // documents per bulk request
  val searchPageSize = 500 // documents per scroll page

  // ------------------------------------------------------
  /** Builds the time-partitioned index name (one index per year-month) for a document timestamp. */
  private def indexNameFromTimestamp(indexPrefix: String, timestamp: OffsetDateTime): String = {
    // NOTE: previously computed but unused day/week values have been removed;
    // partitioning is monthly only.
    val year  = timestamp.get(ChronoField.YEAR)
    val month = timestamp.get(ChronoField.MONTH_OF_YEAR)
    s"$indexPrefix-$year-$month"
  }

  // ------------------------------------------------------
  /** Streams the remaining pages of an already-opened scroll as chunks of raw JSON documents.
    * Pagination stops when a page comes back empty. */
  private def streamFromScroll(scrollId: String) = {
    ZStream.paginateChunkZIO(scrollId) { currentScrollId =>
      for {
        response <- client.execute(searchScroll(currentScrollId).keepAlive(scrollKeepAlive))
        nextScrollId = response.result.scrollId
        results = Chunk.fromArray(response.result.hits.hits.map(_.sourceAsString))
        _ <- ZIO.log(s"Got ${results.size} more documents")
      } yield results -> (if (results.size > 0) nextScrollId else None)
    }
  }

  /** Streams every document of the given index (or index pattern) decoded as T.
    * JSON decoding failures are surfaced as stream errors. */
  def fetchAll[T](indexName: String)(implicit decoder: JsonDecoder[T]) = {
    val result = for {
      response <- client.execute(search(Index(indexName)).size(searchPageSize).scroll(scrollKeepAlive))
      scrollId <- ZIO.fromOption(response.result.scrollId) // fails if elasticsearch returned no scroll id
      firstResults = Chunk.fromArray(response.result.hits.hits.map(_.sourceAsString))
      _ <- ZIO.log(s"Got ${firstResults.size} first documents")
      nextResultsStream = streamFromScroll(scrollId)
    } yield ZStream.fromChunk(firstResults) ++ nextResultsStream
    ZStream.unwrap(result).map(_.fromJson[T]).absolve.mapError(err => Exception(err.toString))
  }

  // ------------------------------------------------------
  /** Bulk-upserts documents into per-month indices named from each document's timestamp.
    * Fails with the concatenated bulk error messages when any item is rejected;
    * each attempt is bounded by `timeout` and retried with `retrySchedule`. */
  def upsert[T](indexPrefix: String, documents: Chunk[T])(timestampExtractor: T => OffsetDateTime, idExtractor: T => String)(implicit encoder: JsonEncoder[T]) = {
    val responseEffect = client.execute {
      bulk {
        for { document <- documents } yield {
          val indexName = indexNameFromTimestamp(indexPrefix, timestampExtractor(document))
          val id        = idExtractor(document)
          indexInto(indexName).id(id).doc(document)
        }
      }
    }
    val upsertEffect = for {
      response <- responseEffect
      failures = response.result.failures.flatMap(_.error).map(_.toString)
      _ <- ZIO.log(s"${if (response.isSuccess) "Upserted" else "Failed to upsert"} ${documents.size} into elasticsearch")
      _ <- ZIO.cond(response.isSuccess, (), failures.mkString("\n"))
    } yield ()
    upsertEffect.timeout(timeout).retry(retrySchedule)
  }
}
// ===================================================================================================================== | |
// =====================================================================================================================
// Photos application: streams every photo document from elasticsearch, normalizes keywords,
// derives a GeoPoint from GPS EXIF tags, and writes back only the documents that changed.
object Photos extends ZIOAppDefault {
  /** Geographic coordinate in decimal degrees (elasticsearch geo_point layout). */
  case class GeoPoint(lat: Double, lon: Double)
  object GeoPoint:
    implicit val decoder: JsonDecoder[GeoPoint] = DeriveJsonDecoder.gen
    implicit val encoder: JsonEncoder[GeoPoint] = DeriveJsonEncoder.gen

  /** Photo document model as stored in elasticsearch. */
  case class Photo(
    uuid: UUID,
    timestamp: OffsetDateTime,
    filePath: Path,
    fileSize: Long,
    fileHash: String,
    fileLastUpdated: OffsetDateTime,
    category: Option[String],
    shootDateTime: Option[OffsetDateTime],
    camera: Option[String],
    tags: Map[String, String],
    keywords: List[String],        // Extracted from category
    classifications: List[String], // Extracted from AI DJL
    detectedObjects: List[String], // Extracted from AI DJL
    place: Option[GeoPoint]
  )
  object Photo {
    // java.nio.file.Path has no derived codec: serialize it as its plain string form.
    implicit val pathEncoder: JsonEncoder[Path] = JsonEncoder[String].contramap(p => p.toString)
    implicit val pathDecoder: JsonDecoder[Path] = JsonDecoder[String].map(p => Path.of(p))
    implicit val decoder: JsonDecoder[Photo] = DeriveJsonDecoder.gen
    implicit val encoder: JsonEncoder[Photo] = DeriveJsonEncoder.gen
  }

  /** Strips stray commas from every keyword; returns the same instance when nothing changes
    * (so unchanged photos are filtered out before upsert). */
  def fixKeywords(photo: Photo): Photo =
    val keywords = photo.keywords
    val fixedKeywords = keywords.map(_.replaceAll(",", ""))
    if (keywords == fixedKeywords) photo
    else photo.copy(keywords = fixedKeywords)

  /* Example EXIF tag values this code parses (decimal comma comes from locale formatting):
     tags.gPSGPSLatitude     : 45° 19' 19,29"
     tags.gPSGPSLatitudeRef  : N
     tags.gPSGPSLongitude    : 6° 32' 39,47"
     tags.gPSGPSLongitudeRef : E
   */
  // Degrees/minutes/seconds pattern. The leading sign is now CAPTURED (it was previously
  // matched but silently dropped, so a signed DMS string lost its sign). Seconds accept
  // ", ', '', ′′ or ″ as terminator and "." or "," as decimal separator.
  val dmsRE = """([-+]?)(\d+)[°]\s*(\d+)['′]\s*(\d+(?:[.,]\d+)?)(?:(?:")|(?:'')|(?:′′)|(?:″))""".r

  /** Converts a DMS triple to decimal degrees (magnitude only). */
  def convert(d: Double, m: Double, s: Double): Double = d + m / 60d + s / 3600d

  /** Parses a DMS string into signed decimal degrees, applying first the embedded sign
    * (if any) and then the hemisphere reference (N/E positive, S/W negative).
    * A malformed string raises MatchError, surfaced as a Failure by Try. */
  def degreesMinuteSecondsToDecimalDegrees(
    dms: String,
    ref: String
  ): Try[Double] = Try {
    val dd = dms.trim match {
      case dmsRE(sign, d, m, s) =>
        val magnitude = convert(d.toDouble, m.toDouble, s.replaceAll(",", ".").toDouble)
        if (sign == "-") -magnitude else magnitude
    }
    if ("NE".contains(ref.trim.toUpperCase.head)) dd else -dd
  }

  /** Derives a GeoPoint from EXIF GPS tags; None when any tag is missing or unparsable. */
  def computeGeoPoint(photoTags: Map[String, String]): Option[GeoPoint] =
    // Degrees Minutes Seconds to Decimal Degrees
    for {
      latitude     <- photoTags.get("gPSGPSLatitude")
      latitudeRef  <- photoTags.get("gPSGPSLatitudeRef")
      longitude    <- photoTags.get("gPSGPSLongitude")
      longitudeRef <- photoTags.get("gPSGPSLongitudeRef")
      lat <- degreesMinuteSecondsToDecimalDegrees(latitude, latitudeRef).toOption   // TODO enhance error processing
      lon <- degreesMinuteSecondsToDecimalDegrees(longitude, longitudeRef).toOption // TODO enhance error processing
    } yield GeoPoint(lat, lon)

  /** Recomputes and overwrites the photo's place from its GPS tags. */
  def addGeoPoint(photo: Photo): Photo =
    photo.copy(place = computeGeoPoint(photo.tags))

  /** Streams all photos, applies the fixes, and bulk-upserts only documents that changed. */
  def run = for {
    started <- Clock.instant
    photosStream = ElasticOps.fetchAll[Photo]("photos-*")
    _ <- photosStream
      // Keep the original alongside the transformed copy so untouched photos can be skipped.
      .map(original => original -> addGeoPoint(fixKeywords(original)))
      .collect { case (original, updated) if updated != original => updated }
      .tap(photo => Console.printLine(s"Updating ${photo.filePath} : ${photo.timestamp}"))
      .grouped(ElasticOps.upsertGrouping)
      .foreach { group =>
        ElasticOps.upsert[Photo](
          "photos",
          group)(
          photo => photo.timestamp,
          photo => photo.uuid.toString
        )
      }
      .tapError(err => Console.printLine(err))
    finished <- Clock.instant
    duration = finished.getEpochSecond - started.getEpochSecond
    _ <- Console.printLine(s"Run operations done in $duration seconds")
  } yield ()
}
// Script entry point: scala-cli executes top-level statements, so explicitly launch the ZIO app.
Photos.main(Array.empty)
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.