Created
September 11, 2016 18:11
-
-
Save andrewstevenson/e4da573c541e63235047a0219bc7a85d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
  * Update a dataset with a merged avro schema.
  *
  * The dataset's current schema is read from its descriptor and compared with
  * `source_schema`. If they differ, the two are merged with `SchemaUtil.merge`
  * and the dataset is updated with a descriptor carrying the merged schema.
  *
  * @param source_schema The new inbound schema to update the dataset with
  * @param dataset       The dataset to update
  * @return `Some` of the dataset returned by `Datasets.update` when the schemas
  *         differed and the dataset was updated; `None` when the source schema
  *         equals the dataset's current schema
  * */
def update_schema(source_schema: Schema,
                  dataset: Dataset[GenericData.Record]): Option[Dataset[GenericData.Record]] = {
  val target_descriptor = dataset.getDescriptor
  val target_schema = target_descriptor.getSchema

  if (!Datasets.exists(dataset.getUri)) {
    log.error(s"Dataset ${dataset.getName} not found")
    // NOTE(review): `exit` is resolved from the enclosing scope (not visible
    // here); presumably it terminates the JVM, which would make the `None`
    // below unreachable. Kept unchanged so callers see the same behaviour --
    // consider returning `None` (or an error type) instead of exiting.
    exit
    None
  } else if (source_schema == target_schema) {
    log.info("No change in schemas detected.")
    None
  } else {
    log.info("Change in schemas detected. Schemas will be merged and dataset updated.")
    // Both debug lines use the `s` interpolator so the pretty-printed schema
    // is what actually ends up in the log.
    log.debug(s"Source schema: ${source_schema.toString(true)}")
    log.debug(s"Target dataset schema: ${target_schema.toString(true)}")

    val new_schema = SchemaUtil.merge(source_schema, target_schema)
    log.debug(s"New merged schema: ${new_schema.toString(true)}")

    // Copy the existing descriptor and swap in the merged schema only.
    val updated_descriptor: DatasetDescriptor = new DatasetDescriptor.Builder(target_descriptor)
      .schema(new_schema)
      .build()

    Some(Datasets.update(dataset.getUri, updated_descriptor))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment