Created
February 28, 2020 17:45
-
-
Save mmonti/4c27273d202f12bcb2b61cd89b759c38 to your computer and use it in GitHub Desktop.
Pipeline Pattern in Kotlin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sunrun.fi.ingestion.input | |
import java.io.ByteArrayOutputStream | |
import java.nio.file.Paths | |
import java.util.zip.ZipInputStream | |
/**
 * A single unit of work that turns a value of type [I] into a value of type [O].
 *
 * @param I type of the value the step consumes
 * @param O type of the value the step produces
 */
interface Step<I, O> {
    /**
     * Runs this step against [input].
     *
     * @param input value to process
     * @return the result of processing [input]
     */
    fun process(input: I): O
}
/**
 * A chain of [Step]s that can be run as if it were one step from [I] to [O].
 *
 * A pipeline wraps exactly one step; [pipe] builds a longer chain by wrapping
 * a new step that delegates to the existing one first.
 *
 * @param I type accepted by the first step of the chain
 * @param O type produced by the last step of the chain
 * @property current the step that represents everything chained so far
 */
class Pipeline<I, O>(private val current: Step<I, O>) {

    /**
     * Appends [next] to the chain.
     *
     * This pipeline is not modified; a new pipeline is returned whose single
     * step runs the existing chain and hands its output to [next].
     *
     * @param next step that consumes this pipeline's output
     * @return a pipeline from [I] to the output type of [next]
     */
    fun <NewO> pipe(next: Step<O, NewO>): Pipeline<I, NewO> {
        val upstream = current
        val chained = object : Step<I, NewO> {
            override fun process(input: I): NewO {
                val intermediate = upstream.process(input)
                return next.process(intermediate)
            }
        }
        return Pipeline(chained)
    }

    /**
     * Runs the whole chain against [input].
     *
     * @param input value handed to the first step
     * @return the value produced by the last step
     */
    fun execute(input: I): O = current.process(input)
}
/**
 * Pair of integer operands.
 *
 * @property one first operand
 * @property two second operand
 */
data class AdditionInput(
    val one: Int,
    val two: Int
)
/**
 * Step that adds the two operands carried by an [AdditionInput].
 */
class AddIntegerStep : Step<AdditionInput, Int> {
    /**
     * @param input the operands to add
     * @return the sum of `input.one` and `input.two`
     */
    override fun process(input: AdditionInput): Int {
        val (first, second) = input
        return first + second
    }
}
/**
 * Step that renders an integer as its decimal string form.
 */
class IntToStringStep : Step<Int, String> {
    /**
     * @param input number to render
     * @return the string representation of [input]
     */
    override fun process(input: Int): String = "$input"
}
/**
 * A named blob of bytes.
 *
 * Equality and hashing compare the *contents* of [ba]. The `equals`/`hashCode`
 * that a data class generates for an array property only compare the array
 * reference, so without the overrides below two assets holding the same name
 * and the same bytes would not be equal and could not be de-duplicated in a
 * set or used reliably as map keys.
 *
 * @property jobId identifier of the asset
 * @property ba raw bytes of the asset
 */
data class AssetFile(val jobId: String, val ba: ByteArray) {

    /** Two assets are equal when their ids match and their bytes are identical. */
    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is AssetFile) return false
        return jobId == other.jobId && ba.contentEquals(other.ba)
    }

    /** Hash derived from the id and the byte contents, consistent with [equals]. */
    override fun hashCode(): Int = 31 * jobId.hashCode() + ba.contentHashCode()
}
/**
 * Step that reads every entry of a zip stream fully into memory.
 */
class DecompressStep : Step<ZipInputStream, List<AssetFile>> {
    /**
     * Walks [input] entry by entry and collects one [AssetFile] per entry,
     * using the entry name as the asset's `jobId` and the bytes readable for
     * that entry as its content.
     *
     * The stream is closed before this method returns, whether it completes
     * normally or throws.
     *
     * @param input zip stream to drain; it is consumed and closed by this call
     * @return the assets in the order the stream reported their entries
     */
    override fun process(input: ZipInputStream): List<AssetFile> =
        input.use { zip ->
            // The sequence is lazy: each entry's bytes are read before the
            // stream is advanced to the following entry.
            generateSequence { zip.nextEntry }
                .map { entry -> AssetFile(entry.name, zip.readBytes()) }
                .toList()
        }
}
/**
 * Step that announces each asset on standard output and passes the assets on.
 *
 * No data is transferred anywhere by this class; it only prints a line of
 * text per asset.
 */
class UploadStep : Step<List<AssetFile>, List<AssetFile>> {
    /**
     * @param input assets to announce
     * @return a new list holding the same assets in the same order
     */
    override fun process(input: List<AssetFile>): List<AssetFile> {
        val passedThrough = ArrayList<AssetFile>(input.size)
        for (asset in input) {
            print("uploading ${asset.jobId}, size=${asset.ba.size}")
            passedThrough.add(asset)
        }
        return passedThrough
    }
}
/**
 * Step that condenses a list of assets into one string made of their ids.
 */
class SummaryStep : Step<List<AssetFile>, String> {
    /**
     * Concatenates the `jobId` of every asset, in list order, with no
     * separator between them.
     *
     * An empty list yields an empty string. (Folding the ids with `reduce`
     * would instead throw on an empty list, e.g. for an archive without
     * entries.)
     *
     * @param input assets to summarise; may be empty
     * @return the ids joined back to back, or `""` when [input] is empty
     */
    override fun process(input: List<AssetFile>): String =
        input.joinToString(separator = "") { it.jobId }
}
/**
 * Demo entry point: decompresses a zip from the resources directory, announces
 * each entry, and prints the summary string produced by the pipeline.
 */
fun main() {
    // Alternative demo: add two integers, then render the sum as text.
    // var pipeline = Pipeline(AddIntegerStep()).pipe(IntToStringStep())
    // print(pipeline.execute(AdditionInput(1,2)))
    val ingestion = Pipeline(DecompressStep())
        .pipe(UploadStep())
        .pipe(SummaryStep())

    val archive = Paths.get("src/main/resources/source_data_small.zip").toFile()
    // The stream is handed to the pipeline, whose first step closes it.
    val summary = ingestion.execute(ZipInputStream(archive.inputStream()))
    print(summary)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment