Skip to content

Instantly share code, notes, and snippets.

@mgagliardo91
Last active July 11, 2019 16:01
Show Gist options
  • Save mgagliardo91/f0ce50276da6e5a8d7d83fba17d1ecef to your computer and use it in GitHub Desktop.
Save mgagliardo91/f0ce50276da6e5a8d7d83fba17d1ecef to your computer and use it in GitHub Desktop.
GitHub Injector Show & Tell
// Square Map Function
stream.map(new RichMapFunction<Integer, Integer>() {
@Override
public Integer map(Integer input) {
return input * input;
}
});
// Flat Map transform a string to an stream of individual characters
stream.flatMap(new RichFlatMapFunction<String, String>() {
@Override
public void flatMap(String input, Collector<String> collector) {
return collector.collect(input.split(""));
}
});
// All custom functions must be serializable
stream.map(new RichMapFunction<SomeEntity, String>() {
private transient ObjectMapper objectMapper;
@Override
public JsonNode map(SomeEntity input) {
if (objectMapper == null) {
objectMapper = new ObjectMapper(); // initialize for the life of the function
}
return objectMapper.writeValueAsString(input);
}
});
public class FetchRequest implements Serializable {
private Integer installationId;
private String accountId;
private GitHubEntity parent;
private GitHubEntityType entityType;
private List<String> entityIds;
private List<String> entityNodeIds;
// Getters/Setters
}
public abstract class GitHubEventProcessor implements Serializable {
private String[] eventNames;
private DatabaseConfig databaseConfig;
public GitHubEventProcessor(String[] eventNames, DatabaseConfig databaseConfig) {
this.eventNames = eventNames;
this.databaseConfig = databaseConfig;
}
public String[] getEventNames() {
return eventNames;
}
public DatabaseConfig getDatabaseConfig() {
return databaseConfig;
}
public String getProcessorName() {
return getClass().getSimpleName().replaceAll("(?<!^)([A-Z])", "-$1").toLowerCase();
}
public abstract String getUid();
public abstract void apply(DataStream<Tuple2<Timestamp, JsonNode>> stream);
}
public class RepositoryProcessor extends GitHubEventProcessor {
private static final String[] eventNames = new String[]{
"installation_repositories",
"installation",
"repository"
};
public RepositoryProcessor(DatabaseConfig databaseConfig, SinkFunction<JsonNode> fetchRequestSink) {
super(eventNames, databaseConfig);
this.fetchRequestSink = fetchRequestSink;
}
@Override
public String getUid() {
return topologyUid;
}
@Override
public void apply(DataStream<Tuple2<Timestamp, JsonNode>> stream) {
SplitStream<Tuple2<Timestamp, JsonNode>> streamByEventType = stream
.keyBy(/* installationId */)
.split(/* eventType */);
processInstallationEvents(streamByEventType.select("installation_repositories", "installation"));
processRepositoryEvents(streamByEventType.select("repository"));
}
void processRepositoryEvents(DataStream<Tuple2<Timestamp, JsonNode>> repositoryEvents) {
repositoryEvents
.window(/* Session Window 15 seconds */)
.process(new ProcessWindowFunction(/* Create fetch requests for repository update events */)
.addSink(fetchRequestSink)
}
void processInstallationEvents(DataStream<Tuple2<Timestamp, JsonNode>> installationEvents) {
SplitStream installationRepositories = installationEvents
.flatMap(/* extract into either group of added or removed repositories */)
.split(/* based on added or removed */);
installationEvents.select(/* added */)
.map(/* create Fetch Request */)
.addSink(fetchRequestSink);
// Remove the entries from the SoR database
processRemoved(
installationEvents.select(/* removed */)
.map*(/* create database entry with "REMOVED" action */)
);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment