Skip to content

Instantly share code, notes, and snippets.

@albinkjellin
Created November 22, 2023 16:21
Show Gist options
  • Save albinkjellin/7cd8a64e3c3b874032ed001d44c7ab4c to your computer and use it in GitHub Desktop.
Save albinkjellin/7cd8a64e3c3b874032ed001d44c7ab4c to your computer and use it in GitHub Desktop.
Sample: running a timeliness (freshness) check with Soda on a Spark DataFrame
from pyspark.sql import DataFrame, SparkSession, SQLContext, types
from pyspark.sql.functions import *
from soda.scan import Scan
import yaml
# Build a local Spark session; the Postgres JDBC driver jar is placed on the
# classpath for the data source configured in configuration.yml.
spark_session = SparkSession.builder.config("spark.jars", "postgresql-42.3.1.jar").getOrCreate()

file_path = 'stock-price-1.csv'
file_name = 'stockprice1'

# Load the CSV with a header row. CSV columns arrive as strings, so cast the
# "date" column to a timestamp — the freshness check below needs a real
# timestamp column to evaluate.
df_stock = spark_session.read.option("header", True).csv(file_path)
df_stock = df_stock.withColumn("date", to_timestamp("date"))

# Register the DataFrame as a temp view so Soda can query it by table name.
df_stock.createOrReplaceTempView(file_name)
# printSchema() prints to stdout and returns None — wrapping it in print()
# would emit a spurious "None" line, so call it bare.
df_stock.printSchema()

# Configure a Soda scan that targets the in-session Spark data source.
scan = Scan()
scan.set_scan_definition_name(file_name)
scan.set_data_source_name('spark_df')
scan.add_spark_session(spark_session)
scan.add_configuration_yaml_file(file_path="configuration.yml")

# SodaCL checks, built as a dict and serialized to YAML:
#   - row_count > 0:         the table must not be empty
#   - freshness(date) < 1d:  the newest "date" value must be under a day old
checks_dict = {
    'checks for ' + file_name: [
        {
            'row_count > 0':
                {'name': 'Row count'}
        },
        {
            'freshness(date) < 1d':
                {'name': 'Date freshness'}
        }
    ]
}
# Echo the generated SodaCL for debugging, then hand it to the scan.
print(yaml.dump(checks_dict))
scan.add_sodacl_yaml_str(yaml.dump(checks_dict))

# Execute the scan and surface its log output (check results, errors).
scan.execute()
print(scan.get_logs_text())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment