Created
December 2, 2020 14:55
-
-
Save palutz/851080e51f99ec293411cf648a9cd120 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Simple Flink job
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Example { | |
public static StreamExecutionEnvironment getEnvironment(env: String) { | |
StreamExecutionEnvironment env; | |
if (env == "PROD") { | |
env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
} else { | |
env = StreamExecutionEnvironment.createLocalEnvironment() | |
} | |
return env; | |
} | |
public static Person rowToPerson(row: String) { | |
new Person(...) | |
} | |
public static String toJson(p: PersonDynamo) { | |
// ... convert in Json and return as a String | |
} | |
public static DynamoSink createSinkDynamo<String>() { | |
... | |
} | |
public static void main(String[] args) throws Exception { | |
final ConfigEnv config = Config.getConfig(); | |
final StreamExecutionEnvironment env = getEnvironment(config.getEnv()); | |
final myDynamo = | |
DataStream<String> lines = env.readTextFile(config.getInputFile); | |
DataStream<Person> personDataStream = lines.map(new MapFunction<String, Person>() { | |
@Override | |
public String map(String row) { | |
return rowToPerson(row); | |
} | |
}); | |
DataStream<String> personDynamoStream = jsonDataStream.map(new MapFunction<Person, PersonDynamo>() { | |
@Override | |
public String map(Person p) { | |
return new PersonDynamo(p); | |
} | |
}); | |
DataStream<String> jsonDataStream = personDynamoStream.map(new MapFunction<PersonDynamo, String>() { | |
@Override | |
public String map(PersonDynamo pd) { | |
return toJson(pd); | |
} | |
}); | |
personDynamoStream.writeAsText(); | |
personDataStream.addSink(createSinkDynamo()); | |
env.execute("simple job"); | |
} | |
public static class Person { | |
public String messageKey; | |
public DateTime outboxPublishedDate; | |
public DateTime createdDate; | |
public DateTime updatedDate; | |
public String eventType; | |
public String eventTypeKeyName; | |
public String eventMetadataVersion; | |
public String transactionId; | |
public String correlationId; | |
public String subscriptionKey; | |
public Boolean isCredit | |
public Double amount; | |
public String currency; | |
public Double postedBalance; | |
public DateTime postedTimestampUtc; | |
public String state; | |
public String transactionCode; | |
public DateTime lastUpdated; | |
public Person(String msgKey, String eventType, String eventVers, String trxId, /* ... */) { | |
this.messageKey = msgKey; | |
this.eventType = eventType; | |
this.eventMetadataVersion = eventVers; | |
this.transactionId = trxId; | |
// ... | |
}; | |
} | |
public static class PersonDynamo extends Person { | |
public String compositeId; | |
public PersonDynamo(p: Person) { | |
this.compositeId = p.correlationId + p.messageKey; | |
this.messageKey = p.messageKey; | |
this outboxPublishedDate = p.outoutboxPublishedDate; | |
// ... | |
} | |
} | |
// Alternative: build the Dynamo variant by composition (wrapping a Person) instead of inheritance.
public static class PersonDynamo { | |
public String compositeId; | |
public Person person; | |
public PersonDynamo(p: Person) { | |
this.compositeId = p.correlationId + p.messageKey; | |
this.person = p; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment