@andrewstevenson
Created September 11, 2016 18:11
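import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.kitesdk.data.{Dataset, DatasetDescriptor, Datasets}
import org.kitesdk.data.spi.SchemaUtil

/**
 * Update a dataset with a merged Avro schema.
 *
 * @param source_schema The new inbound schema to merge into the dataset's schema
 * @param dataset The dataset to update
 * @return Some(updated dataset) if the schemas differed and were merged; None otherwise
 */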
def update_schema(source_schema: Schema,
                  dataset: Dataset[GenericData.Record]): Option[Dataset[GenericData.Record]] = {
  val target_descriptor = dataset.getDescriptor
  val target_schema = target_descriptor.getSchema
  if (!Datasets.exists(dataset.getUri)) {
    // Nothing to update: report the missing dataset and let the caller decide how to proceed.
    log.error(s"Dataset ${dataset.getName} not found")
    None
  } else if (source_schema == target_schema) {
    log.info("No change in schemas detected.")
    None
  } else {
    log.info("Change in schemas detected. Schemas will be merged and dataset updated.")
    log.debug(s"Source schema: ${source_schema.toString(true)}")
    log.debug(s"Target dataset schema: ${target_schema.toString(true)}")
    // Merge the inbound schema with the dataset's current schema.
    val new_schema = SchemaUtil.merge(source_schema, target_schema)
    log.debug(s"New merged schema: ${new_schema.toString(true)}")
    // Copy the existing descriptor, swapping in the merged schema.
    val updated_descriptor: DatasetDescriptor = new DatasetDescriptor.Builder(target_descriptor)
      .schema(new_schema)
      .build()
    Some(Datasets.update(dataset.getUri, updated_descriptor))
  }
}