Skip to content

Instantly share code, notes, and snippets.

@mmonti
Created February 28, 2020 17:45
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mmonti/4c27273d202f12bcb2b61cd89b759c38 to your computer and use it in GitHub Desktop.
Save mmonti/4c27273d202f12bcb2b61cd89b759c38 to your computer and use it in GitHub Desktop.
Pipeline Pattern in Kotlin
package com.sunrun.fi.ingestion.input
import java.io.ByteArrayOutputStream
import java.nio.file.Paths
import java.util.zip.ZipInputStream
/**
 * A single unit of work that turns a value of type [I] into a value of type [O].
 *
 * @param I type of the value the step accepts
 * @param O type of the value the step produces
 */
interface Step<I, O> {
    /**
     * Transforms [input] and returns the result.
     */
    fun process(input: I): O
}
/**
 * Chains [Step]s so that the output of one becomes the input of the next.
 *
 * A pipeline wraps exactly one step; [pipe] builds a new pipeline around a
 * composite step instead of modifying this one.
 *
 * @param I type accepted by the first step of the chain
 * @param O type produced by the last step of the chain
 */
class Pipeline<I, O>(private val current: Step<I, O>) {

    /**
     * Returns a new pipeline that runs this pipeline's step and then feeds its
     * result to [next].
     */
    fun <NewO> pipe(next: Step<O, NewO>): Pipeline<I, NewO> {
        val head = current
        val composed = object : Step<I, NewO> {
            override fun process(input: I): NewO {
                val intermediate = head.process(input)
                return next.process(intermediate)
            }
        }
        return Pipeline(composed)
    }

    /**
     * Runs the wrapped step on [input] and returns its result.
     */
    fun execute(input: I): O = current.process(input)
}
/**
 * Pair of integers used as the input of the addition example.
 *
 * @property one first integer
 * @property two second integer
 */
data class AdditionInput(
    val one: Int,
    val two: Int
)
/**
 * [Step] that returns the sum of the two integers held by an [AdditionInput].
 */
class AddIntegerStep : Step<AdditionInput, Int> {
    /** Returns `input.one + input.two`. */
    override fun process(input: AdditionInput): Int = with(input) { one + two }
}
/**
 * [Step] that renders an integer as its decimal string representation.
 */
class IntToStringStep : Step<Int, String> {
    /** Returns the string form of [input]. */
    override fun process(input: Int): String = "$input"
}
/**
 * A named blob of bytes.
 *
 * `equals` and `hashCode` are overridden because the versions generated for a
 * data class compare array properties by reference; here two instances are
 * equal when their [jobId]s match and their [ba] arrays have the same content.
 *
 * @property jobId identifier of the asset
 * @property ba raw bytes of the asset
 */
data class AssetFile(val jobId: String, val ba: ByteArray) {

    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is AssetFile) return false
        return jobId == other.jobId && ba.contentEquals(other.ba)
    }

    // Kept consistent with equals: derived from the array content, not its identity.
    override fun hashCode(): Int = 31 * jobId.hashCode() + ba.contentHashCode()
}
/**
 * [Step] that reads every entry of a zip stream fully into memory.
 */
class DecompressStep : Step<ZipInputStream, List<AssetFile>> {

    /**
     * Walks the entries of [input] in stream order and returns one [AssetFile]
     * per entry, using the entry name as the asset's `jobId` and the entry's
     * bytes as its content.
     *
     * [input] is closed before this method returns, including when reading fails.
     */
    override fun process(input: ZipInputStream): List<AssetFile> {
        val assets = mutableListOf<AssetFile>()
        input.use { zip ->
            // nextEntry returns null once the archive is exhausted, which ends the sequence.
            for (entry in generateSequence { zip.nextEntry }) {
                // While an entry is open, the stream reports end-of-data at the end
                // of that entry, so readBytes() returns exactly the entry's content.
                assets += AssetFile(entry.name, zip.readBytes())
            }
        }
        return assets
    }
}
/**
 * [Step] that prints an "uploading" line for each asset and passes the assets on.
 */
class UploadStep : Step<List<AssetFile>, List<AssetFile>> {

    /**
     * Prints the `jobId` and byte count of every asset in [input] (without a
     * trailing newline) and returns a new list holding the same assets in the
     * same order.
     */
    override fun process(input: List<AssetFile>): List<AssetFile> =
        input.toMutableList().onEach { print("uploading ${it.jobId}, size=${it.ba.size}") }
}
/**
 * [Step] that condenses a list of [AssetFile]s into a single string.
 */
class SummaryStep : Step<List<AssetFile>, String> {

    /**
     * Concatenates the `jobId` of every asset in [input], in list order and
     * with no separator.
     *
     * @return the concatenated ids, or an empty string when [input] is empty
     */
    override fun process(input: List<AssetFile>): String =
        // joinToString yields "" for an empty list, whereas reduce throws on one.
        input.joinToString(separator = "") { it.jobId }
}
/**
 * Demo entry point: builds a decompress -> upload -> summary pipeline, runs it
 * on a zip file from the resources directory and prints the resulting summary.
 */
fun main() {
    // Alternative demo using the arithmetic steps:
    // var pipeline = Pipeline(AddIntegerStep()).pipe(IntToStringStep())
    // print(pipeline.execute(AdditionInput(1,2)))
    val pipeline = Pipeline(DecompressStep())
        .pipe(UploadStep())
        .pipe(SummaryStep())

    val archive = Paths.get("src/main/resources/source_data_small.zip").toFile()
    val zipStream = ZipInputStream(archive.inputStream())

    val summary = pipeline.execute(zipStream)
    print(summary)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment