Skip to content

Instantly share code, notes, and snippets.

@Vistritium
Last active July 20, 2024 08:31
Show Gist options
  • Save Vistritium/2c61da8765788c4b1586cba60a125196 to your computer and use it in GitHub Desktop.
Save Vistritium/2c61da8765788c4b1586cba60a125196 to your computer and use it in GitHub Desktop.
package vistritium.utils
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source, StreamConverters}
import akka.util.ByteString
import java.io.{BufferedInputStream, InputStream}
import scala.concurrent.Await
import scala.concurrent.duration.*
/** Demonstrates routing an Akka Streams `ByteString` flow through a blocking
  * `InputStream => InputStream` transformation.
  */
object PipeThroughInputStreamTest {

  /** Actor system that supplies the implicit materializer for the demo stream; terminated at the end of `main`. */
  implicit val as: ActorSystem = ActorSystem()

  /** Builds a flow that exposes its incoming bytes as an `InputStream`, passes that stream to
    * `pipeThrough`, and emits whatever can be read from the stream `pipeThrough` returns.
    *
    * The `InputStream` sink is pre-materialized when this method is called, so the returned flow is
    * bound to that one materialization: build a fresh flow for every stream run.
    *
    * @param pipeThrough wraps the `InputStream` carrying the upstream bytes; it is invoked when the
    *                    source side of the flow starts, not when the flow is constructed
    * @param mat         materializer used to pre-materialize the `InputStream` sink
    * @return a coupled flow: upstream bytes feed the input stream, downstream receives the bytes
    *         read from the wrapped stream
    */
  def pipeThroughInputStream(pipeThrough: InputStream => InputStream)(implicit mat: Materializer): Flow[ByteString, ByteString, NotUsed] = {
    val sink = StreamConverters.asInputStream().mapMaterializedValue { materializedInputStream =>
      // Wrap lazily inside the factory rather than eagerly here: a wrapper whose constructor reads
      // from the stream (e.g. a decompressing stream parsing its header) would otherwise block the
      // caller during pre-materialization, before any upstream has been attached to the sink.
      StreamConverters.fromInputStream(() => pipeThrough(materializedInputStream))
    }
    val (matSource, matSink) = sink.preMaterialize()
    // Coupled so that termination of either side tears down the other.
    Flow.fromSinkAndSourceCoupled(matSink, matSource)
  }

  /** Sends the string "test" through a `BufferedInputStream` round trip, prints the result and
    * checks that the bytes come back unchanged.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    val input = "test"
    try {
      val output =
        Await.result(
          Source
            .single(ByteString(input))
            .via(pipeThroughInputStream(is => new BufferedInputStream(is)))
            .fold(ByteString.empty)(_ ++ _)
            .runWith(Sink.head),
          10.seconds
        )
      println(s"input: ${input}, output: ${output.utf8String}")
      require(input == output.utf8String)
    } finally {
      // Always shut the actor system down, on success, timeout or failed check alike; otherwise
      // its (by default non-daemon) threads keep the JVM alive after main returns.
      as.terminate()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment