Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Flink application to filter a list of songs
package com.talentica.flink;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class FilterSongs {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ParameterTool params = ParameterTool.fromArgs(args);
final String inputPath = params.getRequired("input");
final String outputPath = params.getRequired("output");
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile(inputPath)
.types(Long.class, String.class, String.class);
DataSet<Song> songs = MapFunction<Tuple3<Long,String,String>, Song>() {
public Song map(Tuple3<Long, String, String> csvLine) throws Exception {
String songTitle = csvLine.f1;
String[] singers = csvLine.f2.split("\\|");
return new Song(songTitle, new HashSet<>(Arrays.asList(singers)));
DataSet<Song> filteredSongs = songs.filter(new FilterFunction<Song>() {
public boolean filter(Song song) throws Exception {
return song.getSingers().contains("Taylor Swift");
/**
 * A song title together with the set of singers who perform it.
 *
 * <p>The singer set passed to the constructor is stored without copying; {@link #getSingers()}
 * returns a read-only view of it. Because this class is not public and has no no-argument
 * constructor, Flink cannot treat it as a POJO and will serialize it as a generic (Kryo) type.
 */
final class Song {
    private final String title;
    private final Set<String> singers;

    /**
     * Creates a song.
     *
     * @param title the song title; must not be {@code null}
     * @param singers the singers performing the song; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public Song(String title, Set<String> singers) {
        this.title = Objects.requireNonNull(title, "title");
        this.singers = Objects.requireNonNull(singers, "singers");
    }

    /** Returns the song title. */
    public String getTitle() {
        return title;
    }

    /** Returns a read-only view of the singers performing this song. */
    public Set<String> getSingers() {
        return Collections.unmodifiableSet(singers);
    }

    @Override
    public String toString() {
        return "Song{title='" + title + "', singers=" + singers + "}";
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment