Skip to content

Instantly share code, notes, and snippets.

View asardaes's full-sized avatar

Alexis Sardá asardaes

View GitHub Profile
@asardaes
asardaes / SetSerializer.java
Last active August 14, 2023 13:05
Flink set serialization for POJOs
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
@asardaes
asardaes / CustomJwtBearerTokenResponseClient.kt
Last active July 19, 2023 08:15
Configuring Spring's WebClient for OAuth2 JWT Bearer Client Authentication (client_secret_jwt with private key) - MVC non-reactive version
// Salesforce doesn't include expires_in in its token response, so Spring assumes it's only valid for 1 second.
// Considering clock skew logic, each token is basically valid for a single call,
// so you might want to force the assumption that it can be valid for a little longer.
class CustomJwtBearerTokenResponseClient(
private val minimumTokenDuration: Duration
) : OAuth2AccessTokenResponseClient<JwtBearerGrantRequest> {
private val defaultClient = DefaultJwtBearerTokenResponseClient()
override fun getTokenResponse(authorizationGrantRequest: JwtBearerGrantRequest): OAuth2AccessTokenResponse {
@asardaes
asardaes / Test.kt
Created October 14, 2022 10:12
Flink test for broadcast state
@Test
fun `snapshot and subsequent restore`() {
// non-empty function (knows about active customers)
val function = getFunctionForTesting()
val (operatorId, stateFinalizer) = getHarness(function).use { harness ->
harness.processBroadcastElement(GenericChangeMessage(), 1L)
val stateFinalizer = harness.snapshotWithLocalState(0L, 1L)
assertEquals(1, stateFinalizer.jobManagerOwnedState.managedOperatorState.size)
Pair(harness.streamConfig.operatorID, stateFinalizer)
}
@asardaes
asardaes / StateCounts.kt
Last active April 11, 2022 15:10
Flink State Processor API Sample
// kotlin POJO
import java.io.Serializable
data class StateCounts(
var windowState: Int = 0,
var globalState: Int = 0,
var windows: Int = 0,
var dtos: Int = 0,
val dtoBytes: Long = 0L,
) : Serializable {
@asardaes
asardaes / README.txt
Last active April 15, 2023 12:33
Publishing a Gradle precompiled script plugin to a custom repository
Gradle plugins require a marker:
https://docs.gradle.org/current/userguide/plugins.html#sec:plugin_markers
Markers aren't published for precompiled scripts using groovy-gradle-plugin :(
https://github.com/gradle/gradle/issues/15190
The plugin_marker module creates the marker that Gradle needs to find the plugin and also publishes it,
see https://stackoverflow.com/a/60798021/5793905
Assuming all gradle modules have the same group with a folder structure like:
@asardaes
asardaes / Main.kt
Created June 3, 2019 12:14
Flink dynamic Key Selector
data class WC(var word: String, var count: Int) : Serializable {
constructor() : this("", 0)
}
class KS : KeySelector<WC, Int> {
private var count = 0
override fun getKey(value: WC): Int {
println("count=$count, value=$value")
return if (count++ < 2) 0 else 1
}
@asardaes
asardaes / ExecutionPlan.json
Created August 8, 2018 11:59
[Flink][Scala] JDBCInputFormat with SplitDataProperties
{
"nodes": [
{
"id": 4,
"type": "source",
"pact": "Data Source",
"contents": "at org.apache.flink.api.scala.ExecutionEnvironment.createInput(ExecutionEnvironment.scala:390) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)",
"parallelism": "24",
"global_properties": [