Last active
July 11, 2019 16:01
-
-
Save mgagliardo91/f0ce50276da6e5a8d7d83fba17d1ecef to your computer and use it in GitHub Desktop.
GitHub Injector Show & Tell
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Square Map Function | |
stream.map(new RichMapFunction<Integer, Integer>() { | |
@Override | |
public Integer map(Integer input) { | |
return input * input; | |
} | |
}); | |
// Flat Map transform a string to an stream of individual characters | |
stream.flatMap(new RichFlatMapFunction<String, String>() { | |
@Override | |
public void flatMap(String input, Collector<String> collector) { | |
return collector.collect(input.split("")); | |
} | |
}); | |
// All custom functions must be serializable | |
stream.map(new RichMapFunction<SomeEntity, String>() { | |
private transient ObjectMapper objectMapper; | |
@Override | |
public JsonNode map(SomeEntity input) { | |
if (objectMapper == null) { | |
objectMapper = new ObjectMapper(); // initialize for the life of the function | |
} | |
return objectMapper.writeValueAsString(input); | |
} | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class FetchRequest implements Serializable { | |
private Integer installationId; | |
private String accountId; | |
private GitHubEntity parent; | |
private GitHubEntityType entityType; | |
private List<String> entityIds; | |
private List<String> entityNodeIds; | |
// Getters/Setters | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public abstract class GitHubEventProcessor implements Serializable { | |
private String[] eventNames; | |
private DatabaseConfig databaseConfig; | |
public GitHubEventProcessor(String[] eventNames, DatabaseConfig databaseConfig) { | |
this.eventNames = eventNames; | |
this.databaseConfig = databaseConfig; | |
} | |
public String[] getEventNames() { | |
return eventNames; | |
} | |
public DatabaseConfig getDatabaseConfig() { | |
return databaseConfig; | |
} | |
public String getProcessorName() { | |
return getClass().getSimpleName().replaceAll("(?<!^)([A-Z])", "-$1").toLowerCase(); | |
} | |
public abstract String getUid(); | |
public abstract void apply(DataStream<Tuple2<Timestamp, JsonNode>> stream); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RepositoryProcessor extends GitHubEventProcessor { | |
private static final String[] eventNames = new String[]{ | |
"installation_repositories", | |
"installation", | |
"repository" | |
}; | |
public RepositoryProcessor(DatabaseConfig databaseConfig, SinkFunction<JsonNode> fetchRequestSink) { | |
super(eventNames, databaseConfig); | |
this.fetchRequestSink = fetchRequestSink; | |
} | |
@Override | |
public String getUid() { | |
return topologyUid; | |
} | |
@Override | |
public void apply(DataStream<Tuple2<Timestamp, JsonNode>> stream) { | |
SplitStream<Tuple2<Timestamp, JsonNode>> streamByEventType = stream | |
.keyBy(/* installationId */) | |
.split(/* eventType */); | |
processInstallationEvents(streamByEventType.select("installation_repositories", "installation")); | |
processRepositoryEvents(streamByEventType.select("repository")); | |
} | |
void processRepositoryEvents(DataStream<Tuple2<Timestamp, JsonNode>> repositoryEvents) { | |
repositoryEvents | |
.window(/* Session Window 15 seconds */) | |
.process(new ProcessWindowFunction(/* Create fetch requests for repository update events */) | |
.addSink(fetchRequestSink) | |
} | |
void processInstallationEvents(DataStream<Tuple2<Timestamp, JsonNode>> installationEvents) { | |
SplitStream installationRepositories = installationEvents | |
.flatMap(/* extract into either group of added or removed repositories */) | |
.split(/* based on added or removed */); | |
installationEvents.select(/* added */) | |
.map(/* create Fetch Request */) | |
.addSink(fetchRequestSink); | |
// Remove the entries from the SoR database | |
processRemoved( | |
installationEvents.select(/* removed */) | |
.map*(/* create database entry with "REMOVED" action */) | |
); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment