Skip to content

Instantly share code, notes, and snippets.

@rnbtechnology
Last active January 7, 2021 23:29
Show Gist options
  • Save rnbtechnology/c04904dd73cb9e88ac31a8708f9e33c4 to your computer and use it in GitHub Desktop.
Save rnbtechnology/c04904dd73cb9e88ac31a8708f9e33c4 to your computer and use it in GitHub Desktop.
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import col
# instantiates a Scala Map holding the configuration used to communicate with the Schema Registry APIs
def get_schema_registry_conf_map(spark, schema_registry_url, topic_name):
    """Build the Scala immutable Map of Schema Registry settings for ABRiS.

    The map is assembled on the JVM side through the Py4J gateway of the
    session's SparkContext, so the returned proxy can be handed directly to
    JVM functions that expect a Scala ``Map`` (e.g. ABRiS'
    ``from_confluent_avro``).

    Args:
        spark: The active ``SparkSession``.
        schema_registry_url: Schema Registry URL string; passed through
            verbatim as ``schema.registry.url``.
        topic_name: Topic name; passed as ``schema.registry.topic``.

    Returns:
        A Py4J proxy to a Scala immutable Map containing four entries:
        the URL, the topic, ``value.schema.id`` = ``"latest"`` and
        ``value.schema.naming.strategy`` = ``"topic.name"``.
    """
    # A SparkSession exposes its context as ``sparkContext`` (lower-case s);
    # ``spark.SparkContext`` does not exist and raised AttributeError.
    jvm_gateway = spark.sparkContext._gateway.jvm
    schema_registry_config_dict = {
        "schema.registry.url": schema_registry_url,
        "schema.registry.topic": topic_name,
        "value.schema.id": "latest",
        "value.schema.naming.strategy": "topic.name",
    }
    # Start from Scala's singleton empty immutable Map. The JVM names contain
    # `$`, which is not valid in a Python identifier, hence getattr.
    conf_map = getattr(
        getattr(jvm_gateway.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$"
    )
    for key, value in schema_registry_config_dict.items():
        # Immutable Scala Map: `+` (JVM name `$plus`) returns a new map, so rebind.
        conf_map = getattr(conf_map, "$plus")(jvm_gateway.scala.Tuple2(key, value))
    return conf_map
# returns deserialized column (using Schema Registry)
def from_avro(col, conf_map):
    """Wrap ABRiS' JVM ``from_confluent_avro`` as a PySpark ``Column``.

    Args:
        col: The column (or column name) holding the encoded payload; it is
            converted to its JVM form with ``_to_java_column``.
        conf_map: Scala Map proxy with the Schema Registry settings, passed
            unchanged to ``from_confluent_avro``.

    Returns:
        A ``Column`` wrapping the JVM column returned by
        ``za.co.absa.abris.avro.functions.from_confluent_avro``.
    """
    # NOTE(review): relies on an already-started SparkContext; if none is
    # active, `_active_spark_context` is presumably None and this fails.
    active_context = SparkContext._active_spark_context
    abris_functions = active_context._gateway.jvm.za.co.absa.abris.avro.functions
    deserialize = abris_functions.from_confluent_avro
    return Column(deserialize(_to_java_column(col), conf_map))
# Placeholder values — replace with the real topic and Schema Registry URL(s).
TOPIC_NAME = "topic_name"
SCHEMA_REGISTRY_URL = "host1:port1,host2:port2"
# Build the Scala Map used to communicate with the Schema Registry APIs.
# NOTE(review): `spark` and `df` are not defined in this snippet; presumably
# they are the SparkSession and source DataFrame from the surrounding job.
conf_map = get_schema_registry_conf_map(spark, SCHEMA_REGISTRY_URL, TOPIC_NAME)
# Deserialize the payload column (schema resolved via the Schema Registry) and
# keep the key plus the metadata columns needed for downstream processing.
deserialized_df = df.select(
    col("key").cast("string"),
    *[col(name) for name in ("partition", "offset", "timestamp", "timestampType")],
    from_avro(df.value, conf_map).alias("value"),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment