Skip to content

Instantly share code, notes, and snippets.

@clayrat
Last active August 29, 2015 14:11
Show Gist options
  • Save clayrat/7b6958ab9b6d1bd23a77 to your computer and use it in GitHub Desktop.
Save clayrat/7b6958ab9b6d1bd23a77 to your computer and use it in GitHub Desktop.
import scalaz.stream.nio.{file => nio}
import scodec.bits.ByteVector
...
// Stream of strings produced by repeatedly evaluating the delayed task below;
// `Process.repeatEval` runs the task again for each element that is requested.
val uuidSource: Process[Task, String] =
Process.repeatEval(
Task.delay(
// uuid creation
// NOTE(review): the body is elided in this snippet; presumably it yields a fresh
// UUID string on every evaluation — confirm against the full source.
)
)
/**
 * Builds a sink that publishes every incoming string to `topic`, using the string as both the
 * message key and the message payload.
 *
 * The producer is acquired through `io.resource`: it is created when the sink is run and closed
 * when the sink terminates, instead of being created eagerly and never released.
 *
 * @param producerConfig configuration passed to the `Producer` constructor
 * @param topic          topic every message is sent to
 * @return a sink that sends each received string through the producer
 */
def kafkaSink(producerConfig: ProducerConfig, topic: String): Sink[Task, String] =
  io.resource(Task.delay(new Producer[String, String](producerConfig)))(
    producer => Task.delay(producer.close())
  ) { producer =>
    // The step task is evaluated once per element; it only hands back the send function.
    Task.now { (msg: String) =>
      Task.delay {
        producer.send(new KeyedMessage[String, String](topic, msg, msg))
      }
    }
  }
// Prefix prepended to every output file name.
val path = "D:/"
// Separator inserted between lines: the two bytes 13 and 10 (CR, LF).
val eol = ByteVector(Array[Byte](13, 10))
/**
 * Sink that writes each incoming batch of lines to its own file.
 *
 * The file name is `path` + the first line of the batch + ".txt". Lines are encoded as UTF-8 and
 * separated by `eol`. An empty batch has no first line to name the file after, so it is skipped
 * instead of failing on `head`.
 */
val fileSink: Sink[Task, Vector[String]] = io.channel { lines =>
  lines.headOption.fold(Task.now(())) { first =>
    // Return the write task itself rather than blocking on `.run.run` inside `Task.delay`;
    // `suspend` keeps the encoding work deferred until the task is actually run.
    Task.suspend {
      val fname = first concat ".txt"
      // Explicit charset: the no-arg `getBytes` depends on the platform default encoding.
      val bytes =
        lines.map(line => ByteVector(line.getBytes(java.nio.charset.StandardCharsets.UTF_8)))
      Process.emitAll(bytes).toSource.intersperse(eol).to(nio.chunkW(path concat fname)).run
    }
  }
}
/**
 * Entry point. Every 500 ms takes one value from `uuidSource`, passes it through the Kafka sink,
 * groups the values into chunks of up to 1000, and hands each chunk to `fileSink`.
 *
 * @param args command-line arguments (not read here)
 */
def main(args: Array[String]): Unit = {
  // config
  // NOTE(review): `config` and `topic` used below are not defined in this snippet; they
  // presumably come from the elided configuration section — confirm.
  implicit val scheduler = DefaultScheduler
  val proc =
    Process.awakeEvery(500.milliseconds)
      .zipWith(uuidSource)((_, uuid) => uuid) // keep the uuid, drop the tick value
      .observe(kafkaSink(config, topic))
      .chunk(1000)
      .to(fileSink)
  // First `run` builds the Task for the whole stream; second `run` executes it synchronously.
  proc.run.run
}
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment