Skip to content

Instantly share code, notes, and snippets.

@chiwanpark
Created February 9, 2016 11:07
Show Gist options
  • Save chiwanpark/0389ce946e4fff58d611 to your computer and use it in GitHub Desktop.
Save chiwanpark/0389ce946e4fff58d611 to your computer and use it in GitHub Desktop.
Flink Inherited POJO Example
package com.chiwanpark.flink;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Arrays;
/**
 * Example Flink job that joins two data sets of POJOs on a key property that
 * both element types inherit from a common parent class.
 *
 * <p>{@link SourceA} and {@link SourceB} both extend {@link MyParent}, which
 * declares the {@code sessionId} property. {@link #main(String[])} joins the
 * two data sets using the field expression {@code "sessionId"} on both sides.
 *
 * <p>To build a .jar file that can be submitted to a Flink cluster, run
 * {@code mvn clean package} in the project's root directory. The jar is
 * written to {@code target/flink-quickstart-0.1-SNAPSHOT-Sample.jar}.
 */
public class Job {

    /**
     * Common parent holding the {@code sessionId} property used as the join key.
     */
    public static class MyParent {
        private int sessionId;

        /**
         * Creates an instance whose {@code sessionId} is the {@code int} default (0).
         */
        // NOTE(review): the public no-arg constructor and the getter/setter pair are
        // presumably what lets Flink treat this type as a POJO and resolve the
        // "sessionId" field expression - confirm against the Flink POJO rules.
        public MyParent() {
        }

        /**
         * Creates an instance with the given session id.
         *
         * <p>Assigns the field directly so that subclasses can initialise the id
         * without invoking the overridable {@link #setSessionId(int)} from a
         * constructor.
         *
         * @param sessionId the initial session id
         */
        protected MyParent(int sessionId) {
            this.sessionId = sessionId;
        }

        /**
         * @return the current session id
         */
        public int getSessionId() {
            return sessionId;
        }

        /**
         * @param sessionId the new session id
         */
        public void setSessionId(int sessionId) {
            this.sessionId = sessionId;
        }
    }

    /**
     * Element type of the first join input; inherits {@code sessionId} from
     * {@link MyParent}.
     */
    public static class SourceA extends MyParent {

        /** Creates an instance with the default session id. */
        public SourceA() {
            super();
        }

        /**
         * @param sessionId the session id of this element
         */
        public SourceA(int sessionId) {
            super(sessionId);
        }

        /**
         * @return {@code "SourceA{<sessionId>}"}
         */
        @Override
        public String toString() {
            return "SourceA{" + getSessionId() + "}";
        }
    }

    /**
     * Element type of the second join input; inherits {@code sessionId} from
     * {@link MyParent}.
     */
    public static class SourceB extends MyParent {

        /** Creates an instance with the default session id. */
        public SourceB() {
            super();
        }

        /**
         * @param sessionId the session id of this element
         */
        public SourceB(int sessionId) {
            super(sessionId);
        }

        /**
         * @return {@code "SourceB{<sessionId>}"}
         */
        @Override
        public String toString() {
            return "SourceB{" + getSessionId() + "}";
        }
    }

    /**
     * Builds two small in-memory data sets (session ids 1 and 2 on each side),
     * joins them where the {@code sessionId} of the left element equals the
     * {@code sessionId} of the right element, and prints the joined pairs.
     *
     * @param args command-line arguments; not used
     * @throws Exception propagated from the Flink calls
     */
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<SourceA> sourceA =
                env.fromCollection(Arrays.asList(new SourceA(1), new SourceA(2)));
        DataSet<SourceB> sourceB =
                env.fromCollection(Arrays.asList(new SourceB(1), new SourceB(2)));

        // The key on both sides is the property inherited from MyParent.
        DataSet<Tuple2<SourceA, SourceB>> joined =
                sourceA.join(sourceB)
                        .where("sessionId")
                        .equalTo("sessionId");

        joined.print();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment