@mykidong
Last active April 24, 2020 06:33
JdbcHiveRelation.class
// Imports are not part of the original gist; this set is assumed for a Spark 2.x build
// with Jackson databind. HiveMetaResolver, JdbcHiveOptions and JsonUtils are project-local
// classes expected to live in the same package.
import static scala.collection.JavaConverters.mapAsJavaMapConverter;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.TableScan;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.collection.immutable.Map;

public class JdbcHiveRelation extends BaseRelation implements Serializable, TableScan {

    private SQLContext sqlContext;
    private StructType schema;
    private java.util.Map<String, String> parametersAsJava;
    private java.util.Map<String, HiveMetaResolver.HiveMetadata> hiveMetadataMap;

    public JdbcHiveRelation(SQLContext sqlContext, Map<String, String> parameters) {
        this.sqlContext = sqlContext;
        this.parametersAsJava = mapAsJavaMapConverter(parameters).asJava();
    }

    @Override
    public SQLContext sqlContext() {
        return this.sqlContext;
    }

    @Override
    public StructType schema() {
        // Build the Spark schema lazily on first access.
        if (this.schema == null) {
            this.buildSchema();
        }
        return this.schema;
    }

    private void buildSchema() {
        String dbTable = parametersAsJava.get(JdbcHiveOptions.dbTable);
        String hiveJdbcUrl = parametersAsJava.get(JdbcHiveOptions.hiveJdbcUrl);
        String hiveJdbcUser = parametersAsJava.get(JdbcHiveOptions.hiveJdbcUser);
        String hiveJdbcPassword = parametersAsJava.get(JdbcHiveOptions.hiveJdbcPassword);
        String hiveMetastoreUrl = parametersAsJava.get(JdbcHiveOptions.hiveMetastoreUrl);
        String hiveMetastoreUser = parametersAsJava.get(JdbcHiveOptions.hiveMetastoreUser);
        String hiveMetastorePassword = parametersAsJava.get(JdbcHiveOptions.hiveMetastorePassword);

        // Resolve the Hive table metadata and derive the corresponding Spark schema.
        HiveMetaResolver hiveMetaResolver = new HiveMetaResolver(dbTable,
                hiveJdbcUrl,
                hiveJdbcUser,
                hiveJdbcPassword,
                hiveMetastoreUrl,
                hiveMetastoreUser,
                hiveMetastorePassword);

        this.schema = hiveMetaResolver.getSparkSchema();
        this.hiveMetadataMap = hiveMetaResolver.getHiveMetadataMap();
    }
    @Override
    public RDD<Row> buildScan() {
        // Make sure the schema and the hive metadata map have been built.
        schema();

        String dbTable = parametersAsJava.get(JdbcHiveOptions.dbTable);
        String conditionClause = parametersAsJava.get(JdbcHiveOptions.conditionClause);
        String hiveJdbcUrl = parametersAsJava.get(JdbcHiveOptions.hiveJdbcUrl);
        String hiveJdbcUser = parametersAsJava.get(JdbcHiveOptions.hiveJdbcUser);
        String hiveJdbcPassword = parametersAsJava.get(JdbcHiveOptions.hiveJdbcPassword);
        int fetchSize = Integer.parseInt(parametersAsJava.get(JdbcHiveOptions.fetchsize));
        String tempPath = parametersAsJava.get(JdbcHiveOptions.tempPath);
        tempPath = (tempPath != null) ? tempPath : "/jdbc-hive-temp";

        String query = "select * from " + dbTable + " " + conditionClause;

        ObjectMapper mapper = new ObjectMapper();

        Properties properties = new Properties();
        properties.setProperty("user", hiveJdbcUser);
        properties.setProperty("password", hiveJdbcPassword);

        // Unique temp location where fetched rows are staged as parquet.
        String filePath = tempPath + "/" + UUID.randomUUID().toString() + "-" + UUID.randomUUID().toString();
        System.out.println("temp file path: [" + filePath + "]");
        Connection connection = null;
        try {
            connection = DriverManager.getConnection(hiveJdbcUrl, properties);
            PreparedStatement statement = connection.prepareStatement(query);
            statement.setFetchSize(fetchSize);
            ResultSet rs = statement.executeQuery();

            long count = 0;

            // Empty DataFrame built only so that per-column schema JSON can be derived below.
            JavaRDD<Row> emptyRdd = new JavaSparkContext(sqlContext.sparkContext()).emptyRDD();
            Dataset<Row> df = sqlContext.createDataFrame(emptyRdd, schema);
            String dfJson = df.schema().json();

            StructField[] structFieldList = schema.fields();

            List<Row> rowList = new ArrayList<>();
            while (rs.next()) {
                List<Object> rowElements = new ArrayList<>();
                for (StructField structField : structFieldList) {
                    String columnName = structField.name();
                    DataType dataType = structField.dataType();

                    HiveMetaResolver.HiveMetadata hiveMetadata = hiveMetadataMap.get(columnName);
                    int sqlType = hiveMetadata.getDataType();
                    int precision = hiveMetadata.getFieldSize();
                    int scale = hiveMetadata.getFieldScale();
                    boolean signed = hiveMetadata.isSigned();

                    Dataset<Row> columnDf = df.select(functions.col(columnName));
                    StructType columnSchema = columnDf.schema();
                    String columnSchemaJson = columnSchema.json();
                    java.util.Map<String, Object> columnSchemaMap = JsonUtils.toMap(mapper, columnSchemaJson);

                    Object tempResult = null;

                    // retrieve value from ResultSet considering spark schema.
                    ...
                }

                Row finalRow = RowFactory.create(rowElements.toArray(new Object[0]));
                rowList.add(finalRow);

                // Flush the accumulated rows to the temp parquet path every fetchSize rows.
                if (count % fetchSize == 0) {
                    if (count > 0) {
                        System.out.println("fetch count: [" + count + "]");
                        appendFetchedData(sqlContext.sparkSession(), rowList, schema, filePath);

                        // rowList refresh.
                        rowList = new ArrayList<>();
                    }
                }
                count++;
            }

            // Write out any rows left over from the last partial batch.
            if (rowList.size() > 0) {
                System.out.println("row list remained: [" + rowList.size() + "]");
                appendFetchedData(sqlContext.sparkSession(), rowList, schema, filePath);
            }

            rs.close();
            statement.close();
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
            // Surface the failure to Spark as an unchecked exception.
            throw new RuntimeException(e);
        }
        // Read the staged parquet data back and hand it to Spark as an RDD of rows.
        Dataset<Row> df = sqlContext.read().format("parquet").load(filePath);
        return df.rdd();
    }
    private void appendFetchedData(SparkSession spark, List<Row> rowList, StructType schema, String filePath) {
        if (rowList.size() > 0) {
            spark.createDataFrame(rowList, schema).write()
                    .format("parquet")
                    .mode(SaveMode.Append)
                    .save(filePath);

            System.out.println("row list saved to the temp file: [" + filePath + "]");
        }
    }
}
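
The gist elides the step that copies each JDBC column value into rowElements (the "retrieve value from ResultSet considering spark schema" comment). A minimal sketch of what that step could look like is shown below, assuming a hypothetical helper named readColumnValue that switches on the Spark DataType of the field and calls the matching ResultSet getter; the helper name and the subset of types handled are assumptions, not part of the original code.

    // Additional imports this sketch would need:
    //   java.sql.SQLException, org.apache.spark.sql.types.DataTypes,
    //   org.apache.spark.sql.types.DecimalType
    //
    // Hypothetical helper: read one JDBC column as an object compatible with the
    // Spark field type. Only common types are covered; a fuller implementation would
    // also use the hive metadata (sqlType, precision, scale, signed) collected above.
    private Object readColumnValue(ResultSet rs, String columnName, DataType dataType) throws SQLException {
        Object value;
        if (dataType.equals(DataTypes.StringType)) {
            value = rs.getString(columnName);
        } else if (dataType.equals(DataTypes.IntegerType)) {
            value = rs.getInt(columnName);
        } else if (dataType.equals(DataTypes.LongType)) {
            value = rs.getLong(columnName);
        } else if (dataType.equals(DataTypes.DoubleType)) {
            value = rs.getDouble(columnName);
        } else if (dataType.equals(DataTypes.BooleanType)) {
            value = rs.getBoolean(columnName);
        } else if (dataType.equals(DataTypes.TimestampType)) {
            value = rs.getTimestamp(columnName);
        } else if (dataType.equals(DataTypes.DateType)) {
            value = rs.getDate(columnName);
        } else if (dataType instanceof DecimalType) {
            value = rs.getBigDecimal(columnName);
        } else {
            value = rs.getObject(columnName);
        }
        // Primitive getters return 0/false for SQL NULL, so check wasNull() afterwards.
        return rs.wasNull() ? null : value;
    }

Inside the column loop, tempResult could then be assigned from readColumnValue(rs, columnName, dataType) and appended to rowElements.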
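
The gist only shows the relation itself. To use it through spark.read(), it would normally be registered via a RelationProvider (Spark Data Source V1). The DefaultSource class and the format name and option keys in the usage comment below are illustrative assumptions, not part of the gist.

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.RelationProvider;
import scala.collection.immutable.Map;

// Hypothetical provider that wires JdbcHiveRelation into the Data Source V1 API.
public class DefaultSource implements RelationProvider {

    @Override
    public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
        return new JdbcHiveRelation(sqlContext, parameters);
    }
}

// Example usage (format name and option keys are assumptions; they must match the
// package containing DefaultSource and the constants defined in JdbcHiveOptions):
// Dataset<Row> df = spark.read()
//         .format("com.example.jdbchive")
//         .option("dbTable", "default.my_table")
//         .option("hiveJdbcUrl", "jdbc:hive2://hive-server:10000/default")
//         // ... plus the remaining JdbcHiveOptions (users, passwords, metastore url, fetchsize, tempPath)
//         .load();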