Skip to content

Instantly share code, notes, and snippets.

@sagargangurde
Last active January 5, 2021 12:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sagargangurde/d2ebbe257837c6909d0a824ec4d6df4e to your computer and use it in GitHub Desktop.
Save sagargangurde/d2ebbe257837c6909d0a824ec4d6df4e to your computer and use it in GitHub Desktop.
Flink application to filter list of songs
package com.talentica.flink;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
public class FilterSongs {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ParameterTool params = ParameterTool.fromArgs(args);
final String inputPath = params.getRequired("input");
final String outputPath = params.getRequired("output");
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile(inputPath)
.ignoreFirstLine()
.parseQuotedStrings('"')
.ignoreInvalidLines()
.types(Long.class, String.class, String.class);
DataSet<Song> songs = lines.map(new MapFunction<Tuple3<Long,String,String>, Song>() {
@Override
public Song map(Tuple3<Long, String, String> csvLine) throws Exception {
String songTitle = csvLine.f1;
String[] singers = csvLine.f2.split("\\|");
return new Song(songTitle, new HashSet<>(Arrays.asList(singers)));
}
});
DataSet<Song> filteredSongs = songs.filter(new FilterFunction<Song>() {
@Override
public boolean filter(Song song) throws Exception {
return song.getSingers().contains("Taylor Swift");
}
});
filteredSongs.writeAsText(outputPath);
env.execute();
}
}
class Song {
private String title;
private Set<String> singers;
public Song(String title, Set<String> singers) {
this.title = title;
this.singers = singers;
}
public String getTitle() {
return title;
}
public Set<String> getSingers() {
return singers;
}
@Override
public String toString() {
return "Song{title='" + title + "', singers=" + singers + "}";
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment