Created
October 24, 2018 01:28
-
-
Save yinhua-dai/143304464270afd19b6a926531f9acb1 to your computer and use it in GitHub Desktop.
Flink SQL test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.tr.apt.test; | |
import org.apache.flink.api.common.io.FileInputFormat; | |
import org.apache.flink.api.common.typeinfo.TypeInformation; | |
import org.apache.flink.api.common.typeinfo.Types; | |
import org.apache.flink.api.java.DataSet; | |
import org.apache.flink.api.java.ExecutionEnvironment; | |
import org.apache.flink.api.java.io.RowCsvInputFormat; | |
import org.apache.flink.api.java.typeutils.RowTypeInfo; | |
import org.apache.flink.core.fs.FileSystem; | |
import org.apache.flink.core.fs.Path; | |
import org.apache.flink.table.api.Table; | |
import org.apache.flink.table.api.TableEnvironment; | |
import org.apache.flink.table.api.TableSchema; | |
import org.apache.flink.table.sinks.CsvTableSink; | |
import org.apache.flink.table.sources.BatchTableSource; | |
import org.apache.flink.table.sources.CsvTableSource; | |
import org.apache.flink.types.Row; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.Serializable; | |
public class FlinkMain { | |
public static class MyTableSource implements BatchTableSource<Row>, Serializable { | |
private final Path path; | |
private final String[] fieldNames; | |
private final TypeInformation<?>[] fieldTypes; | |
private final RowTypeInfo rowTypeInfo; | |
private final TableSchema tableSchema; | |
MyTableSource(Path path, String[] fieldNames, TypeInformation<?>[] fieldTypes) { | |
this.path = path; | |
this.fieldNames = fieldNames; | |
this.fieldTypes = fieldTypes; | |
rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); | |
tableSchema = new TableSchema(fieldNames, fieldTypes); | |
} | |
@Override | |
public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) { | |
RowCsvInputFormat inputFormat = new RowCsvInputFormat(path, fieldTypes); | |
return execEnv.createInput(inputFormat); | |
} | |
@Override | |
public TypeInformation<Row> getReturnType() { | |
return rowTypeInfo; | |
} | |
@Override | |
public TableSchema getTableSchema() { | |
return tableSchema; | |
} | |
@Override | |
public String explainSource() { | |
return "My table source"; | |
} | |
} | |
private final static Logger LOG = LoggerFactory.getLogger(FlinkMain.class); | |
public static void main(String[] args) throws Exception { | |
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |
TableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); | |
String[] fieldNames = {"f1", "f2"}; | |
TypeInformation<?>[] fieldTypes = {Types.STRING, Types.STRING}; | |
MyTableSource myTableSource = new MyTableSource(new Path("d:/tmp/input.csv"), fieldNames, fieldTypes); | |
tableEnv.registerTableSource("csvsource", myTableSource); | |
CsvTableSink csvTableSink = new CsvTableSink("d:/tmp/output.csv", ",", 1, FileSystem.WriteMode.OVERWRITE); | |
Table result = tableEnv.sqlQuery("SELECT f2, f1 from csvsource"); | |
result.writeToSink(csvTableSink); | |
env.setParallelism(1); | |
env.execute(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment