Skip to content

Instantly share code, notes, and snippets.

@regis-leray
Created November 17, 2017 18:46
Show Gist options
  • Save regis-leray/4fffb8ddccb0297600e75d39d9bb2fce to your computer and use it in GitHub Desktop.
Save regis-leray/4fffb8ddccb0297600e75d39d9bb2fce to your computer and use it in GitHub Desktop.
WriteChannelSink
/**
  * Sink stage that writes every incoming [[ByteString]] to the supplied `channel`
  * and closes the channel when the stage stops.
  *
  * NOTE(review): despite the `Source` suffix this stage is a sink (it only has an
  * inlet); the name is kept so existing callers keep compiling.
  *
  * NOTE(review): `channel.write` / `channel.close` are called directly from the
  * stage callbacks; if they block, consider running this stage on a dedicated
  * I/O dispatcher.
  *
  * @param channel destination channel; this stage takes ownership and closes it
  *                on termination. Presumably follows the
  *                `java.nio.channels.WritableByteChannel` contract -- confirm.
  */
private class WriteChannelSource(channel: WriteChannel) extends GraphStage[SinkShape[ByteString]] {

  val in: Inlet[ByteString] = Inlet[ByteString]("WriteChannelSource.in")

  val shape: SinkShape[ByteString] = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new WriteChannelStageLogic()

  /** Per-materialization logic: pull one element, write it out fully, pull the next. */
  private class WriteChannelStageLogic extends GraphStageLogic(shape) with InHandler {

    /** Initiates the flow of data by issuing the first pull on materialization. */
    override def preStart(): Unit = pull(in)

    override def onPush(): Unit = {
      val buffer = grab(in).toByteBuffer
      // A single write() is allowed to consume only part of the buffer, so keep
      // writing until it is drained; otherwise the tail of the chunk would be lost.
      while (buffer.hasRemaining) {
        channel.write(buffer)
      }
      pull(in)
    }

    // onUpstreamFinish / onUpstreamFailure keep the InHandler defaults
    // (complete / fail the stage); the channel is released in postStop below.

    /**
      * Closes the channel once, whichever way the stage stopped: upstream
      * completion, upstream failure, or an exception thrown from `onPush`
      * (a path on which the channel was previously left open).
      */
    override def postStop(): Unit = channel.close()

    setHandler(in, this)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment