Skip to content

Instantly share code, notes, and snippets.

@andrewstevenson
Created September 11, 2016 17:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrewstevenson/a6a0567d9b35bbff69c44fca1e56d7aa to your computer and use it in GitHub Desktop.
Save andrewstevenson/a6a0567d9b35bbff69c44fca1e56d7aa to your computer and use it in GitHub Desktop.
val new_schema = SchemaUtil.merge(source_schema, target_schema)
/**
  * Create a hive dataset in the repo, or load it if it already exists.
  *
  * When the dataset already exists, the supplied schema is passed to
  * `update_schema`; if that yields an updated dataset it is returned,
  * otherwise the dataset is returned as loaded.
  *
  * @param repo_root The root location of the repo
  * @param database The database to create the dataset(table) in.
  * @param name The dataset/table name
  * @param schema The schema to create the dataset with, or to reconcile with an existing dataset
  * @return The newly created dataset, or the existing (possibly schema-updated) dataset
  * */
def create_hive_dataset(repo_root: String,
                        database: String,
                        name: String,
                        schema: Schema) : Dataset[GenericData.Record] = {
  val repo : DatasetRepository = DatasetRepositories.repositoryFor("repo:hive:" + repo_root)
  // Human-readable location, used only in log messages.
  val location = repo_root + "/" + database + "/" + name
  if (!repo.exists(database, name)) {
    // Both arguments need a placeholder; previously the schema was passed to
    // format() but silently dropped because the string had a single %s.
    log.info("Creating dataset at %s with schema %s".format(location, schema.toString(true)))
    repo.create(database, name, new DatasetDescriptor.Builder().format(Formats.PARQUET).schema(schema).build)
    repo.load(database, name).asInstanceOf[Dataset[GenericData.Record]]
  } else {
    log.info("Dataset at %s already exists.".format(location))
    val dataset = repo.load(database, name).asInstanceOf[Dataset[GenericData.Record]]
    // update_schema returns None when nothing changed; fall back to the loaded dataset.
    update_schema(schema, dataset).getOrElse(dataset)
  }
}
/**
  * Update a dataset with a merged avro schema
  *
  * The inbound schema is compared with the schema in the dataset's descriptor.
  * If they differ, the two are merged with `SchemaUtil.merge` and the dataset
  * is updated with a descriptor carrying the merged schema.
  *
  * @param source_schema The new inbound schema to update the dataset with
  * @param dataset The dataset to update
  * @return `None` when the schemas are equal (no update performed), otherwise
  *         `Some` of the dataset returned by `Datasets.update`
  * */
def update_schema(source_schema: Schema,
                  dataset : Dataset[GenericData.Record]) : Option[Dataset[GenericData.Record]] = {
  val target_descriptor = dataset.getDescriptor
  val target_schema = target_descriptor.getSchema
  if (!Datasets.exists(dataset.getUri)) {
    log.error("Dataset %s not found".format(dataset.getName))
    // NOTE(review): `exit` is resolved outside this block (presumably it terminates
    // the process — confirm); the trailing None only matters if it returns.
    exit
    None
  }
  else if (source_schema == target_schema) {
    log.info("No change in schemas detected.")
    None
  }
  else {
    log.info("Change in schemas detected. Schemas will be merged and dataset updated.")
    // Each debug message needs a %s placeholder; without one, format() discards
    // its argument and the schema is never written to the log.
    log.debug("Source schema: %s".format(source_schema.toString(true)))
    log.debug("Target dataset schema: %s".format(target_schema.toString(true)))
    val new_schema = SchemaUtil.merge(source_schema, target_schema)
    log.debug("New merged schema: %s".format(new_schema.toString(true)))
    // Start from the existing descriptor so only the schema is replaced.
    val updated_descriptor: DatasetDescriptor = new DatasetDescriptor.Builder(target_descriptor)
      .schema(new_schema)
      .build()
    Some(Datasets.update(dataset.getUri, updated_descriptor))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment