Databricks Integration Test Coverage POC

How to use

First, rename testcicd to the name of your project. In deployment.json you will also need to change pkg-testcicd to pkg-{your-project-name}. (The pkg- prefix is a small hack to keep dbx from uploading the entire project folder.)

Then run the usual dbx deploy and launch commands to execute the integration test, and copy the coverage report off DBFS:

dbx deploy --jobs=cov-sample-integration-test --files-only
dbx launch --job=cov-sample-integration-test --as-run-submit --trace
databricks fs cp dbfs:/tmp/coverage.xml .

With this, coverage.xml should appear in your current folder. If you want to feed it into another tool (a CI coverage reporter, for example), you will likely need to rewrite the file paths inside the XML, since they point at locations on the cluster rather than at your local checkout.
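A rough sketch of such a rewrite is shown below; the cluster-side prefix and the local package path are assumptions for illustration, not values produced by this job.

# rewrite_paths.py -- hedged sketch: rewrites <source> entries in coverage.xml
# so that local tooling can resolve the files; adjust the two prefixes.
import xml.etree.ElementTree as ET

CLUSTER_PREFIX = "/databricks/python/lib/python3.9/site-packages"  # hypothetical cluster path
LOCAL_PREFIX = "./testcicd"  # hypothetical local package path

tree = ET.parse("coverage.xml")
root = tree.getroot()
for source in root.iter("source"):
    if source.text and source.text.startswith(CLUSTER_PREFIX):
        source.text = source.text.replace(CLUSTER_PREFIX, LOCAL_PREFIX, 1)
tree.write("coverage.xml")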

deployment.json

{
    "default": {
        "jobs": [
            {
                "name": "cov-sample-integration-test",
                "existing_cluster_id": "", // change-me
                "email_notifications": {
                    "on_start": [],
                    "on_success": [],
                    "on_failure": []
                },
                "max_retries": 0,
                "libraries": [
                    {
                        "pypi": {
                            "package": "pytest-cov"
                        }
                    }
                ],
                "spark_python_task": {
                    "python_file": "tests/integration/sample_test.py",
                    "parameters": [
                        "--cov",
                        "pkg-testcicd",
                        "--cov-config",
                        ".coveragerc",
                        "--xml",
                        "/dbfs/tmp/coverage.xml"
                    ]
                }
            }
        ]
    }
}
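The job parameters above also reference a .coveragerc in the project root. A minimal sketch of what that file might contain is shown here; the source value is an assumption, point it at your own package:

# .coveragerc -- minimal sketch; "testcicd" is assumed to be your package name
[run]
source = testcicd
branch = True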
# tests/integration/sample_test.py
import sys
import unittest
from argparse import ArgumentParser
from typing import Dict
from uuid import uuid4

import coverage
from pyspark.dbutils import DBUtils  # noqa

# you will probably need to change this import to your own package
from testcicd.jobs.sample.entrypoint import SampleJob


class SampleJobIntegrationTest(unittest.TestCase):
    def setUp(self):
        self.test_dir = "dbfs:/tmp/tests/sample/%s" % str(uuid4())
        self.test_config = {"output_format": "delta",
                            "output_path": self.test_dir}
        self.job = SampleJob(init_conf=self.test_config)
        self.dbutils = DBUtils(self.job.spark)
        self.spark = self.job.spark

    def test_sample(self):
        self.job.launch()
        output_count = (
            self.spark.read.format(self.test_config["output_format"])
            .load(self.test_config["output_path"])
            .count()
        )
        self.assertGreater(output_count, 0)

    def tearDown(self):
        self.dbutils.fs.rm(self.test_dir, True)


class CoverageIntegrationTest:
    """Starts/stops coverage measurement around the unittest run,
    driven by the --cov/--cov-config/--xml job parameters."""

    def __init__(self):
        self.conf = self._parse_args()
        self.cov = None
        if self.conf["cov"]:
            self.cov = coverage.Coverage(config_file=self.conf["cov_config"])

    @staticmethod
    def _dbfs_path(path):
        # translate dbfs:/... URIs into the /dbfs FUSE mount path
        if path:
            return path.replace("dbfs:", "/dbfs")

    @staticmethod
    def _parse_args() -> Dict[str, str]:
        p = ArgumentParser()
        p.add_argument("--cov", required=False, type=str)
        p.add_argument("--cov-config", required="--cov" in sys.argv, type=str)
        p.add_argument("--xml", required="--cov" in sys.argv, type=str)
        namespace = p.parse_known_args(sys.argv[1:])[0]
        return {
            # strip the "pkg-" prefix used to trick dbx into uploading the package
            "cov": namespace.cov.replace("pkg-", "") if namespace.cov else None,
            "cov_config": CoverageIntegrationTest._dbfs_path(namespace.cov_config),
            "xml": namespace.xml,
        }

    def start(self):
        if not self.cov:
            return
        self.cov.start()

    def stop(self):
        if not self.cov:
            return
        self.cov.stop()
        if self.conf["xml"]:
            self.cov.xml_report(outfile=self.conf["xml"])


if __name__ == "__main__":
    c = CoverageIntegrationTest()
    c.start()
    # please don't change the logic of the test result checks here;
    # it's intentionally done this way to comply with jobs run result checks.
    # for other tests, simply replace SampleJobIntegrationTest with your custom class name
    loader = unittest.TestLoader()
    tests = loader.loadTestsFromTestCase(SampleJobIntegrationTest)
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(tests)
    c.stop()
    if not result.wasSuccessful():
        raise RuntimeError(
            "One or multiple tests failed. Please check job logs for additional information."
        )