HiveRelation.class
import static scala.collection.JavaConverters.mapAsJavaMapConverter;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Properties;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.TableScan;
import org.apache.spark.sql.types.StructType;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;

import scala.collection.immutable.Map;

// Project-local types (HiveMetaResolver, JdbcHiveOptions, HiveOptions, StringUtils)
// are assumed to be on the classpath; their imports are omitted here.

/**
 * A Spark DataSource V1 relation that runs a Hive query over JDBC,
 * writes the result to HDFS as Parquet, and exposes it to Spark as
 * an RDD of Rows.
 */
public class HiveRelation extends BaseRelation implements Serializable, TableScan {

    private SQLContext sqlContext;
    private StructType schema;
    private java.util.Map<String, String> parametersAsJava;
    private Dataset<Row> df;

    public HiveRelation(SQLContext sqlContext, Map<String, String> parameters) {
        this.sqlContext = sqlContext;
        this.parametersAsJava = mapAsJavaMapConverter(parameters).asJava();
    }

    @Override
    public SQLContext sqlContext() {
        return this.sqlContext;
    }

    @Override
    public StructType schema() {
        // Resolve the schema lazily on first access.
        if (schema == null) {
            execHiveQuery();
        }
        return this.schema;
    }

    @Override
    public RDD<Row> buildScan() {
        // Run the Hive query lazily on first scan.
        if (df == null) {
            execHiveQuery();
        }
        return df.rdd();
    }

    private void buildSchema() {
        String dbTable = parametersAsJava.get(JdbcHiveOptions.dbTable);
        String hiveJdbcUrl = parametersAsJava.get(JdbcHiveOptions.hiveJdbcUrl);
        String hiveJdbcUser = parametersAsJava.get(JdbcHiveOptions.hiveJdbcUser);
        String hiveJdbcPassword = parametersAsJava.get(JdbcHiveOptions.hiveJdbcPassword);
        String hiveMetastoreUrl = parametersAsJava.get(JdbcHiveOptions.hiveMetastoreUrl);
        String hiveMetastoreUser = parametersAsJava.get(JdbcHiveOptions.hiveMetastoreUser);
        String hiveMetastorePassword = parametersAsJava.get(JdbcHiveOptions.hiveMetastorePassword);

        // Derive the Spark schema for the target table from the Hive metastore.
        HiveMetaResolver hiveMetaResolver = new HiveMetaResolver(dbTable,
                hiveJdbcUrl,
                hiveJdbcUser,
                hiveJdbcPassword,
                hiveMetastoreUrl,
                hiveMetastoreUser,
                hiveMetastorePassword);

        this.schema = hiveMetaResolver.getSparkSchema();
    }

    private void execHiveQuery() {
        buildSchema();

        String dbTable = parametersAsJava.get(JdbcHiveOptions.dbTable);
        String hiveJdbcUrl = parametersAsJava.get(HiveOptions.hiveJdbcUrl);
        String hiveJdbcUser = parametersAsJava.get(HiveOptions.hiveJdbcUser);
        String hiveJdbcPassword = parametersAsJava.get(HiveOptions.hiveJdbcPassword);
        String conditionClause = parametersAsJava.get(JdbcHiveOptions.conditionClause);
        String defaultFs = parametersAsJava.get(HiveOptions.defaultFs);
        String hadoopConfProperties = parametersAsJava.get(HiveOptions.hadoopConfProperties);

        // Write to a unique subdirectory so concurrent scans cannot collide.
        String outputPath = parametersAsJava.get(HiveOptions.outputPath)
                + "/" + UUID.randomUUID() + "-" + UUID.randomUUID();
        System.out.println("outputPath: [" + outputPath + "]");

        // Fill in the query template; placeholders are replaced literally,
        // so use replace() rather than the regex-based replaceAll().
        String sql = StringUtils.fileToString("/hive-template/hive-query.sql");
        sql = sql.replace("#dbTable#", dbTable);
        sql = sql.replace("#conditionClause#", conditionClause);
        sql = sql.replace("#outputPath#", outputPath);
        System.out.println("sql: [" + sql + "]");

        try {
            if (hadoopConfProperties != null) {
                // Load Hadoop configuration from a properties file on the classpath.
                Resource resource = new ClassPathResource(hadoopConfProperties);
                Properties hadoopProps = PropertiesLoaderUtils.loadProperties(resource);
                alterHadoopConfiguration(sqlContext.sparkSession(), hadoopProps);
            } else {
                Configuration hadoopConfiguration = sqlContext.sparkContext().hadoopConfiguration();
                hadoopConfiguration.set("fs.defaultFS", defaultFs);
            }

            // Run the query on HiveServer2; try-with-resources closes the
            // statement and connection even if the query fails.
            try (Connection conn = DriverManager.getConnection(hiveJdbcUrl, hiveJdbcUser, hiveJdbcPassword);
                 Statement statement = conn.createStatement()) {
                statement.executeUpdate(sql);
            }

            // Read the query result back as a DataFrame.
            df = sqlContext.read().format("parquet").load(outputPath);
        } catch (Exception e) {
            // Fail fast instead of leaving df/schema null, which would NPE later.
            throw new RuntimeException(e);
        }
    }

    private void alterHadoopConfiguration(SparkSession spark, Properties hadoopProps) {
        // Copy every property into the live Hadoop configuration.
        Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();
        for (String key : hadoopProps.stringPropertyNames()) {
            hadoopConfiguration.set(key, hadoopProps.getProperty(key));
        }
    }
}
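
For context, a TableScan relation like this is normally exposed to Spark through a RelationProvider, which this gist does not include. The classpath template /hive-template/hive-query.sql is also not shown; judging from the placeholders, it is presumably something like an INSERT OVERWRITE DIRECTORY '#outputPath#' STORED AS PARQUET SELECT ... FROM #dbTable# WHERE #conditionClause# statement. Below is a minimal sketch of the wiring, assuming a hypothetical DefaultSource class; the name, package, and option keys are illustrative, not part of the gist (the real key strings live in the JdbcHiveOptions/HiveOptions constants).

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.RelationProvider;

import scala.collection.immutable.Map;

// Hypothetical provider; Spark instantiates it by class name from format().
public class DefaultSource implements RelationProvider {
    @Override
    public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
        // Hand the reader options straight to the relation.
        return new HiveRelation(sqlContext, parameters);
    }
}

A read would then look something like this (package name and option keys are assumptions):

Dataset<Row> hive = spark.read()
        .format("com.example.hive")              // package containing DefaultSource
        .option("dbTable", "default.events")
        .option("conditionClause", "dt = '20200424'")
        .option("outputPath", "/tmp/hive-relation")
        .load();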