Created
November 17, 2017 18:46
-
-
Save regis-leray/4fffb8ddccb0297600e75d39d9bb2fce to your computer and use it in GitHub Desktop.
WriteChannelSink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private class WriteChannelSource(channel: WriteChannel) extends GraphStage[SinkShape[ByteString]] { | |
val in: Inlet[ByteString] = Inlet.create[ByteString]("WriteChannelSource.in") | |
val shape: SinkShape[ByteString] = SinkShape.of(in) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new WriteChannelStageLogic() | |
private class WriteChannelStageLogic extends GraphStageLogic(shape) with InHandler { | |
override def onPush(): Unit = { | |
val data = grab(in) | |
channel.write(data.toByteBuffer) | |
pull(in) | |
} | |
override def onUpstreamFinish(): Unit = { | |
super.onUpstreamFinish() | |
channel.close() | |
} | |
override def onUpstreamFailure(ex: Throwable): Unit = { | |
super.onUpstreamFailure(ex) | |
channel.close() | |
} | |
override def preStart(): Unit = { // initiate the flow of data by issuing a first pull on materialization: | |
pull(in) | |
} | |
setHandler(in, this) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment