Skip to content

Instantly share code, notes, and snippets.

@yinhua-dai
Created October 24, 2018 01:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yinhua-dai/143304464270afd19b6a926531f9acb1 to your computer and use it in GitHub Desktop.
Flink SQL test
package com.tr.apt.test;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
/**
 * Minimal Flink batch Table API demo: reads a two-column CSV file through a
 * custom {@link BatchTableSource}, runs a SQL projection that swaps the two
 * columns, and writes the result back out as CSV.
 */
public class FlinkMain {

    /**
     * A batch table source backed by a CSV file on disk.
     *
     * <p>Serializable because Flink ships table sources to the cluster as part
     * of the job graph.
     */
    public static class MyTableSource implements BatchTableSource<Row>, Serializable {

        // Explicit version id so serialized instances stay compatible across
        // recompiles (Serializable best practice).
        private static final long serialVersionUID = 1L;

        private final Path path;
        private final String[] fieldNames;
        private final TypeInformation<?>[] fieldTypes;
        private final RowTypeInfo rowTypeInfo;
        private final TableSchema tableSchema;

        /**
         * @param path       location of the CSV file to read
         * @param fieldNames column names, one per CSV column
         * @param fieldTypes column types; must be the same length as {@code fieldNames}
         * @throws IllegalArgumentException if the two arrays differ in length
         */
        MyTableSource(Path path, String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            if (fieldNames.length != fieldTypes.length) {
                throw new IllegalArgumentException(
                        "fieldNames and fieldTypes must have the same length: "
                                + fieldNames.length + " vs " + fieldTypes.length);
            }
            this.path = path;
            // Defensive copies: the caller keeps references to these arrays, and a
            // later mutation must not silently corrupt this source's schema.
            this.fieldNames = fieldNames.clone();
            this.fieldTypes = fieldTypes.clone();
            this.rowTypeInfo = new RowTypeInfo(this.fieldTypes, this.fieldNames);
            this.tableSchema = new TableSchema(this.fieldNames, this.fieldTypes);
        }

        /** Creates the DataSet by wiring a row-based CSV input format into the environment. */
        @Override
        public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
            RowCsvInputFormat inputFormat = new RowCsvInputFormat(path, fieldTypes);
            return execEnv.createInput(inputFormat);
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            return rowTypeInfo;
        }

        @Override
        public TableSchema getTableSchema() {
            return tableSchema;
        }

        @Override
        public String explainSource() {
            return "My table source";
        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(FlinkMain.class);

    /**
     * Entry point. Input and output locations default to the original
     * hard-coded paths but may be overridden on the command line:
     * {@code FlinkMain [inputPath [outputPath]]}.
     *
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final String inputPath = args.length > 0 ? args[0] : "d:/tmp/input.csv";
        final String outputPath = args.length > 1 ? args[1] : "d:/tmp/output.csv";

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Single-threaded so the sink produces one deterministic output file.
        env.setParallelism(1);
        TableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        String[] fieldNames = {"f1", "f2"};
        TypeInformation<?>[] fieldTypes = {Types.STRING, Types.STRING};
        MyTableSource myTableSource =
                new MyTableSource(new Path(inputPath), fieldNames, fieldTypes);
        tableEnv.registerTableSource("csvsource", myTableSource);

        CsvTableSink csvTableSink =
                new CsvTableSink(outputPath, ",", 1, FileSystem.WriteMode.OVERWRITE);

        // Project the two columns in swapped order.
        Table result = tableEnv.sqlQuery("SELECT f2, f1 from csvsource");
        result.writeToSink(csvTableSink);

        LOG.info("Executing Flink CSV job: {} -> {}", inputPath, outputPath);
        env.execute();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment