Skip to content

Instantly share code, notes, and snippets.

@chiwanpark
Last active September 5, 2015 13:12
Show Gist options
  • Save chiwanpark/a0b0269c9a9b058d15d3 to your computer and use it in GitHub Desktop.
Save chiwanpark/a0b0269c9a9b058d15d3 to your computer and use it in GitHub Desktop.
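Example for hagersaleh: an Apache Flink DataSet job that filters TPC-H customers and orders and joins them on the customer key.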
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.examples.java.relational.TPCHQuery3.Customer;
import org.apache.flink.examples.java.relational.TPCHQuery3.Order;
/**
 * Flink DataSet job that filters TPC-H style customer and order tables and joins them.
 *
 * <p>Both inputs are read as '|'-delimited CSV. The job does the following:
 * <ul>
 *   <li>keeps customers whose field {@code f1} is {@code "AUTOMOBILE"};</li>
 *   <li>keeps orders matching a fixed predicate;</li>
 *   <li>joins customer field 0 with order field 1;</li>
 *   <li>writes the joined rows as CSV.</li>
 * </ul>
 *
 * <p>Paths can be passed as program arguments:
 * {@code <customer-csv-path> <orders-csv-path> <result-path>}. With no arguments, the static
 * default paths below are used.
 */
class Example {
  private static final String USAGE =
      "Usage: Example <customer-csv-path> <orders-csv-path> <result-path>";

  // Default paths, used when no program arguments are given.
  // TODO: NEED TO SET CORRECT PATH! (or pass the paths as program arguments)
  private static String customerPath = "";
  private static String ordersPath = "";
  private static String outputPath = "";

  /**
   * Builds and runs the job.
   *
   * @param args either empty (use the default paths) or exactly
   *     {@code <customer-csv-path> <orders-csv-path> <result-path>}
   * @throws IllegalArgumentException if the argument count is wrong or a path is empty
   * @throws Exception if job execution fails
   */
  public static void main(String... args) throws Exception {
    parseParameters(args);

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // load data
    DataSet<Customer> customers = getCustomerDataSet(env);
    DataSet<Order> orders = getOrdersDataSet(env);

    // Keep customers whose market segment (f1) is "AUTOMOBILE".
    DataSet<Customer> filteredCustomers = customers.filter(new FilterFunction<Customer>() {
      @Override
      public boolean filter(Customer value) {
        return "AUTOMOBILE".equals(value.f1);
      }
    });

    // Keep orders whose f2 equals "O" and whose f0 equals 7.
    // NOTE(review): with includeFields("110010010"), f2 is read from column 4 of the orders file.
    // In the TPC-H layout that column is o_orderdate, not o_orderstatus (column 2), so
    // "O".equals(f2) is presumably never true and the result would be empty. The predicate is
    // kept exactly as in the code this example was based on; confirm the intended filter.
    DataSet<Order> filteredOrders = orders.filter(new FilterFunction<Order>() {
      @Override
      public boolean filter(Order value) {
        return "O".equals(value.f2) && value.f0 == 7;
      }
    });

    // Join customer field 0 with order field 1
    // (c_custkey = o_custkey, assuming the TPC-H table layouts).
    // Output row: (customer f0, customer f1, order f0, order f2, order f3).
    DataSet<Tuple5<Long, String, Long, String, Long>> results =
        filteredCustomers.join(filteredOrders).where(0).equalTo(1)
            .with(new JoinFunction<Customer, Order, Tuple5<Long, String, Long, String, Long>>() {
              @Override
              public Tuple5<Long, String, Long, String, Long> join(Customer c, Order o) {
                return new Tuple5<>(c.f0, c.f1, o.f0, o.f2, o.f3);
              }
            });

    // write csv file
    results.writeAsCsv(outputPath);
    env.execute("Example: customer/order join");
  }

  /**
   * Overrides the default paths from the program arguments and checks that all paths are set.
   *
   * @param args either empty (keep the defaults) or exactly three paths:
   *     customers, orders, output
   * @throws IllegalArgumentException if a number of arguments other than 0 or 3 is given, or
   *     if any path is empty after parsing
   */
  private static void parseParameters(String[] args) {
    if (args.length > 0) {
      if (args.length != 3) {
        throw new IllegalArgumentException(USAGE);
      }
      customerPath = args[0];
      ordersPath = args[1];
      outputPath = args[2];
    }
    // Fail early with a readable message instead of failing inside Flink on an empty path.
    if (customerPath.isEmpty() || ordersPath.isEmpty() || outputPath.isEmpty()) {
      throw new IllegalArgumentException("Input/output paths are not set. " + USAGE);
    }
  }

  /**
   * Reads the customer file as '|'-delimited CSV, keeping columns 0 and 6 of each row.
   * In the TPC-H layout these are c_custkey and c_mktsegment.
   */
  private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) {
    return env.readCsvFile(customerPath)
        .fieldDelimiter("|")
        .includeFields("10000010")
        .tupleType(Customer.class);
  }

  /**
   * Reads the orders file as '|'-delimited CSV, keeping columns 0, 1, 4 and 7 of each row.
   * In the TPC-H layout these are o_orderkey, o_custkey, o_orderdate and o_shippriority.
   */
  private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) {
    return env.readCsvFile(ordersPath)
        .fieldDelimiter("|")
        .includeFields("110010010")
        .tupleType(Order.class);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment