@iht
Created August 21, 2022 20:14
BigQuery JSON file to Beam Schema
package dev.herraiz.beam.schemas;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.googleapis.util.Utils;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.schemas.Schema;

/** Converts a BigQuery table definition in JSON into a Beam Schema. */
public class JsonSchemaParser {

  // Name of the top-level JSON node that holds the table schema.
  private static final String ROOT_NODE_PATH = "schema";

  /**
   * Parses a BigQuery JSON document and returns the equivalent Beam schema.
   *
   * @param schemaAsString JSON document with a top-level "schema" node
   * @throws Exception if the JSON cannot be parsed or has no "schema" node
   */
  public static Schema bqJson2BeamSchema(String schemaAsString) throws Exception {
    JsonFactory defaultJsonFactory = Utils.getDefaultJsonFactory();

    // Locate the "schema" node with Jackson.
    ObjectMapper mapper = new ObjectMapper();
    JsonNode topNode = mapper.readTree(schemaAsString);
    JsonNode schemaRootNode = topNode.path(ROOT_NODE_PATH);

    if (schemaRootNode.isMissingNode()) {
      throw new Exception(
          "Is this a BQ schema? The given schema must have a top node of name " + ROOT_NODE_PATH);
    }

    // Deserialize that node into the BigQuery model class, then convert it to a Beam schema.
    TableSchema tableSchema =
        defaultJsonFactory.fromString(schemaRootNode.toString(), TableSchema.class);

    return BigQueryUtils.fromTableSchema(tableSchema);
  }
}
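A minimal usage sketch for the parser above. It assumes that `JsonSchemaParser` and the Beam GCP IO module (with its Jackson and Google API client dependencies) are on the classpath. The `SchemaUsageExample` class, the `sampleSchema` helper, and the two-field sample document are hypothetical and not part of the gist. The sample only mimics the shape the parser expects, a top-level `schema` node that wraps a `fields` array.

```java
import dev.herraiz.beam.schemas.JsonSchemaParser;
import org.apache.beam.sdk.schemas.Schema;

// Hypothetical example class, not part of the original gist.
public class SchemaUsageExample {

  // Hand-written sample input: a "schema" node wrapping a "fields" array.
  static final String SAMPLE_JSON =
      "{\"schema\": {\"fields\": ["
          + "{\"name\": \"id\", \"type\": \"INTEGER\", \"mode\": \"REQUIRED\"},"
          + "{\"name\": \"name\", \"type\": \"STRING\", \"mode\": \"NULLABLE\"}"
          + "]}}";

  // Runs the sample document through the parser from the gist.
  static Schema sampleSchema() throws Exception {
    return JsonSchemaParser.bqJson2BeamSchema(SAMPLE_JSON);
  }

  public static void main(String[] args) throws Exception {
    Schema schema = sampleSchema();
    // Print each field name and the Beam type it was mapped to.
    for (Schema.Field field : schema.getFields()) {
      System.out.println(field.getName() + ": " + field.getType());
    }
  }
}
```

A document without the top-level `schema` node, for example a bare array of field definitions, makes `bqJson2BeamSchema` throw the "Is this a BQ schema?" exception instead of returning a schema.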