JdbcHiveRelation.class
// Imports reconstructed for a Spark 2.x build; the original gist omits them.
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.TableScan;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.collection.immutable.Map;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

import static scala.collection.JavaConverters.mapAsJavaMapConverter;

// HiveMetaResolver, JdbcHiveOptions, and JsonUtils are companion classes from
// the same project and are not shown in this gist.

/**
 * A Data Source V1 relation that scans a Hive table over JDBC and exposes
 * the fetched rows to Spark as an RDD of Rows.
 */
public class JdbcHiveRelation extends BaseRelation implements Serializable, TableScan {

    private SQLContext sqlContext;
    private StructType schema;
    private java.util.Map<String, String> parametersAsJava;
    private java.util.Map<String, HiveMetaResolver.HiveMetadata> hiveMetadataMap;

    public JdbcHiveRelation(SQLContext sqlContext, Map<String, String> parameters) {
        this.sqlContext = sqlContext;
        // Spark passes data source options as a Scala immutable Map; convert once up front.
        this.parametersAsJava = mapAsJavaMapConverter(parameters).asJava();
    }
    @Override
    public SQLContext sqlContext() {
        return this.sqlContext;
    }

    @Override
    public StructType schema() {
        // Resolve the schema lazily, on first access.
        if (this.schema == null) {
            this.buildSchema();
        }
        return this.schema;
    }
    private void buildSchema() {
        String dbTable = parametersAsJava.get(JdbcHiveOptions.dbTable);
        String hiveJdbcUrl = parametersAsJava.get(JdbcHiveOptions.hiveJdbcUrl);
        String hiveJdbcUser = parametersAsJava.get(JdbcHiveOptions.hiveJdbcUser);
        String hiveJdbcPassword = parametersAsJava.get(JdbcHiveOptions.hiveJdbcPassword);
        String hiveMetastoreUrl = parametersAsJava.get(JdbcHiveOptions.hiveMetastoreUrl);
        String hiveMetastoreUser = parametersAsJava.get(JdbcHiveOptions.hiveMetastoreUser);
        String hiveMetastorePassword = parametersAsJava.get(JdbcHiveOptions.hiveMetastorePassword);

        // Resolve the Spark schema and the per-column Hive metadata from the metastore.
        HiveMetaResolver hiveMetaResolver = new HiveMetaResolver(dbTable,
                hiveJdbcUrl,
                hiveJdbcUser,
                hiveJdbcPassword,
                hiveMetastoreUrl,
                hiveMetastoreUser,
                hiveMetastorePassword);

        this.schema = hiveMetaResolver.getSparkSchema();
        this.hiveMetadataMap = hiveMetaResolver.getHiveMetadataMap();
    }
    @Override
    public RDD<Row> buildScan() {
        // Ensure the schema and the per-column Hive metadata are resolved first.
        schema();

        String dbTable = parametersAsJava.get(JdbcHiveOptions.dbTable);
        String conditionClause = parametersAsJava.get(JdbcHiveOptions.conditionClause);
        String hiveJdbcUrl = parametersAsJava.get(JdbcHiveOptions.hiveJdbcUrl);
        String hiveJdbcUser = parametersAsJava.get(JdbcHiveOptions.hiveJdbcUser);
        String hiveJdbcPassword = parametersAsJava.get(JdbcHiveOptions.hiveJdbcPassword);
        int fetchSize = Integer.parseInt(parametersAsJava.get(JdbcHiveOptions.fetchsize));
        String tempPath = parametersAsJava.get(JdbcHiveOptions.tempPath);
        tempPath = (tempPath != null) ? tempPath : "/jdbc-hive-temp";

        String query = "select * from " + dbTable + " " + conditionClause;

        ObjectMapper mapper = new ObjectMapper();

        Properties properties = new Properties();
        properties.setProperty("user", hiveJdbcUser);
        properties.setProperty("password", hiveJdbcPassword);

        // Unique path under tempPath where fetched batches are staged as Parquet.
        String filePath = tempPath + "/" + UUID.randomUUID() + "-" + UUID.randomUUID();
        System.out.println("temp file path: [" + filePath + "]");

        // try-with-resources closes the connection, statement, and result set even
        // if an exception is thrown mid-scan (the original closed them only on the
        // success path).
        try (Connection connection = DriverManager.getConnection(hiveJdbcUrl, properties);
             PreparedStatement statement = connection.prepareStatement(query)) {
            statement.setFetchSize(fetchSize);

            try (ResultSet rs = statement.executeQuery()) {
                long count = 0;
                JavaRDD<Row> emptyRdd = new JavaSparkContext(sqlContext.sparkContext()).emptyRDD();
                Dataset<Row> df = sqlContext.createDataFrame(emptyRdd, schema);
                String dfJson = df.schema().json();
                StructField[] structFieldList = schema.fields();
                List<Row> rowList = new ArrayList<>();

                while (rs.next()) {
                    List<Object> rowElements = new ArrayList<>();
                    for (StructField structField : structFieldList) {
                        String columnName = structField.name();
                        DataType dataType = structField.dataType();

                        // JDBC-side type information for this column, resolved in buildSchema().
                        HiveMetaResolver.HiveMetadata hiveMetadata = hiveMetadataMap.get(columnName);
                        int sqlType = hiveMetadata.getDataType();
                        int precision = hiveMetadata.getFieldSize();
                        int scale = hiveMetadata.getFieldScale();
                        boolean signed = hiveMetadata.isSigned();

                        Dataset<Row> columnDf = df.select(functions.col(columnName));
                        StructType columnSchema = columnDf.schema();
                        String columnSchemaJson = columnSchema.json();
                        java.util.Map<String, Object> columnSchemaMap = JsonUtils.toMap(mapper, columnSchemaJson);

                        Object tempResult = null;
                        // retrieve value from ResultSet considering spark schema.
                        // (the type-by-type extraction logic is elided in the original gist)
                        ...
                    }
                    Row finalRow = RowFactory.create(rowElements.toArray(new Object[0]));
                    rowList.add(finalRow);

                    // Flush a full batch to the temp Parquet path every fetchSize rows,
                    // so the whole table never has to fit in driver memory at once.
                    if (count > 0 && count % fetchSize == 0) {
                        System.out.println("fetch count: [" + count + "]");
                        appendFetchedData(sqlContext.sparkSession(), rowList, schema, filePath);
                        rowList = new ArrayList<>();
                    }
                    count++;
                }

                // Flush whatever remains after the last full batch.
                if (rowList.size() > 0) {
                    System.out.println("row list remained: [" + rowList.size() + "]");
                    appendFetchedData(sqlContext.sparkSession(), rowList, schema, filePath);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            // Wrap the exception itself; e.getCause() can be null, which would
            // swallow the actual error.
            throw new RuntimeException(e);
        }

        // Read everything staged under filePath back as one DataFrame and return its RDD.
        Dataset<Row> df = sqlContext.read().format("parquet").load(filePath);
        return df.rdd();
    }
    private void appendFetchedData(SparkSession spark, List<Row> rowList, StructType schema, String filePath) {
        if (rowList.size() > 0) {
            // Append this batch to the staging path; buildScan() reads all batches back at the end.
            spark.createDataFrame(rowList, schema).write()
                    .format("parquet")
                    .mode(SaveMode.Append)
                    .save(filePath);
            System.out.println("row list saved to the temp file: [" + filePath + "]");
        }
    }
}
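
To plug a BaseRelation like this into Spark's Data Source V1 API, a RelationProvider is needed. The gist does not include one, so the class below is a minimal sketch: the name JdbcHiveDataSource is hypothetical, and it simply forwards Spark's option map to the constructor above.

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.RelationProvider;

import scala.collection.immutable.Map;

// Hypothetical companion provider (not part of the original gist). Spark locates
// this class via .format(...) and delegates relation creation to it.
public class JdbcHiveDataSource implements RelationProvider {

    @Override
    public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
        // Spark hands the reader options through as a Scala immutable Map,
        // which is exactly what the JdbcHiveRelation constructor expects.
        return new JdbcHiveRelation(sqlContext, parameters);
    }
}

With such a provider on the classpath, a read could then look like the fragment below. The option keys are assumed to match the JdbcHiveOptions field names used in the relation; the gist does not show their actual string values.

Dataset<Row> df = spark.read()
        .format("com.example.JdbcHiveDataSource") // hypothetical fully qualified name
        .option("dbTable", "default.some_table")
        .option("hiveJdbcUrl", "jdbc:hive2://hive-host:10000/default")
        .option("fetchsize", "10000")
        // ...plus the remaining JdbcHiveOptions (JDBC user/password, metastore
        // connection settings, conditionClause, tempPath).
        .load();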