Skip to content

Instantly share code, notes, and snippets.

@mushketyk
Last active December 14, 2016 09:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mushketyk/acffb701a1f71a6e9bd661c781d7b18c to your computer and use it in GitHub Desktop.
Save mushketyk/acffb701a1f71a6e9bd661c781d7b18c to your computer and use it in GitHub Desktop.
/**
 * Test {@link BatchTableSource} that emits {@link MyPojo} records produced by an
 * in-memory input format.
 *
 * <p>{@link #getFieldsNames()} and {@link #getFieldTypes()} are positionally aligned:
 * entry {@code i} of one describes entry {@code i} of the other
 * ({@code amount}, {@code childPojo}, {@code id}, {@code name}).
 */
public static class TestBatchTableSource implements BatchTableSource<MyPojo> {

    /** Types of the exposed fields, in the same order as {@link #getFieldsNames()}. */
    private final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
        BasicTypeInfo.INT_TYPE_INFO,    // amount
        childPojoTypeInfo(),            // childPojo
        BasicTypeInfo.LONG_TYPE_INFO,   // id
        BasicTypeInfo.STRING_TYPE_INFO  // name
    };

    /**
     * Creates the data set backing this source.
     *
     * @param execEnv environment used to create the input
     * @return a data set reading from a fresh {@code MyPojoGenericInputFormat}, with parallelism 1
     */
    @Override
    public DataSet<MyPojo> getDataSet(ExecutionEnvironment execEnv) {
        return execEnv.createInput(new MyPojoGenericInputFormat(), getReturnType()).setParallelism(1);
    }

    /**
     * @return the number of exposed fields, derived from the field type array so the two
     *         cannot disagree
     */
    @Override
    public int getNumberOfFields() {
        return fieldTypes.length;
    }

    /** @return the exposed field names, aligned with {@link #getFieldTypes()} */
    @Override
    public String[] getFieldsNames() {
        return new String[]{"amount", "childPojo", "id", "name"};
    }

    /** @return the exposed field types, aligned with {@link #getFieldsNames()} */
    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    /**
     * @return POJO type information for {@link MyPojo}, describing its four public fields;
     *         the nested {@code childPojo} field uses the same type information as the
     *         corresponding entry of {@link #getFieldTypes()}
     */
    @Override
    public TypeInformation<MyPojo> getReturnType() {
        return new PojoTypeInfo<>(MyPojo.class,
            Arrays.asList(
                new PojoField(getField(MyPojo.class, "childPojo"), childPojoTypeInfo()),
                new PojoField(getField(MyPojo.class, "name"), BasicTypeInfo.STRING_TYPE_INFO),
                new PojoField(getField(MyPojo.class, "id"), BasicTypeInfo.LONG_TYPE_INFO),
                new PojoField(getField(MyPojo.class, "amount"), BasicTypeInfo.INT_TYPE_INFO)
            ));
    }

    /**
     * Builds the type information for {@link ChildPojo}, including its nested
     * {@link MostChild} field.
     *
     * <p>Kept in a single place so that the field type array and {@link #getReturnType()}
     * always describe the nested POJO identically.
     */
    private static PojoTypeInfo<ChildPojo> childPojoTypeInfo() {
        PojoTypeInfo<MostChild> mostChildType = new PojoTypeInfo<>(MostChild.class,
            Arrays.asList(
                new PojoField(getField(MostChild.class, "str"), BasicTypeInfo.STRING_TYPE_INFO)
            ));
        return new PojoTypeInfo<>(ChildPojo.class,
            Arrays.asList(
                new PojoField(getField(ChildPojo.class, "child"), mostChildType),
                new PojoField(getField(ChildPojo.class, "str"), BasicTypeInfo.STRING_TYPE_INFO)
            ));
    }

    /**
     * Looks up a public field by name, converting the checked reflection failure into an
     * unchecked exception.
     *
     * @param cls  class declaring (or inheriting) the field
     * @param name name of the public field
     * @return the matching field
     * @throws RuntimeException wrapping the {@link NoSuchFieldException} if no such public
     *                          field exists
     */
    private static Field getField(Class<?> cls, String name) {
        try {
            return cls.getField(name);
        } catch (NoSuchFieldException e) {
            throw new RuntimeException(
                "No public field '" + name + "' in class " + cls.getName(), e);
        }
    }

    /**
     * Input format that generates a fixed number of {@link MyPojo} records, numbered from 1.
     */
    private static class MyPojoGenericInputFormat extends GenericInputFormat<MyPojo> {

        /** Total number of records produced before the end is reported. */
        private static final int RECORD_COUNT = 33;

        /** Number of records produced so far. */
        private int cnt = 0;

        /** @return {@code true} once {@code RECORD_COUNT} records have been produced */
        @Override
        public boolean reachedEnd() throws IOException {
            // ">=" rather than "==" so an extra nextRecord() call cannot skip past the limit.
            return cnt >= RECORD_COUNT;
        }

        /**
         * Fills {@code reuse} with the next generated record.
         *
         * @param reuse instance to populate; it is mutated and returned
         * @return {@code reuse}, whose values are all derived from the running counter
         */
        @Override
        public MyPojo nextRecord(MyPojo reuse) throws IOException {
            cnt++;

            MostChild mostChild = new MostChild();
            mostChild.str = "mostChildPojo" + cnt;

            ChildPojo childPojo = new ChildPojo();
            childPojo.str = "childPojo" + cnt;
            childPojo.child = mostChild;

            reuse.setChildPojo(childPojo);
            reuse.setName("pojo" + cnt);
            reuse.setId(cnt);
            reuse.setAmount(cnt % 16);
            return reuse;
        }
    }
}
/**
 * Top-level record type: a nested {@link ChildPojo} plus a name, an id and an amount.
 *
 * <p>All fields are public and additionally exposed through getters and setters.
 */
public static class MyPojo {

    public ChildPojo childPojo;
    public String name;
    public long id;
    public int amount;

    // ---- getters ----

    /** @return the nested child object */
    public ChildPojo getChildPojo() {
        return this.childPojo;
    }

    /** @return the name */
    public String getName() {
        return this.name;
    }

    /** @return the id */
    public long getId() {
        return this.id;
    }

    /** @return the amount */
    public int getAmount() {
        return this.amount;
    }

    // ---- setters ----

    /** @param childPojo the nested child object to store */
    public void setChildPojo(ChildPojo childPojo) {
        this.childPojo = childPojo;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @param id the id to store */
    public void setId(long id) {
        this.id = id;
    }

    /** @param amount the amount to store */
    public void setAmount(int amount) {
        this.amount = amount;
    }

    /**
     * Renders {@code name}, {@code id} and {@code amount}; the nested {@code childPojo}
     * is not part of the output.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MyPojo{");
        text.append("name='").append(name).append('\'');
        text.append(", id=").append(id);
        text.append(", amount=").append(amount);
        text.append('}');
        return text.toString();
    }
}
/**
 * Middle level of the nested test types: holds a {@link MostChild} and a string.
 *
 * <p>Both fields are public; only {@code str} has a getter.
 */
public static class ChildPojo {

    public MostChild child;
    public String str;

    /** @return the string value */
    public String getStr() {
        return this.str;
    }

    /** Renders only {@code str}; the nested {@code child} is not part of the output. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ChildPojo{");
        text.append("str='").append(str).append('\'');
        text.append('}');
        return text.toString();
    }
}
/**
 * Innermost level of the nested test types: a single public string field.
 */
public static class MostChild {

    public String str;

    /** Renders the {@code str} field wrapped in single quotes. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MostChild{");
        text.append("str='").append(str).append('\'');
        text.append('}');
        return text.toString();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment