Skip to content

Instantly share code, notes, and snippets.

@docete
Created September 17, 2019 09:31
Show Gist options
  • Save docete/c599c4192d9fb5d53494c5ab08fad40a to your computer and use it in GitHub Desktop.
Save docete/c599c4192d9fb5d53494c5ab08fad40a to your computer and use it in GitHub Desktop.
WindowPojoJoin
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.join;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.examples.utils.ThrottledIterator;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Random;
/**
* Example illustrating a windowed stream join between two data streams.
*
* <p>The example works on two input streams with pairs (name, grade) and (name, salary)
* respectively. It joins the streams based on "name" within a configurable window.
*
* <p>The example uses a built-in sample data generator that generates
* the streams of pairs at a configurable rate.
*/
@SuppressWarnings("serial")
public class WindowPojoJoin {
// *************************************************************************
// PROGRAM
// *************************************************************************
/**
 * Entry point of the example: builds the two sample streams, joins them and prints the result.
 *
 * <p>Recognized arguments: {@code --windowSize <millis>} (default 2000) and
 * {@code --rate <elements-per-second>} (default 3).
 *
 * @param args command-line arguments, parsed with {@link ParameterTool}
 * @throws Exception propagated from building or executing the job
 */
public static void main(String[] args) throws Exception {
    // parse the parameters
    final ParameterTool params = ParameterTool.fromArgs(args);
    final long windowSize = params.getLong("windowSize", 2000);
    final long rate = params.getLong("rate", 3L);

    System.out.println("Using windowSize=" + windowSize + ", data rate=" + rate);
    // The usage hint names this class (WindowPojoJoin), not the tuple-based WindowJoin example.
    System.out.println("To customize example, use: WindowPojoJoin [--windowSize <window-size-in-millis>] [--rate <elements-per-second>]");

    // obtain execution environment, run this example in "ingestion time"
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

    // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    // create the data sources for both grades and salaries
    DataStream<Grade> grades = GradeSourceV2.getSource(env, rate);
    DataStream<Salary> salaries = SalarySourceV2.getSource(env, rate);

    // run the actual window join program
    // for testability, this functionality is in a separate method.
    DataStream<Tuple3<String, Integer, Integer>> joinedStream = runWindowJoin(grades, salaries, windowSize);

    // print the results with a single thread, rather than in parallel
    joinedStream.print().setParallelism(1);

    // execute program
    env.execute("Windowed POJO Join Example");
}
/**
 * Joins the grade stream with the salary stream on the {@code name} field, using
 * tumbling event-time windows of the given size.
 *
 * @param grades stream of {@link Grade} records
 * @param salaries stream of {@link Salary} records
 * @param windowSize window length in milliseconds
 * @return stream of (name, grade, salary) triples produced by the join
 */
public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
        DataStream<Grade> grades,
        DataStream<Salary> salaries,
        long windowSize) {

    final TumblingEventTimeWindows tumblingWindow =
            TumblingEventTimeWindows.of(Time.milliseconds(windowSize));

    // Combines one grade record and one salary record into an output triple.
    final JoinFunction<Grade, Salary, Tuple3<String, Integer, Integer>> toTriple =
            new JoinFunction<Grade, Salary, Tuple3<String, Integer, Integer>>() {
                @Override
                public Tuple3<String, Integer, Integer> join(Grade gradeRecord, Salary salaryRecord) {
                    return new Tuple3<>(gradeRecord.name, gradeRecord.grade, salaryRecord.salary);
                }
            };

    return grades
            .join(salaries)
            .where(new GradeKeySelector())
            .equalTo(new SalaryKeySelector())
            .window(tumblingWindow)
            .apply(toTriple);
}
/** Key selector for the grade side of the join: the key is the record's {@code name}. */
private static class GradeKeySelector implements KeySelector<Grade, String> {

    /**
     * @param gradeRecord record to extract the key from
     * @return the {@code name} field of the record
     */
    @Override
    public String getKey(Grade gradeRecord) {
        return gradeRecord.name;
    }
}
/** Key selector for the salary side of the join: the key is the record's {@code name}. */
private static class SalaryKeySelector implements KeySelector<Salary, String> {

    /**
     * @param salaryRecord record to extract the key from
     * @return the {@code name} field of the record
     */
    @Override
    public String getKey(Salary salaryRecord) {
        return salaryRecord.name;
    }
}
/**
 * A (name, grade) record used as the left input of the join.
 *
 * <p>Fields are public and a public no-argument constructor is provided, as Flink's
 * POJO type rules require.
 */
public static class Grade {
    public String name;
    public int grade;

    /** No-argument constructor required for POJO handling; leaves the fields at their defaults. */
    public Grade() {
    }

    /**
     * @param name person the grade belongs to
     * @param grade the grade value
     */
    public Grade(String name, int grade) {
        this.name = name;
        this.grade = grade;
    }
}
/**
 * A (name, salary) record used as the right input of the join.
 *
 * <p>Fields are public and a public no-argument constructor is provided, as Flink's
 * POJO type rules require.
 */
public static class Salary {
    public String name;
    public int salary;

    /** No-argument constructor required for POJO handling; leaves the fields at their defaults. */
    public Salary() {
    }

    /**
     * @param name person the salary belongs to
     * @param salary the salary value
     */
    public Salary(String name, int salary) {
        this.name = name;
        this.salary = salary;
    }
}
// Pool of names both sample sources draw from at random; the name is the join key.
static final String[] NAMES = {"tom", "jerry", "alice", "bob", "john", "grace"};
// Generated grades are rnd.nextInt(GRADE_COUNT) + 1, i.e. in the range 1..GRADE_COUNT.
static final int GRADE_COUNT = 5;
// Generated salaries are rnd.nextInt(SALARY_MAX) + 1, i.e. in the range 1..SALARY_MAX.
static final int SALARY_MAX = 10000;
/**
 * Endless, serializable iterator of random {@link Grade} records.
 *
 * <p>Each record takes a random entry of {@code NAMES} and a grade in the range
 * 1..{@code GRADE_COUNT}.
 */
public static class GradeSourceV2 implements Iterator<Grade>, Serializable {
    // Seeded with this object's hashCode().
    private final Random rnd = new Random(hashCode());

    /** Removal is not supported by this generator. */
    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }

    /** @return always {@code true}; the generator never runs out of elements */
    @Override
    public boolean hasNext() {
        return true;
    }

    /** @return a new record with a random name and a random grade */
    @Override
    public Grade next() {
        // Build this file's own Grade type; the previous WindowJoin.Grade reference
        // named a type that is neither declared nor imported here.
        return new Grade(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(GRADE_COUNT) + 1);
    }

    /**
     * Creates a grade stream from a throttled instance of this generator.
     *
     * @param env environment the source is added to
     * @param rate rate passed to {@link ThrottledIterator}
     * @return the resulting grade stream
     */
    public static DataStream<Grade> getSource(StreamExecutionEnvironment env, long rate) {
        return env.fromCollection(new ThrottledIterator<>(new GradeSourceV2(), rate),
                TypeInformation.of(new TypeHint<Grade>() {}));
    }
}
/**
 * Endless, serializable iterator of random {@link Salary} records.
 *
 * <p>Each record takes a random entry of {@code NAMES} and a salary in the range
 * 1..{@code SALARY_MAX}.
 */
public static class SalarySourceV2 implements Iterator<Salary>, Serializable {
    // Seeded with this object's hashCode().
    private final Random rnd = new Random(hashCode());

    /**
     * Creates a salary stream from a throttled instance of this generator.
     *
     * @param env environment the source is added to
     * @param rate rate passed to {@link ThrottledIterator}
     * @return the resulting salary stream
     */
    public static DataStream<Salary> getSource(StreamExecutionEnvironment env, long rate) {
        final ThrottledIterator<Salary> throttled =
                new ThrottledIterator<>(new SalarySourceV2(), rate);
        final TypeInformation<Salary> salaryType =
                TypeInformation.of(new TypeHint<Salary>() {});
        return env.fromCollection(throttled, salaryType);
    }

    /** @return always {@code true}; the generator never runs out of elements */
    @Override
    public boolean hasNext() {
        return true;
    }

    /** @return a new record with a random name and a random salary */
    @Override
    public Salary next() {
        // Draw the name first, then the amount, so the random sequence is consumed in a fixed order.
        final String person = NAMES[rnd.nextInt(NAMES.length)];
        final int amount = rnd.nextInt(SALARY_MAX) + 1;
        return new Salary(person, amount);
    }

    /** Removal is not supported by this generator. */
    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment