@dacr
Last active August 20, 2023 20:23
Photo indexing proof of concept using ZIO, OpenSearch and Deep Java Library (DJL) - When you have ~89000 photos/videos tooling is mandatory / published by https://github.com/dacr/code-examples-manager #131e1312-4b2b-48c4-a60a-e17bfa73754b/1f636074b46a0621a5ddbaacc0741e66c76f9321
// summary : Photo indexing proof of concept using ZIO, OpenSearch and Deep Java Library (DJL) - When you have ~89000 photos/videos tooling is mandatory
// keywords : scala, photos, memories, zio, ziostream, poc, djl, machine-learning, elasticsearch
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 131e1312-4b2b-48c4-a60a-e17bfa73754b
// created-on : 2021-12-19T09:29:59+01:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
/*
Run it once it is configured. Configuration is done through environment variables :
- PHOTOS_SEARCH_IGNORE_MASK=(?:(/Originals/)|(/[.]))
- PHOTOS_SEARCH_INCLUDE_MASK=(?i)[.](?:(jpg)|(png)|(jpeg)|(tiff)|(heic))
- PHOTOS_SEARCH_ROOTS=/place1,/place2
- PHOTOS_ELASTIC_URL=http://127.0.0.1:9200
- PHOTOS_ELASTIC_USERNAME=admin
- PHOTOS_ELASTIC_PASSWORD=admin
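
To illustrate how the two masks combine, here is a minimal sketch using only the Scala standard library.
The `keep` helper and the sample paths are hypothetical. It mirrors the unanchored `findFirstIn` matching
done by `searchPredicate` in this script, minus the regular-file check.

```scala
import scala.util.matching.Regex

// Mask values copied from the configuration example above
val ignoreMask: Regex  = "(?:(/Originals/)|(/[.]))".r
val includeMask: Regex = "(?i)[.](?:(jpg)|(png)|(jpeg)|(tiff)|(heic))".r

// Hypothetical helper: a path is kept when no ignore match is found and an include match is found
def keep(path: String): Boolean =
  ignoreMask.findFirstIn(path).isEmpty && includeMask.findFirstIn(path).isDefined

// Hypothetical sample paths
val samples = List(
  "/place1/2017/IMG_0001.JPG",        // kept: the extension matches, case-insensitively
  "/place1/Originals/IMG_0001.JPG",   // ignored: located under /Originals/
  "/place1/.thumbnails/IMG_0001.jpg", // ignored: hidden directory
  "/place2/notes.txt"                 // skipped: extension not in the include mask
)
samples.foreach(p => println(s"$p -> ${keep(p)}"))
```

Because the include mask is not anchored at the end of the path, a name such as `photo.jpg.bak` would also be kept.
Appending `$` to the mask is one way to avoid that, if it matters for your albums.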
*/
/*
What it does in a streamed way :
- search for photos under the given search root directories
- extract keywords from the name of the directory containing each photo
- extract photo metadata (EXIF)
- classify each photo using a Deep Java Library (DJL) image classification model
- identify the objects in each photo using a DJL object detection model
- index all the data extracted from each photo into Elasticsearch
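
The keyword extraction step can be sketched as follows. This is a simplified, hypothetical variant of the
`extractKeywords` function of this script: it skips the camel-case splitting and uses only a subset of the
stop words.

```scala
// Stop words: a subset of the `excludes` set used by this script
val stopWords = Set("et", "le", "la", "de", "du", "les", "des")

// Hypothetical simplified variant of extractKeywords (no camel-case splitting)
def keywordsOf(category: String): List[String] =
  category
    .split("[- /]+")                  // split on dashes, spaces and path separators
    .toList
    .filterNot(_.matches("^[0-9]+$")) // drop purely numeric tokens such as years and days
    .map(_.toLowerCase)
    .filter(_.size > 1)               // drop empty and single-character tokens
    .filterNot(stopWords.contains)

println(keywordsOf("2017/2017-08-15 Cote Granite Rose")) // List(cote, granite, rose)
```
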
Issues encountered while designing this proof of concept :
- DJL native libraries are not thread-safe, concurrent use crashes the JVM with a SIGSEGV
- data (photos) related issues
  - Photo filenames using a bad character encoding
  - Invalid shootDateTime (negative year or not in the right year range [2000,now])
  - no unified timestamp field, although OpenSearch Dashboards requires one to define an index pattern
    - priority is given to a valid shootDateTime, with a fallback to the image file last updated datetime
- The right Photo UUID to use has not been found yet
  - Using a named UUID computed from the photo hash was not a good idea as it hides any photo duplicates
  - Currently using a filepath only approach for the computed named UUID
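
The UUID trade-off above can be sketched with the JDK alone. Note the swap: this script relies on
java-uuid-generator (`Generators.nameBasedGenerator()`), while the sketch uses `UUID.nameUUIDFromBytes`
as a stand-in. The `namedUUID` helper, the hash value and the paths are hypothetical.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.UUID

// Stand-in for the java-uuid-generator call: JDK name-based UUID (version 3, MD5)
def namedUUID(name: String): UUID = UUID.nameUUIDFromBytes(name.getBytes(UTF_8))

// Hypothetical duplicates: the same content hash stored under two different paths
val hash  = "0123abcd"
val pathA = "/place1/2017/IMG_0001.JPG"
val pathB = "/place2/backup/IMG_0001.JPG"

// Hash-based identifiers collide, so the second copy would overwrite the first indexed document
println(namedUUID(hash) == namedUUID(hash))   // true
// Path-based identifiers stay distinct, and are stable across runs for a given path
println(namedUUID(pathA) == namedUUID(pathB)) // false
println(namedUUID(pathA) == namedUUID(pathA)) // true
```

With path-based identifiers the duplicates remain visible in the index, and the `fileHash` field can still be used to find them.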
To do next :
- extract GPS locations
- extract a third optional timestamp from the filepath
  - /home/ALBUMS/2017/2017-08-15 Cote Granite Rose/2Y7A0847.JPG => 2017-08-15
- Face recognition with bounding boxes
  - add a counter of how many people appear on each photo
  - People faces classification and name indexing
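
A possible starting point for the filepath timestamp, as a minimal sketch. The `dateInPath` pattern and the
`dateFromPath` helper are hypothetical, and the sketch assumes album directories embed a yyyy-MM-dd date as
in the example above.

```scala
import java.time.LocalDate
import scala.util.Try

// Hypothetical pattern: first yyyy-MM-dd occurrence anywhere in the path
val dateInPath = """(\d{4})-(\d{2})-(\d{2})""".r

// Returns None when no date-like token is found or when it is not a valid calendar date
def dateFromPath(path: String): Option[LocalDate] =
  dateInPath
    .findFirstMatchIn(path)
    .flatMap(m => Try(LocalDate.of(m.group(1).toInt, m.group(2).toInt, m.group(3).toInt)).toOption)

println(dateFromPath("/home/ALBUMS/2017/2017-08-15 Cote Granite Rose/2Y7A0847.JPG")) // Some(2017-08-15)
```
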
*/
// ---------------------
//> using scala "3.3.0"
//> using dep "dev.zio::zio:2.0.15"
//> using dep "dev.zio::zio-streams:2.0.15"
//> using dep "dev.zio::zio-json:0.6.0"
//> using dep "com.drewnoakes:metadata-extractor:2.18.0"
//> using dep "com.fasterxml.uuid:java-uuid-generator:4.2.0"
//> using dep "com.sksamuel.elastic4s::elastic4s-effect-zio:8.9.1"
//> using dep "com.sksamuel.elastic4s::elastic4s-client-esjava:8.9.1"
//> using dep "com.sksamuel.elastic4s::elastic4s-json-zio:8.9.1"
//----------------------
//> using dep "ai.djl:api:0.23.0"
//> using dep "ai.djl:basicdataset:0.23.0"
//> using dep "ai.djl:model-zoo:0.23.0"
//> using dep "ai.djl.mxnet:mxnet-engine:0.23.0"
//> using dep "ai.djl.mxnet:mxnet-model-zoo:0.23.0"
//> using dep "ai.djl.mxnet:mxnet-native-auto:1.8.0"
//> using dep "net.java.dev.jna:jna:5.13.0"
//----------------------
//> using dep "org.slf4j:slf4j-api:2.0.7"
//> using dep "org.slf4j:slf4j-simple:2.0.7"
// ---------------------
import com.drew.imaging.ImageMetadataReader
import com.drew.metadata.exif.{ExifDirectoryBase, ExifIFD0Directory, ExifSubIFDDirectory}
import com.fasterxml.uuid.Generators
import zio.*
import zio.json.*
import zio.stream.*
import zio.stream.ZPipeline.{splitLines, utf8Decode}
import java.io.{File, IOException}
import java.nio.charset.Charset
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{Files, Path, Paths}
import java.time.{Instant, OffsetDateTime, ZoneId, ZoneOffset, ZonedDateTime}
import java.util.UUID
import scala.util.matching.Regex
import scala.util.{Either, Failure, Left, Properties, Right, Success, Try}
import java.time.format.DateTimeFormatter.ISO_DATE_TIME
import java.time.temporal.ChronoField
import scala.jdk.CollectionConverters.*
import scala.Console.{BLUE, GREEN, RED, RESET, YELLOW}
// =====================================================================================================================
object HashOps {
  def sha1(that: String): String =
    import java.security.MessageDigest
    val content = if (that == null) "" else that // TODO - probably debatable, migrate to an effect
    val md = MessageDigest.getInstance("SHA-1") // TODO - can fail => potential side effect !
    val digest = md.digest(content.getBytes)
    // Hex-encode byte by byte so that leading zeros are kept (BigInteger.toString(16) drops them)
    digest.map("%02x".format(_)).mkString
  def fileDigest(path: Path, algo: String = "SHA-256"): String =
    import java.security.{MessageDigest, DigestInputStream}
    import java.io.FileInputStream
    val buffer = new Array[Byte](8192)
    val md = MessageDigest.getInstance(algo)
    val dis = new DigestInputStream(new FileInputStream(path.toFile), md)
    // Reading the whole stream feeds every byte into the digest
    try { while (dis.read(buffer) != -1) {} }
    finally { dis.close() }
    md.digest.map("%02x".format(_)).mkString
}
// =====================================================================================================================
object ElasticOps {
import com.sksamuel.elastic4s.zio.instances.*
import com.sksamuel.elastic4s.ziojson.*
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.ElasticDsl.*
import com.sksamuel.elastic4s.Index
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.requests.mappings.*
import com.sksamuel.elastic4s.Response
import com.sksamuel.elastic4s.requests.bulk.BulkResponse
import com.sksamuel.elastic4s.requests.searches.SearchResponse
import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import scala.concurrent.duration.FiniteDuration
import java.time.temporal.ChronoField
import java.util.concurrent.TimeUnit
import scala.util.Properties.{envOrNone, envOrElse}
// WARNING : ELASTIC_URL DEFAULT PORT IS 9200 !! (and not 80 or 443) SO BE EXPLICIT
val elasticUrl = envOrNone("PHOTOS_ELASTIC_URL").orElse(envOrNone("CEM_ELASTIC_URL")).getOrElse("http://127.0.0.1:9200")
val elasticUrlTrust = envOrNone("PHOTOS_ELASTIC_URL_TRUST_SSL").getOrElse("false").trim.toLowerCase
val elasticUsername = envOrNone("PHOTOS_ELASTIC_USERNAME").orElse(envOrNone("CEM_ELASTIC_USERNAME"))
val elasticPassword = envOrNone("PHOTOS_ELASTIC_PASSWORD").orElse(envOrNone("CEM_ELASTIC_PASSWORD"))
private val client = { // TODO rewrite to be fully effect based
val elasticProperties = ElasticProperties(elasticUrl)
val commonRequestConfigBuilder: RequestConfigCallback = (requestConfigBuilder: RequestConfig.Builder) =>
requestConfigBuilder
.setConnectTimeout(10000)
.setRedirectsEnabled(true)
.setSocketTimeout(10000)
if (elasticPassword.isEmpty || elasticUsername.isEmpty)
ElasticClient(JavaClient(elasticProperties, commonRequestConfigBuilder))
else {
lazy val provider = {
val basicProvider = new BasicCredentialsProvider
val credentials = new UsernamePasswordCredentials(elasticUsername.get, elasticPassword.get)
basicProvider.setCredentials(AuthScope.ANY, credentials)
basicProvider
}
import org.apache.http.ssl.SSLContexts
import org.apache.http.conn.ssl.TrustSelfSignedStrategy
val sslContext = elasticUrlTrust match {
case "true" => SSLContexts.custom().loadTrustMaterial(TrustSelfSignedStrategy()).build()
case _ => SSLContexts.createDefault()
}
val httpClientConfigCallback: HttpClientConfigCallback =
(httpClientBuilder: HttpAsyncClientBuilder) =>
httpClientBuilder
.setDefaultCredentialsProvider(provider)
.setSSLContext(sslContext)
// .setSSLHostnameVerifier(org.apache.http.conn.ssl.NoopHostnameVerifier.INSTANCE)
ElasticClient(JavaClient(elasticProperties, commonRequestConfigBuilder, httpClientConfigCallback))
}
}
private val scrollKeepAlive = FiniteDuration(30, "seconds")
private val timeout = 20.seconds
private val retrySchedule = (Schedule.exponential(500.millis, 2).jittered && Schedule.recurs(5)).onDecision((state, out, decision) =>
decision match {
case Schedule.Decision.Done => ZIO.logInfo("No more retry attempt !")
case Schedule.Decision.Continue(interval) => ZIO.logInfo(s"Will retry at ${interval.start}")
}
)
val upsertGrouping = 50
val searchPageSize = 500
// ------------------------------------------------------
private def indexNameFromTimestamp(indexPrefix: String, timestamp: OffsetDateTime): String = {
val year = timestamp.get(ChronoField.YEAR)
val month = timestamp.get(ChronoField.MONTH_OF_YEAR)
val day = timestamp.get(ChronoField.DAY_OF_MONTH)
val week = timestamp.get(ChronoField.ALIGNED_WEEK_OF_YEAR)
s"$indexPrefix-$year-$month"
}
// ------------------------------------------------------
private def streamFromScroll(scrollId: String) = {
ZStream.paginateChunkZIO(scrollId) { currentScrollId =>
for {
response <- client.execute(searchScroll(currentScrollId).keepAlive(scrollKeepAlive))
nextScrollId = response.result.scrollId
results = Chunk.fromArray(response.result.hits.hits.map(_.sourceAsString))
_ <- ZIO.log(s"Got ${results.size} more documents")
} yield results -> (if (results.size > 0) nextScrollId else None)
}
}
def fetchAll[T](indexName: String)(implicit decoder: JsonDecoder[T]) = {
// TODO something is going wrong here, sometimes not all results are returned without error being returned
// TODO deep pagination issue see https://www.elastic.co/guide/en/elasticsearch/reference/current/scroll-api.html
val result = for {
response <- client.execute(search(Index(indexName)).size(searchPageSize).scroll(scrollKeepAlive))
scrollId <- ZIO.fromOption(response.result.scrollId)
firstResults = Chunk.fromArray(response.result.hits.hits.map(_.sourceAsString))
_ <- ZIO.log(s"Got ${firstResults.size} first documents")
nextResultsStream = streamFromScroll(scrollId)
} yield ZStream.fromChunk(firstResults) ++ nextResultsStream
ZStream.unwrap(result).map(_.fromJson[T]).absolve.mapError(err => Exception(err.toString))
}
// ------------------------------------------------------
def upsert[T](indexPrefix: String, documents: Chunk[T])(timestampExtractor: T => OffsetDateTime, idExtractor: T => String)(implicit encoder: JsonEncoder[T]) = {
val responseEffect = client.execute {
bulk {
for { document <- documents } yield {
val indexName = indexNameFromTimestamp(indexPrefix, timestampExtractor(document))
val id = idExtractor(document)
indexInto(indexName).id(id).doc(document)
}
}
}
val upsertEffect = for {
response <- responseEffect
failures = response.result.failures.flatMap(_.error).map(_.toString)
_ <- ZIO.log(s"${if (response.isSuccess) "Upserted" else "Failed to upsert"} ${documents.size} into elasticsearch")
_ <- ZIO.cond(response.isSuccess, (), failures.mkString("\n"))
} yield ()
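// timeoutFail turns a timeout into a failure so that it is retried too (timeout alone would succeed with None)
upsertEffect.timeoutFail("Upsert timed out")(timeout).retry(retrySchedule)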
}
}
// =====================================================================================================================
object AIPhotos {
import ai.djl.Application
import ai.djl.engine.Engine
import ai.djl.modality.cv.Image
import ai.djl.modality.cv.ImageFactory
import ai.djl.modality.Classifications
import ai.djl.modality.cv.output.DetectedObjects
import ai.djl.repository.zoo.Criteria
import ai.djl.repository.zoo.ModelZoo
import ai.djl.repository.zoo.ZooModel
import ai.djl.training.util.ProgressBar
import ai.djl.modality.Classifications.Classification
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import scala.jdk.CollectionConverters._
def basename(filename: String): String = {
filename
.split("[/](?=[^/]*$)", 2)
.last
.split("[.]", 2)
.head
}
val objectDetectionsCriteria =
Criteria.builder
.optApplication(Application.CV.OBJECT_DETECTION)
.setTypes(classOf[Image], classOf[DetectedObjects])
.optFilter("backbone", "mobilenet1.0")
.optProgress(new ProgressBar)
.build
val objectDetectionsModel = ModelZoo.loadModel(objectDetectionsCriteria)
val objectDetectionPredictor = objectDetectionsModel.newPredictor()
def detectedObjects(path: Path): List[String] = {
val loadedImage: Image = ImageFactory.getInstance().fromFile(path)
val detection: DetectedObjects = objectDetectionPredictor.predict(loadedImage)
val detected: List[String] =
detection
.items()
.iterator()
.asScala
.toList
.asInstanceOf[List[Classifications.Classification]]
.filter(_.getProbability >= 0.5d)
.map(_.getClassName())
.flatMap(_.split(""",\s+"""))
.distinct
detected
}
val imageClassificationCriteria =
Criteria.builder
.optApplication(Application.CV.IMAGE_CLASSIFICATION)
.setTypes(classOf[Image], classOf[Classifications])
// ------------------------------------
// .optFilter("flavor","v1")
// .optFilter("dataset","cifar10")
// ------------------------------------
// .optFilter("flavor","v3_large")
// .optFilter("dataset","imagenet")
// ------------------------------------
.optFilter("flavor", "v1d")
.optFilter("dataset", "imagenet")
.optProgress(new ProgressBar)
.build
val imageClassificationModel = ModelZoo.loadModel(imageClassificationCriteria)
val imageClassificationPredictor = imageClassificationModel.newPredictor()
def cleanupClassName(input: String): String =
input.replaceAll("""^n\d+ """, "")
def classifyImage(path: Path): List[String] = {
val img = ImageFactory.getInstance().fromFile(path)
val found: Classifications = imageClassificationPredictor.predict(img)
found
.items()
.asScala
.toList
.asInstanceOf[List[Classification]]
.filter(_.getProbability > 0.5d)
.map(_.getClassName)
.map(cleanupClassName)
.flatMap(_.split(""",\s+"""))
.distinct
}
}
// =====================================================================================================================
object Photos extends ZIOAppDefault {
val generatorPUUID = Generators.nameBasedGenerator()
/* Attempt to generate a unique photo identifier */
def makePUUID(camera: Option[String], shootDateTime: Option[Instant], filePath: Path, fileHash: String): UUID = {
// generatorPUUID.generate(filePath.getFileName().toString + shootDateTime.map(_.toString).getOrElse(""))
generatorPUUID.generate(filePath.toString)
}
case class GeoPoint(lat: Double, lon: Double) derives JsonCodec
case class Photo(
uuid: UUID,
timestamp: OffsetDateTime,
filePath: Path,
fileSize: Long,
fileHash: String,
fileLastUpdated: OffsetDateTime,
category: Option[String],
shootDateTime: Option[OffsetDateTime],
camera: Option[String],
tags: Map[String, String],
keywords: List[String],
classifications: List[String],
detectedObjects: List[String],
place: Option[GeoPoint]
) derives JsonCodec
object Photo {
implicit val pathEncoder: JsonEncoder[Path] = JsonEncoder[String].contramap(p => p.toString)
implicit val pathDecoder: JsonDecoder[Path] = JsonDecoder[String].map(p => Path.of(p))
def makeTagKey(tag: com.drew.metadata.Tag): String = {
val prefix = tag.getDirectoryName().trim.replaceAll("""\s+""", "")
val name = tag.getTagName().trim.replaceAll("""\s+""", "")
val key = s"$prefix$name"
key.head.toLower + key.tail
}
def tagsToMap(tags: List[com.drew.metadata.Tag]): Map[String, String] = {
tags
.filterNot(_.getDescription == null)
.map(tag => makeTagKey(tag) -> tag.getDescription)
.toMap
}
def now = OffsetDateTime.now() // TODO : migrate to ZIO Clock.now
def checkTimestampValid(ts: OffsetDateTime) = ts.get(ChronoField.YEAR) >= 2000 && ts.isBefore(now)
def computeTimestamp(mayBeShootDateTime: Option[OffsetDateTime], fileLastUpdated: OffsetDateTime): OffsetDateTime =
  mayBeShootDateTime match
    case Some(shootDateTime) if checkTimestampValid(shootDateTime) => shootDateTime
    case _ => fileLastUpdated
def makePhoto(
uuid: UUID,
filePath: Path,
fileSize: Long,
fileHash: String,
fileLastUpdated: Instant,
category: Option[String],
shootDateTime: Option[Instant],
camera: Option[String],
metaDataTags: List[com.drew.metadata.Tag],
keywords: List[String], // Extracted from category
classifications: List[String], // Extracted from AI DJL
detectedObjects: List[String] // Extracted from AI DJL
): Photo = {
val shootOffsetDateTime = shootDateTime.map(_.atOffset(ZoneOffset.UTC))
val fileLastUpdatedOffsetDateTime = fileLastUpdated.atOffset(ZoneOffset.UTC)
val tags = tagsToMap(metaDataTags)
Photo(
uuid = uuid,
timestamp = computeTimestamp(shootOffsetDateTime, fileLastUpdatedOffsetDateTime),
filePath = filePath,
fileSize = fileSize,
fileHash = fileHash,
fileLastUpdated = fileLastUpdatedOffsetDateTime,
category = category,
shootDateTime = shootOffsetDateTime,
camera = camera,
tags = tags,
keywords = keywords,
classifications = classifications,
detectedObjects = detectedObjects,
place = computeGeoPoint(tags)
)
}
}
case class PhotoFileIssue(filepath: Path, throwable: Throwable)
case class PhotoFileContentIssue(filepath: Path, throwable: Throwable)
case class PhotoMetadataIssue(filepath: Path, throwable: Throwable)
case class PhotoUUIDIssue(filepath: Path, throwable: Throwable)
case class PhotoAIIssue(filepath: Path, throwable: Throwable)
type PhotoIssue = PhotoFileContentIssue | PhotoMetadataIssue | PhotoUUIDIssue | PhotoFileIssue | PhotoAIIssue
def categoryFromFilepath(filePath: Path, searchPath: Path): Option[String] =
Option(filePath.getParent)
.map(parent => searchPath.relativize(parent))
.map(_.toString)
def camelTokenize(that: String): Array[String] = that.split("(?=[A-Z][^A-Z])|(?:(?<=[^A-Z])(?=[A-Z]+))")
def camelToKebabCase(that: String): String = camelTokenize(that).map(_.toLowerCase).mkString("-")
val excludes = Set("et", "par", "le", "la", "de", "du", "au", "aux", "pour", "à", "a", "les", "des", "avec", "du", "dans", "sur", "d")
def extractKeywords(input: Option[String]): List[String] =
input match {
case None => Nil
case Some(category) =>
category
.split("[- /]+")
.toList
.filter(_.trim.size > 0)
.filterNot(_.matches("^[0-9]+$"))
.filterNot(_.contains("'"))
.flatMap(key => camelToKebabCase(key).split("-"))
.map(_.toLowerCase)
.filter(_.size > 1)
.filterNot(key => excludes.contains(key))
}
/*
tags.gPSGPSLatitude : 45° 19' 19,29"
tags.gPSGPSLatitudeRef : N
tags.gPSGPSLongitude : 6° 32' 39,47"
tags.gPSGPSLongitudeRef : E
*/
val dmsRE = """[-+]?(\d+)[°]\s*(\d+)['′]\s*(\d+(?:[.,]\d+)?)(?:(?:")|(?:'')|(?:′′)|(?:″))""".r
def convert(d: Double, m: Double, s: Double): Double = d + m / 60d + s / 3600d
def degreesMinuteSecondsToDecimalDegrees(
dms: String,
ref: String
): Try[Double] = Try {
val dd = dms.trim match {
case dmsRE(d, m, s) => convert(d.toDouble, m.toDouble, s.replaceAll(",", ".").toDouble)
}
if ("NE".contains(ref.trim.toUpperCase.head)) dd else -dd
}
def computeGeoPoint(photoTags: Map[String, String]): Option[GeoPoint] =
// Degrees Minutes Seconds to Decimal Degrees
for {
latitude <- photoTags.get("gPSGPSLatitude")
latitudeRef <- photoTags.get("gPSGPSLatitudeRef")
longitude <- photoTags.get("gPSGPSLongitude")
longitudeRef <- photoTags.get("gPSGPSLongitudeRef")
lat <- degreesMinuteSecondsToDecimalDegrees(latitude, latitudeRef).toOption // TODO enhance error processing
lon <- degreesMinuteSecondsToDecimalDegrees(longitude, longitudeRef).toOption // TODO enhance error processing
} yield GeoPoint(lat, lon)
def makePhoto(searchPath: Path, filePath: Path) =
  for
    metadataEither <- ZIO
      .attemptBlockingIO(ImageMetadataReader.readMetadata(filePath.toFile))
      .tapError(th => ZIO.logWarning(s"readMetadata issue with $filePath : ${th.getMessage}"))
      .either
    exifSubIFD = metadataEither.toOption.flatMap(metaData => Option(metaData.getFirstDirectoryOfType(classOf[ExifSubIFDDirectory])))
    exifIFD0 = metadataEither.toOption.flatMap(metaData => Option(metaData.getFirstDirectoryOfType(classOf[ExifIFD0Directory])))
    shootDateTime = exifIFD0.flatMap(dir => Option(dir.getDate(ExifDirectoryBase.TAG_DATETIME))).map(_.toInstant)
    camera = exifIFD0.flatMap(dir => Option(dir.getString(ExifDirectoryBase.TAG_MODEL)))
    fileSize <- ZIO.attemptBlockingIO(filePath.toFile.length()).mapError(th => PhotoFileIssue(filePath, th))
    fileLastUpdated <- ZIO.attemptBlockingIO(filePath.toFile.lastModified()).mapAttempt(Instant.ofEpochMilli).mapError(th => PhotoFileIssue(filePath, th))
    fileHash <- ZIO.attemptBlockingIO(HashOps.fileDigest(filePath)).mapError(th => PhotoFileContentIssue(filePath, th))
    category = categoryFromFilepath(filePath, searchPath)
    metaDirectories = metadataEither.map(_.getDirectories.asScala).getOrElse(Nil)
    metaDataTags = metaDirectories.flatMap(dir => dir.getTags.asScala).toList
    puuid <- ZIO.attemptBlockingIO(makePUUID(camera, shootDateTime, filePath, fileHash)).mapError(th => PhotoUUIDIssue(filePath, th))
    // AI failures are logged and degrade to empty results instead of failing the whole photo
    classificationsEither <- ZIO
      .attemptBlockingIO(AIPhotos.classifyImage(filePath))
      .tapError(th => ZIO.logWarning(s"classifyImage issue with $filePath : ${th.getMessage}"))
      .either
    detectedObjectsEither <- ZIO
      .attemptBlockingIO(AIPhotos.detectedObjects(filePath))
      .tapError(th => ZIO.logWarning(s"detectedObjects issue with $filePath : ${th.getMessage}"))
      .either
    classifications = classificationsEither.toOption.getOrElse(Nil)
    detectedObjects = detectedObjectsEither.toOption.getOrElse(Nil)
    keywords = extractKeywords(category)
    _ <- Console.printLine(
      s"$filePath - $RED${camera.getOrElse("")}$RESET - $YELLOW${keywords.mkString(",")}$RESET - $GREEN${classifications.mkString(",")}$RESET - $BLUE${detectedObjects.mkString(",")}$RESET - $RED$shootDateTime$RESET - $fileLastUpdated"
    )
    photo = Photo.makePhoto(
      uuid = puuid,
      filePath = filePath,
      fileSize = fileSize,
      fileHash = fileHash,
      fileLastUpdated = fileLastUpdated,
      category = category,
      shootDateTime = shootDateTime,
      camera = camera,
      metaDataTags = metaDataTags,
      keywords = keywords,
      classifications = classifications,
      detectedObjects = detectedObjects
    )
    // _ <- ZIO.logInfo(s"processed $photo")
  yield photo
// -------------------------------------------------------------------------------------------------------------------
def searchPredicate(includeMaskRegex: Option[Regex], ignoreMaskRegex: Option[Regex])(path: Path, attrs: BasicFileAttributes): Boolean = {
attrs.isRegularFile &&
(ignoreMaskRegex.isEmpty || ignoreMaskRegex.get.findFirstIn(path.toString).isEmpty) &&
(includeMaskRegex.isEmpty || includeMaskRegex.get.findFirstIn(path.toString).isDefined)
}
def findFromSearchRoot(
searchRoot: Path,
includeMaskRegex: Option[Regex],
ignoreMaskRegex: Option[Regex]
) = {
val result = for {
searchPath <- ZIO.attempt(searchRoot)
javaStream = Files.find(searchPath, 10, searchPredicate(includeMaskRegex, ignoreMaskRegex))
pathStream = ZStream.fromJavaStream(javaStream).map(path => searchRoot -> path)
} yield pathStream
ZStream.unwrap(result)
}
def fetch() = {
val result = for {
_ <- ZIO.logInfo("photos inventory")
searchRoots <- System
.env("PHOTOS_SEARCH_ROOTS")
.someOrFail("nowhere to search")
.map(_.split("[,;]").toList.map(_.trim))
includeMask <- System.env("PHOTOS_SEARCH_INCLUDE_MASK")
includeMaskRegex <- ZIO.attempt(includeMask.map(_.r))
ignoreMask <- System.env("PHOTOS_SEARCH_IGNORE_MASK")
ignoreMaskRegex <- ZIO.attempt(ignoreMask.map(_.r))
searchRootsStreams = Chunk.fromIterable(searchRoots).map(searchRoot => findFromSearchRoot(Path.of(searchRoot), includeMaskRegex, ignoreMaskRegex))
zCandidatesStream = ZStream.concatAll(searchRootsStreams)
} yield zCandidatesStream
ZStream.unwrap(result)
}
def run = for {
started <- Clock.instant
_ <- Console.printLine(s"${GREEN}Synchronizing photos database$RESET")
alreadyPublished <- ElasticOps.fetchAll[Photo]("photos-*").runCollect
_ <- Console.printLine(s"$YELLOW${alreadyPublished.size} photos already published$RESET")
excludePaths = alreadyPublished.map(_.filePath.normalize().toString).toSet
results <- fetch() // AI DJL External native libraries are not thread safe :(
.filterNot((searchPath, path) => excludePaths.contains(path.normalize().toString))
.mapZIOParUnordered(1)((searchPath, path) => makePhoto(searchPath, path))
.grouped(ElasticOps.upsertGrouping)
.mapZIOParUnordered(1)(group =>
ElasticOps.upsert[Photo]("photos", group)(
timestampExtractor = photo => photo.timestamp,
idExtractor = photo => photo.uuid.toString
)
)
.runDrain
.tapError(err => Console.printLine(err))
finished <- Clock.instant
duration = finished.getEpochSecond - started.getEpochSecond
_ <- Console.printLine(s"${GREEN}Synchronize operations done in $duration seconds$RESET")
} yield ()
}
Photos.main(Array.empty)