-
-
Save da4nik/1c3098dbbaa62d9f7947146c1d1c23de to your computer and use it in GitHub Desktop.
Pipeline Pattern in Kotlin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sunrun.fi.ingestion.input | |
import java.io.ByteArrayOutputStream | |
import java.nio.file.Paths | |
import java.util.zip.ZipInputStream | |
/**
 * A single stage of a processing pipeline.
 *
 * @param I the type of value this stage accepts
 * @param O the type of value this stage produces
 */
interface Step<I, O> {
    /** Transforms [input] into this stage's output value. */
    fun process(input: I): O
}
/**
 * A chain of [Step]s that behaves like a single step from [I] to [O].
 *
 * Instances are built incrementally: start from one step and append further
 * steps with [pipe]. Each call to [pipe] returns a new pipeline; the receiver
 * is left unchanged.
 *
 * @param current the step (possibly already a composition) this pipeline runs
 */
class Pipeline<I, O>(private val current: Step<I, O>) {

    /**
     * Appends [next] to the end of this pipeline.
     *
     * @param next the step that consumes this pipeline's output
     * @return a new pipeline that runs this pipeline first and then [next]
     */
    fun <NewO> pipe(next: Step<O, NewO>): Pipeline<I, NewO> {
        val upstream = current
        val composed = object : Step<I, NewO> {
            // Feed the upstream result straight into the appended step.
            override fun process(input: I): NewO = next.process(upstream.process(input))
        }
        return Pipeline(composed)
    }

    /**
     * Runs every step of the pipeline on [input], in the order they were piped.
     *
     * @return the output of the last step
     */
    fun execute(input: I): O = current.process(input)
}
/**
 * The pair of integer operands consumed by `AddIntegerStep`.
 *
 * @property one the first operand
 * @property two the second operand
 */
data class AdditionInput(
    val one: Int,
    val two: Int
)
/** A [Step] that adds the two operands of an [AdditionInput]. */
class AddIntegerStep : Step<AdditionInput, Int> {
    /** Returns the sum of `input.one` and `input.two`. */
    override fun process(input: AdditionInput): Int {
        val (first, second) = input
        return first + second
    }
}
/** A [Step] that renders an integer as its decimal string form. */
class IntToStringStep : Step<Int, String> {
    /** Returns the result of calling `toString()` on [input]. */
    override fun process(input: Int): String = input.toString()
}
/**
 * A named blob of bytes carried through the pipeline.
 *
 * `equals` and `hashCode` are overridden because the generated data-class
 * versions compare the [ba] array by reference, which would make two
 * instances holding identical bytes unequal (and hash differently).
 *
 * @property jobId identifier of the asset
 * @property ba the raw content of the asset
 */
data class AssetFile(val jobId: String, val ba: ByteArray) {

    /** Two assets are equal when their ids match and their bytes are element-wise equal. */
    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is AssetFile) return false
        return jobId == other.jobId && ba.contentEquals(other.ba)
    }

    /** Hash consistent with [equals]: derived from the id and the array contents. */
    override fun hashCode(): Int = 31 * jobId.hashCode() + ba.contentHashCode()
}
/**
 * A [Step] that unpacks a zip stream into in-memory [AssetFile]s.
 */
class DecompressStep : Step<ZipInputStream, List<AssetFile>> {

    /**
     * Reads every entry of [input] and returns one [AssetFile] per entry, in
     * archive order. The entry name becomes the asset's `jobId` and the entry's
     * content becomes its bytes.
     *
     * The stream is closed before this method returns, whether it completes
     * normally or throws.
     */
    override fun process(input: ZipInputStream): List<AssetFile> =
        input.use { zip ->
            // The sequence is lazy, so each entry's bytes are drained before
            // the stream is advanced to the following entry.
            generateSequence { zip.nextEntry }
                // While positioned on an entry, the stream reports end-of-input
                // at the end of that entry, so readBytes() yields just its content.
                .map { entry -> AssetFile(entry.name, zip.readBytes()) }
                .toList()
        }
}
/**
 * A [Step] that reports each asset on standard output and passes it along.
 *
 * Only a message is printed; nothing else is done with the asset's bytes here.
 */
class UploadStep : Step<List<AssetFile>, List<AssetFile>> {

    /**
     * Prints one message per asset (written with `print`, so no line separator
     * is emitted between messages) and returns a new list holding the same
     * assets in the same order.
     */
    override fun process(input: List<AssetFile>): List<AssetFile> =
        input.map { asset ->
            asset.also { print("uploading ${it.jobId}, size=${it.ba.size}") }
        }
}
/**
 * A [Step] that condenses a list of assets into a single summary string.
 */
class SummaryStep : Step<List<AssetFile>, String> {

    /**
     * Concatenates the `jobId` of every asset in [input], in order and with no
     * separator.
     *
     * @return the concatenated ids, or an empty string when [input] is empty
     */
    override fun process(input: List<AssetFile>): String =
        // joinToString handles the empty list (reduce would throw) and builds
        // the result in a single pass instead of re-copying the accumulator.
        input.joinToString(separator = "") { it.jobId }
}
/**
 * Demo entry point: decompresses a sample archive, runs it through the upload
 * and summary steps, and prints the resulting summary.
 */
fun main() {
    // Alternative demo using the arithmetic steps:
    //     var pipeline = Pipeline(AddIntegerStep()).pipe(IntToStringStep())
    //     print(pipeline.execute(AdditionInput(1,2)))

    // Path is relative to the working directory the program is started from.
    val archive = Paths.get("src/main/resources/source_data_small.zip").toFile()

    val summary = Pipeline(DecompressStep())
        .pipe(UploadStep())
        .pipe(SummaryStep())
        // The stream is opened only once the pipeline has been assembled.
        .execute(ZipInputStream(archive.inputStream()))

    print(summary)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment