Created
September 10, 2017 10:25
-
-
Save timothyklim/7f42083a0df24413eb14d99ff8c3e242 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object ImageProcessor extends Logging { | |
/** Error raised by the image pipeline; it carries a message only, without a recorded stack trace. */
final case class ImageProcessorException(message: String)
    extends Throwable(message)
    with NoStackTrace
/**
 * Downloads every source, builds its thumbnails via `process` and uploads the results.
 *
 * Each source is handled once per watermark, or once without a watermark when `watermarks`
 * is empty. The original file is backed up and uploaded only for the first watermark (in the
 * set's iteration order), so it is stored a single time per source.
 *
 * The downloaded temporary file is removed on every path: after the upload of the original,
 * after processing a non-first watermark, and when processing or upload fails.
 *
 * @param srcs                sources handed to `FileProcessor.downloadAsFile`
 * @param prefix              passed through to `process`
 * @param watermarks          watermarks to render; each one supplies its own photo formats
 * @param defaultPhotoFormats formats used when no watermark is applied
 * @return the distinct images produced
 */
def processAndUpload(srcs: Vector[String],
                     prefix: String,
                     watermarks: Set[Watermark],
                     defaultPhotoFormats: Vector[Int] = Image.defaultFormats)(
    implicit sys: ActorSystem,
    mat: Materializer): Future[Vector[Image]] = {
  import sys.dispatcher

  Source(srcs)
    .mapConcat { src =>
      if (watermarks.isEmpty) Vector((src, Option.empty[Watermark], true))
      else watermarks.zipWithIndex.map { case (wm, idx) => (src, Some(wm), idx == 0) }
    }
    .mapAsync(2) {
      case (src, watermark, uploadOriginal) =>
        val photoFormats = watermark.map(_.photoFormats).getOrElse(defaultPhotoFormats)
        FileProcessor.downloadAsFile(src).flatMap {
          case (file, _) =>
            // Best-effort cleanup: a failure to delete must not mask the real outcome.
            def discardQuietly(): Future[Unit] =
              FileProcessor
                .deleteFile(file)
                .map(_ => ())
                .recover { case scala.util.control.NonFatal(_) => () }

            val processed = for {
              image <- process(file, prefix, photoFormats, watermark)
              _ <- if (uploadOriginal) {
                for {
                  _ <- Future.sequence(
                    Seq(
                      Storage.backupOriginalImage(image.id, file),
                      Storage.uploadImage(s"/originals/${image.id}", file)
                    ))
                  _ <- FileProcessor.deleteFile(file)
                } yield ()
              } else discardQuietly() // previously this download was never removed
            } yield image

            // Also drop the download when processing or upload fails, then re-raise the cause.
            processed.recoverWith {
              case scala.util.control.NonFatal(cause) =>
                discardQuietly().flatMap(_ => Future.failed(cause))
            }
        }
    }
    .runWith(Sink.seq)
    .map(_.distinct.toVector)
}
/**
 * Reads the pixel dimensions of an image from the output of the `identify` command.
 *
 * The returned future fails with [[ImageProcessorException]] when the command reports an error,
 * when its output does not match `widthHeightRegex`, or when either dimension is not positive.
 *
 * @return (width, height)
 */
def getSize(file: File)(implicit ec: ExecutionContext): Future[(Int, Int)] =
  Future(blocking {
    val parsed: Either[String, (Int, Int)] = for {
      output <- run(identify(file))
      size <- output.trim match {
        case widthHeightRegex(w, h) =>
          val width = w.toInt
          val height = h.toInt
          if (width <= 0 || height <= 0)
            Left(s"Width and height must be greater than zero: ${width}x$height")
          else Right((width, height))
        case _ => Left(s"Unknown format output: $output")
      }
    } yield size

    parsed match {
      case Right(size) => size
      case Left(msg)   => throw ImageProcessorException(msg)
    }
  })
// Matches text containing a whitespace-delimited `<width>x<height>` token and captures both
// numbers. The `(?s)` flag only takes effect from its position onwards, so the trailing `.*`
// may span the remaining lines while the leading `.*` stays on the first line.
private val widthHeightRegex: scala.util.matching.Regex =
  """.*\s+(\d+)x(\d+)\s+(?s).*""".r
/** Returns `formats` followed by each format doubled, with duplicates removed (first occurrence kept). */
private def retinaFormats(formats: Vector[Int]): Vector[Int] = {
  val doubled = formats.map(_ * 2)
  (formats ++ doubled).distinct
}
/**
 * Produces one optimized thumbnail of `file` on the dispatcher named by `dispatcherName`.
 *
 * Steps, each an external command executed through `run`:
 *  1. `thumbnail` writes a resized copy into an intermediate temp file;
 *  2. `composite` overlays the watermark in place, only when one is given with `aspect > 0`;
 *  3. `optimize` re-encodes the intermediate file into a second temp file;
 *  4. the intermediate file is deleted.
 *
 * @param file      source image
 * @param size      size value forwarded to the `thumbnail` and `composite` commands
 * @param watermark optional watermark to overlay
 * @return the optimized file; the future fails with [[ImageProcessorException]] when any
 *         step reports an error, in which case both temp files are removed
 */
private def processImage(file: File, size: Int, watermark: Option[Watermark])(
    implicit sys: ActorSystem): Future[File] = {
  implicit val ec: ExecutionContext = sys.dispatchers.lookup(dispatcherName)

  val outfile = tempImageFile()
  val outfileOptimized = tempImageFile(s"-$size")

  Future(blocking(for {
    _ <- run(thumbnail(size, file, outfile))
    _ <- watermark match {
      case Some(wm) if wm.aspect > 0 => run(composite(size, wm, outfile))
      case _                         => Right(())
    }
    _ <- run(optimize(outfile, outfileOptimized))
    _ <- Try(outfile.delete()).toEither.leftMap(_.getMessage)
  } yield outfileOptimized)).flatMap {
    case Right(result) => Future.successful(result)
    case Left(error) =>
      // A failed step can leave either temp file behind; remove both, not just the final one.
      Try(outfile.delete())
      Try(outfileOptimized.delete())
      Future.failed(ImageProcessorException(error))
  }
}
/** Builds the `vipsthumbnail` command line that reads `in` and writes `out` with the size spec `x$size`. */
private def thumbnail(size: Int, in: File, out: File): Seq[String] = {
  val target = s"${out.getPath}[Q=100]"
  val sizeSpec = s"x$size"
  Seq("vipsthumbnail", "--vips-concurrency=4", in.getPath, "-o", target, "--size", sizeSpec)
}
/**
 * Builds the `composite` command line that overlays the watermark PNG, centred, onto `in`
 * and writes the result back to the same path.
 *
 * The overlay geometry is a square whose side is `size * watermark.aspect`, truncated to an Int.
 */
private def composite(size: Int, watermark: Watermark, in: File): Seq[String] = {
  val target = in.getPath
  val overlay = s"${settings.images.watermarksPath}/${watermark.name}.png"
  val side = (size * watermark.aspect).toInt
  val geometry = s"${side}x${side}-0+0"
  Seq("composite", "-gravity", "center", "-geometry", geometry, overlay, target, target)
}
/**
 * Builds the `vips jpegsave` command line that re-encodes `in` into `out`.
 *
 * @param quality value passed as `--Q`; defaults to 85
 */
private def optimize(in: File, out: File, quality: Int = 85): Seq[String] = {
  val executable = Seq("vips", "--vips-concurrency=4", "jpegsave")
  val paths = Seq(in.getPath, out.getPath)
  val options = Seq(s"--Q=$quality", "--no-subsample", "--strip", "--interlace", "--optimize-coding")
  executable ++ paths ++ options
}
/** Builds the `vipsheader -a` command line for `in`; `getSize` parses its output. */
private def identify(in: File): Seq[String] =
  "vipsheader" :: "-a" :: in.getPath :: Nil
/**
 * Executes an external command synchronously, collecting stdout and stderr line by line.
 *
 * @param cmd executable followed by its arguments
 * @return `Right(stdout)` when the exit code is 0; otherwise `Left` with the captured stderr,
 *         or with the command and exit code when stderr is blank so the error is never empty
 */
private def run(cmd: Seq[String]): Either[String, String] = {
  val command = cmd.mkString(" ")
  logger.debug(s"Run processing command: $command")

  val stdout = mutable.ListBuffer[String]()
  val stderr = mutable.ListBuffer[String]()
  val outputLogger = ProcessLogger(stdout.+=, stderr.+=)
  val exitCode = cmd.!(outputLogger)
  logger.debug(s"Output for command $command: $stdout")

  if (exitCode == 0) Right(stdout.mkString("\n"))
  else {
    val details = stderr.mkString("\n")
    // Some tools fail without writing to stderr; fall back to a message that identifies the failure.
    if (details.trim.nonEmpty) Left(details)
    else Left(s"Command `$command` exited with code $exitCode")
  }
}
/**
 * Returns a handle to `/tmp/<id><postfix>.jpg`, where `<id>` is 15 characters taken from
 * `Random.random.alphanumeric`. The file is not created here, only registered for deletion
 * when the JVM exits.
 *
 * @param postfix text inserted between the generated id and the `.jpg` extension
 */
private def tempImageFile(postfix: String = ""): File = {
  val id = Random.random.alphanumeric.take(15).mkString
  val path = s"/tmp/$id$postfix.jpg"
  val handle = new File(path)
  handle.deleteOnExit()
  handle
}
/**
 * Streams one processed thumbnail file per requested size.
 *
 * An empty `originalFile` is deleted and fails the stream with [[ImageProcessorException]].
 * For `Watermark.Presentation` the sizes are exactly `imageRules`; otherwise they are expanded
 * with `retinaFormats`. Up to four sizes are processed concurrently and emitted in completion
 * order. Any failure is logged and then re-thrown.
 */
private def processImageThumbnails(originalFile: File, imageRules: Vector[Int], watermark: Option[Watermark])(
    implicit sys: ActorSystem): Source[File, NotUsed] = {
  import sys.dispatcher

  val checkedOriginal = Future(blocking {
    if (originalFile.length != 0) originalFile
    else {
      Try(originalFile.delete())
      throw ImageProcessorException(s"File `${originalFile.getPath()}` is empty")
    }
  })

  Source
    .fromFuture(checkedOriginal)
    .mapConcat { original =>
      val sizes =
        if (watermark.contains(Watermark.Presentation)) imageRules
        else retinaFormats(imageRules)
      sizes.map(size => (original, size, watermark))
    }
    .mapAsyncUnordered(4) { case (original, size, wm) => processImage(original, size, wm) }
    .recover {
      case failure =>
        logger.error(failure.toString, failure)
        throw failure
    }
}
/**
 * Measures `file`, generates its thumbnails and uploads them.
 *
 * The thumbnail files returned by `Storage.uploadThumbnails` are deleted once the upload completes.
 *
 * @param file         source image
 * @param prefix       forwarded to `Storage.uploadThumbnails`
 * @param photoFormats thumbnail sizes handed to `processImageThumbnails`
 * @param watermark    optional watermark applied to the thumbnails
 * @return the image with the uploaded id and the measured width and height
 */
def process(file: File, prefix: String, photoFormats: Vector[Int], watermark: Option[Watermark])(
    implicit sys: ActorSystem,
    mat: Materializer): Future[Image] = {
  import sys.dispatcher

  for {
    (width, height) <- getSize(file)
    thumbnails = processImageThumbnails(file, photoFormats, watermark)
    (imageId, files) <- Storage.uploadThumbnails(prefix, file, thumbnails, watermark)
    _ <- FileProcessor.deleteFiles(files)
  } yield Image(imageId, width = Some(width), height = Some(height), None, None)
}
// Name of the dispatcher that `processImage` looks up to run its blocking external commands.
private val dispatcherName: String =
  "images-processor-io-dispatcher"
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment