Skip to content

Instantly share code, notes, and snippets.

@Vistritium
Created July 20, 2024 12:29
Show Gist options
  • Save Vistritium/9d0ab78d25b338ffbb851807a423110f to your computer and use it in GitHub Desktop.
Save Vistritium/9d0ab78d25b338ffbb851807a423110f to your computer and use it in GitHub Desktop.
package vistritium.utils
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source, StreamConverters}
import akka.util.ByteString
import java.io.{FilterInputStream, InputStream}
import scala.concurrent.Await
import scala.concurrent.duration.*
object PipeThroughInputStreamTest {
implicit val as: ActorSystem = ActorSystem()
def pipeThroughInputStream(pipeThrough: InputStream => InputStream)(implicit mat: Materializer): Flow[ByteString, ByteString, NotUsed] = {
val sink = StreamConverters.asInputStream(5.minutes)
val (is, matSink) = sink.preMaterialize()
val source = StreamConverters.fromInputStream(() => pipeThrough(is))
Flow.fromSinkAndSource(matSink, source)
}
def main(args: Array[String]): Unit = {
def removeZerosPipeInputStream(is: InputStream) = new FilterInputStream(is) {
override def read(): Int = {
println("read invoked")
val next = super.read()
if (next == 0) {
super.read()
} else next
}
}
val input = Array(1, 0, 2).map(_.toByte)
val expectedOutput = Array(1, 2).map(_.toByte)
val output =
Await.result(
Source
.single(ByteString(input.map(identity)))
.via(pipeThroughInputStream(removeZerosPipeInputStream))
.fold(ByteString.empty)(_ ++ _)
.runWith(Sink.head),
5.minutes
)
println(s"output: ${output}, expectedOutput: ${ByteString(expectedOutput)}")
require(output.toArray sameElements expectedOutput)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment