Skip to content

Instantly share code, notes, and snippets.

@timothyklim
Created September 10, 2017 10:25
Show Gist options
  • Save timothyklim/7f42083a0df24413eb14d99ff8c3e242 to your computer and use it in GitHub Desktop.
Save timothyklim/7f42083a0df24413eb14d99ff8c3e242 to your computer and use it in GitHub Desktop.
object ImageProcessor extends Logging {
/** Failure raised by the image pipeline; carries a message only and mixes in `NoStackTrace`. */
final case class ImageProcessorException(message: String) extends Throwable(message) with NoStackTrace
/**
  * Downloads every source, renders its thumbnails for each requested watermark and uploads them.
  *
  * Each source is paired with every watermark (or with no watermark when `watermarks` is empty).
  * Exactly one pairing per source is flagged to also back up and upload the untouched original.
  * At most two pairings are processed concurrently.
  *
  * @param srcs                locations handed to `FileProcessor.downloadAsFile`
  * @param prefix              forwarded unchanged to `process`
  * @param watermarks          watermarks to render; an empty set means one un-watermarked pass
  * @param defaultPhotoFormats formats used for a pairing that has no watermark
  * @return the distinct images produced by all pairings
  */
def processAndUpload(srcs: Vector[String],
                     prefix: String,
                     watermarks: Set[Watermark],
                     defaultPhotoFormats: Vector[Int] = Image.defaultFormats)(
    implicit sys: ActorSystem,
    mat: Materializer): Future[Vector[Image]] = {
  import sys.dispatcher
  Source(srcs)
    .mapConcat { src =>
      if (watermarks.isEmpty) Vector((src, Option.empty[Watermark], true))
      else watermarks.zipWithIndex.map { case (wm, idx) => (src, Some(wm), idx == 0) }
    }
    .mapAsync(2) {
      case (src, watermark, uploadOriginal) =>
        val photoFormats = watermark.map(_.photoFormats).getOrElse(defaultPhotoFormats)
        for {
          (file, _) <- FileProcessor.downloadAsFile(src)
          image <- process(file, prefix, photoFormats, watermark)
          // Only the flagged pairing stores the original, so it is uploaded once per source.
          _ <- if (uploadOriginal)
            Future
              .sequence(
                Seq(
                  Storage.backupOriginalImage(image.id, file),
                  Storage.uploadImage(s"/originals/${image.id}", file)
                ))
              .map(_ => ())
          else Future.unit
          // Every pairing downloads its own copy, so every pairing must remove it; previously
          // the copy was deleted only on the `uploadOriginal` path and leaked otherwise.
          _ <- FileProcessor.deleteFile(file)
        } yield image
    }
    .runWith(Sink.seq)
    .map(_.distinct.toVector)
}
/**
  * Reads the pixel dimensions of an image by running the `identify` command line and parsing
  * its output with `widthHeightRegex`.
  *
  * @param file image to inspect
  * @return `(width, height)`; the future fails with [[ImageProcessorException]] when the command
  *         fails, its output is not recognised, or either dimension is not positive
  */
def getSize(file: File)(implicit ec: ExecutionContext): Future[(Int, Int)] =
  Future {
    blocking {
      val dimensions: Either[String, (Int, Int)] = run(identify(file)).flatMap { output =>
        output.trim match {
          case widthHeightRegex(w, h) =>
            val width = w.toInt
            val height = h.toInt
            Either.cond(
              width > 0 && height > 0,
              (width, height),
              s"Width and height must be greater than zero: ${width}x$height"
            )
          case _ => Left(s"Unknown format output: $output")
        }
      }
      dimensions match {
        case Right(size) => size
        case Left(msg)   => throw ImageProcessorException(msg)
      }
    }
  }
// Captures the two numbers of a whitespace-delimited `<width>x<height>` token.
// It is used as an extractor, so the whole input has to match. The leading `.*` does not match
// line breaks, so only whitespace may separate the end of the first line from the token;
// `(?s)` turns on DOTALL for the trailing `.*`, letting it consume any remaining lines.
private val widthHeightRegex = """.*\s+(\d+)x(\d+)\s+(?s).*""".r
/** Returns `formats` followed by each format doubled, with duplicates removed (first occurrence kept). */
private def retinaFormats(formats: Vector[Int]): Vector[Int] = {
  val doubled = formats.map(size => size * 2)
  (formats ++ doubled).distinct
}
/**
  * Produces one thumbnail of `file`: resize, optionally overlay the watermark, then optimise.
  *
  * Work runs on the dispatcher looked up by `dispatcherName`. Two temporary paths are used: an
  * intermediate one for the resized (and watermarked) image and a final one for the optimised
  * result. The intermediate file is removed on success; on failure both are removed.
  *
  * @param file      source image
  * @param size      value passed to the `thumbnail` command line
  * @param watermark overlay to apply; skipped when absent or when its `aspect` is not positive
  * @return the optimised file, or a future failed with [[ImageProcessorException]]
  */
private def processImage(file: File, size: Int, watermark: Option[Watermark])(
    implicit sys: ActorSystem): Future[File] = {
  implicit val ec: ExecutionContext = sys.dispatchers.lookup(dispatcherName)
  val outfile = tempImageFile()
  val outfileOptimized = tempImageFile(s"-$size")
  Future(blocking(for {
    _ <- run(thumbnail(size, file, outfile))
    _ <- watermark match {
      case Some(wm) if wm.aspect > 0 => run(composite(size, wm, outfile))
      case _                         => Right(())
    }
    _ <- run(optimize(outfile, outfileOptimized))
    _ <- Try(outfile.delete()).toEither.leftMap(_.getMessage)
  } yield outfileOptimized)).flatMap {
    case Right(optimized) => Future.successful(optimized)
    case Left(e) =>
      // Any step may have failed after the intermediate file was written, so remove both
      // temporary files; previously only the optimised one was cleaned up.
      Try(outfile.delete())
      Try(outfileOptimized.delete())
      Future.failed(ImageProcessorException(e))
  }
}
/**
  * Builds the `vipsthumbnail` command line that resizes `in` into `out`.
  *
  * @param size passed as `--size x<size>`
  * @param in   source image
  * @param out  destination path; `[Q=100]` is appended as a save option
  */
private def thumbnail(size: Int, in: File, out: File): Seq[String] = {
  val destination = s"${out.getPath}[Q=100]"
  val geometry = s"x$size"
  Seq("vipsthumbnail", "--vips-concurrency=4", in.getPath, "-o", destination, "--size", geometry)
}
/**
  * Builds the `composite` command line that overlays a watermark PNG, centred, onto `in`.
  *
  * The same path is passed as the last two arguments (presumably input and output, so the image
  * is rewritten in place; confirm against the tool's documentation).
  *
  * @param size      thumbnail size; the overlay's side length is `size * watermark.aspect`, truncated
  * @param watermark selects `<watermarksPath>/<name>.png` and supplies the scale factor
  * @param in        image to watermark
  */
private def composite(size: Int, watermark: Watermark, in: File): Seq[String] = {
  val target = in.getPath
  val overlay = s"${settings.images.watermarksPath}/${watermark.name}.png"
  val overlaySide = (size * watermark.aspect).toInt
  val geometry = s"${overlaySide}x${overlaySide}-0+0"
  Seq("composite", "-gravity", "center", "-geometry", geometry, overlay, target, target)
}
/**
  * Builds the `vips jpegsave` command line that re-encodes `in` into `out`.
  *
  * @param in      source image
  * @param out     destination path
  * @param quality value for the `--Q` option
  */
private def optimize(in: File, out: File, quality: Int = 85): Seq[String] = {
  val invocation = Seq("vips", "--vips-concurrency=4", "jpegsave", in.getPath, out.getPath)
  val saveOptions = Seq(s"--Q=$quality", "--no-subsample", "--strip", "--interlace", "--optimize-coding")
  invocation ++ saveOptions
}
/** Builds the `vipsheader -a` command line for `in`. */
private def identify(in: File): Seq[String] =
  "vipsheader" :: "-a" :: in.getPath :: Nil
/**
  * Runs an external command, waits for it to finish and captures its output.
  *
  * @param cmd executable followed by its arguments, one element each
  * @return `Right(stdout)` when the exit code is 0; otherwise `Left` with the captured stderr, or
  *         with a description of the failure when stderr is blank or the command could not be run
  */
private def run(cmd: Seq[String]): Either[String, String] = {
  val command = cmd.mkString(" ")
  logger.debug(s"Run processing command: $command")
  val stderr = mutable.ListBuffer[String]()
  val stdout = mutable.ListBuffer[String]()
  val outputLogger = ProcessLogger(stdout.+=, stderr.+=)
  // Launching can throw (for example when the executable is missing); report that through the
  // `Left` channel like every other failure instead of letting it escape the `Either`.
  Try(cmd.!(outputLogger)).toEither match {
    case Left(cause) =>
      Left(s"Failed to run command `$command`: $cause")
    case Right(exitCode) =>
      logger.debug(s"Output for command $command: $stdout")
      if (exitCode == 0) Right(stdout.mkString("\n"))
      else {
        val errors = stderr.mkString("\n")
        // A tool may exit non-zero without writing to stderr; never hand back an empty message.
        Left(if (errors.trim.nonEmpty) errors else s"Command `$command` exited with code $exitCode")
      }
  }
}
/**
  * Reserves a random `.jpg` path in the JVM's temporary directory.
  *
  * The file is not created here; the path is only registered for deletion at JVM exit.
  *
  * @param postfix text inserted between the random identifier and the `.jpg` extension
  */
private def tempImageFile(postfix: String = ""): File = {
  val randomId = Random.random.alphanumeric.take(15).mkString
  // Use the configured temp directory rather than a hard-coded `/tmp`, which does not exist on
  // every platform and ignores a `java.io.tmpdir` override.
  val file = new File(System.getProperty("java.io.tmpdir"), s"$randomId$postfix.jpg")
  file.deleteOnExit()
  file
}
/**
  * Streams one processed thumbnail per requested size for `originalFile`.
  *
  * An empty original is deleted and fails the stream with [[ImageProcessorException]]. Sizes are
  * `imageRules` as given for the `Watermark.Presentation` watermark and `retinaFormats(imageRules)`
  * otherwise. Up to four thumbnails are rendered concurrently and emitted in completion order.
  * Failures are logged and then rethrown.
  */
private def processImageThumbnails(originalFile: File, imageRules: Vector[Int], watermark: Option[Watermark])(
    implicit sys: ActorSystem): Source[File, NotUsed] = {
  import sys.dispatcher

  val checkedOriginal: Future[File] = Future {
    blocking {
      if (originalFile.length == 0) {
        Try(originalFile.delete())
        throw ImageProcessorException(s"File `${originalFile.getPath()}` is empty")
      }
      originalFile
    }
  }

  val sizes: Vector[Int] =
    if (watermark.contains(Watermark.Presentation)) imageRules
    else retinaFormats(imageRules)

  Source
    .fromFuture(checkedOriginal)
    .mapConcat(original => sizes.map(size => (original, size, watermark)))
    .mapAsyncUnordered(4) { case (original, size, wm) => processImage(original, size, wm) }
    .recover {
      case failure =>
        logger.error(failure.toString, failure)
        throw failure
    }
}
/**
  * Measures `file`, renders its thumbnails, uploads them and removes the files reported back by
  * the upload.
  *
  * @param file         original image; it is not deleted here unless it turns out to be empty
  * @param prefix       forwarded to `Storage.uploadThumbnails`
  * @param photoFormats thumbnail sizes to render
  * @param watermark    optional overlay applied to every thumbnail
  * @return the image record carrying the id returned by the upload plus the measured width and height
  */
def process(file: File, prefix: String, photoFormats: Vector[Int], watermark: Option[Watermark])(
    implicit sys: ActorSystem,
    mat: Materializer): Future[Image] = {
  import sys.dispatcher
  for {
    (width, height) <- getSize(file)
    // Building the source does not render anything yet; rendering happens when the upload consumes it.
    thumbnails = processImageThumbnails(file, photoFormats, watermark)
    (imageId, files) <- Storage.uploadThumbnails(prefix, file, thumbnails, watermark)
    _ <- FileProcessor.deleteFiles(files)
  } yield Image(imageId, width = Some(width), height = Some(height), None, None)
}
// Name of the dispatcher that `processImage` looks up via `sys.dispatchers.lookup`;
// presumably it must be defined under this key in the actor system's configuration (TODO confirm).
private val dispatcherName = "images-processor-io-dispatcher"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment