Skip to content

Instantly share code, notes, and snippets.

@regis-leray
Last active November 17, 2017 18:49
Show Gist options
  • Save regis-leray/fa42d4885a21376acc7088f0b26ef875 to your computer and use it in GitHub Desktop.
Save regis-leray/fa42d4885a21376acc7088f0b26ef875 to your computer and use it in GitHub Desktop.
WriteChannelSink
// Variant 1: implemented as a custom GraphStage
/** Companion factory so call sites can write `WriteChannelSource(channel)` without `new`. */
private object WriteChannelSource {

  /**
   * Creates a new [[WriteChannelSource]] stage wrapping the given channel.
   *
   * @param channel the channel handed to the stage's constructor
   * @return a freshly constructed stage instance
   */
  def apply(channel: WriteChannel): WriteChannelSource = {
    val stage = new WriteChannelSource(channel)
    stage
  }
}
/**
 * Sink stage that writes every incoming `ByteString` to `channel` and closes the channel
 * when the stage stops.
 *
 * Despite its name this stage is a sink: its shape is a `SinkShape` with a single inlet.
 *
 * NOTE(review): `channel.write` and `channel.close` are invoked synchronously from the stage
 * callbacks; if `WriteChannel` performs blocking I/O, consider running this stage on a
 * dedicated dispatcher — confirm against the channel implementation.
 *
 * @param channel destination of the written bytes; this stage closes it when it stops, so it
 *                must not be shared with another writer or reused across materializations.
 *                Assumed to follow the `java.nio.channels.WritableByteChannel` contract
 *                (`write(ByteBuffer)` may consume only part of the buffer) — TODO confirm.
 */
private class WriteChannelSource(channel: WriteChannel) extends GraphStage[SinkShape[ByteString]] {
  val in: Inlet[ByteString] = Inlet[ByteString]("WriteChannelSource.in")
  val shape: SinkShape[ByteString] = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new WriteChannelStageLogic()

  private class WriteChannelStageLogic extends GraphStageLogic(shape) with InHandler {

    /** A sink has no downstream to signal demand, so request the first element ourselves. */
    override def preStart(): Unit = pull(in)

    /** Writes the pushed element completely, then asks upstream for the next one. */
    override def onPush(): Unit = {
      val buffer = grab(in).toByteBuffer
      // A single write call may consume only part of the buffer; keep writing until it is
      // drained so no bytes are silently dropped. An exception thrown here fails the stage.
      while (buffer.hasRemaining) channel.write(buffer)
      pull(in)
    }

    // onUpstreamFinish / onUpstreamFailure keep the InHandler defaults (complete / fail the
    // stage). The channel is released in postStop instead, which runs however the stage
    // terminates — including when channel.write throws in onPush, a path on which the
    // channel was previously never closed.
    override def postStop(): Unit = channel.close()

    setHandler(in, this)
  }
}
// Variant 2: the same behaviour expressed with the stream DSL
// Destination channel (placeholder in this snippet). It is closed when the stream terminates.
val writeChannel: WritableByteChannel = ???

// Sink that writes every ByteString to `writeChannel` and closes the channel on termination.
//
// The materialized Future[Done] comes from watchTermination, so it completes when the writing
// flow finishes and fails if upstream or a write fails. Previously the materialized value was
// an already-completed `Future.successful(Done)`, which said nothing about the stream's
// outcome, and the composed graph was discarded instead of being bound to a name.
//
// NOTE(review): the channel is closed in the downstream `Sink.onComplete` callback; the
// returned future may complete slightly before `close()` has run — confirm whether callers
// need to observe the close itself.
val sink: Sink[ByteString, Future[Done]] =
  Flow[ByteString]
    .map { data =>
      val buffer = data.toByteBuffer
      // WritableByteChannel.write may consume only part of the buffer; loop until drained.
      while (buffer.hasRemaining) writeChannel.write(buffer)
      Done
    }
    .watchTermination()(Keep.right)
    .to(Sink.onComplete(_ => writeChannel.close()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment