-
-
Save collado-mike/d1854958b7b1672f5a494933f80b8b58 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from typing import Optional, List | |
from urllib import parse | |
from openlineage.airflow.extractors import TaskMetadata | |
from openlineage.airflow.extractors.base import BaseExtractor | |
from openlineage.client.run import Dataset | |
log = logging.getLogger(__name__) | |
""" | |
Custom extractors for BigQueryCreateExternalTableOperator and GCSToGCSOperator. | |
Programmatically define extractor with the following: | |
os.environ["OPENLINEAGE_EXTRACTOR_BigQueryCreateExternalTableOperator"] = \ | |
'custom_extractor.BigQueryExternalTableExtractor' | |
os.environ["OPENLINEAGE_EXTRACTOR_GCSToGCSOperator"] = \ | |
'custom_extractor.GcsToGcsExtractor' | |
set_producer("https://github.com/OpenLineage/OpenLineage/tree/0.0.1/integration/airflow") | |
or simply set the environment variables | |
OPENLINEAGE_EXTRACTOR_BigQueryCreateExternalTableOperator=custom_extractor.BigQueryExternalTableExtractor | |
OPENLINEAGE_EXTRACTOR_GCSToGCSOperator=custom_extractor.GcsToGcsExtractor | |
""" | |
def filepath(path):
    """Return the directory portion of *path*, normalized to start with '/'.

    Examples:
        '/bucket/dir/file.txt' -> '/bucket/dir'
        'dir/file.txt'         -> '/dir'
        'file.txt'             -> '/'   (no directory component)
        ''                     -> '/'

    Fixes two defects in the original: an IndexError on empty input
    (``path[0]``), and silent last-character truncation for slash-free
    names (``rfind`` returns -1, so ``path[0:-1]`` dropped a character).
    Paths that contain a '/' behave exactly as before.
    """
    if not path:
        return '/'
    slash_index = path.rfind('/')
    if slash_index == -1:
        # No directory component at all.
        return '/'
    prefix = '' if path.startswith('/') else '/'
    return prefix + path[:slash_index]
class BigQueryExternalTableExtractor(BaseExtractor):
    """Lineage extractor for ``BigQueryCreateExternalTableOperator``.

    Emits the external table's GCS source URIs as input datasets and the
    BigQuery table itself as the single output dataset.
    """

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        """Operator class names this extractor handles."""
        return ['BigQueryCreateExternalTableOperator']

    def extract_on_complete(self, task_instance) -> Optional[TaskMetadata]:
        """Build TaskMetadata from the operator's ``table_resource``.

        Runs after task completion, when the operator's table resource is
        fully populated.
        """
        table_resource = self.operator.table_resource
        ref = table_resource['tableReference']
        out_ds = Dataset(
            namespace="bigquery",
            name=f"{ref['projectId']}.{ref['datasetId']}.{ref['tableId']}",
            facets={},
        )
        # NOTE: the original used ``lambda str: parse.urlparse(str)``, which
        # shadowed the builtin ``str``; a generator expression avoids that.
        # ``facets={}`` matches the Dataset default, added for consistency
        # with out_ds above.
        source_uris = table_resource['externalDataConfiguration']['sourceUris']
        in_ds = [
            Dataset(namespace=f'{parsed.scheme}://{parsed.hostname}',
                    name=filepath(parsed.path),
                    facets={})
            for parsed in (parse.urlparse(uri) for uri in source_uris)
        ]
        log.info("Publishing bq task metadata for inputs %s and outputs %s",
                 in_ds, out_ds)
        return TaskMetadata(
            task_instance.task_id,
            inputs=in_ds,
            outputs=[out_ds],
        )

    def extract(self) -> Optional[TaskMetadata]:
        # Lineage is only known after the operator runs; see
        # extract_on_complete.
        pass
class GcsToGcsExtractor(BaseExtractor):
    """Lineage extractor for ``GCSToGCSOperator``.

    Maps each source object to an input dataset in the source bucket and
    the destination object to a single output dataset in the destination
    bucket.
    """

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        """Operator class names this extractor handles."""
        return ['GCSToGCSOperator']

    def extract_on_complete(self, task_instance) -> Optional[TaskMetadata]:
        """Build TaskMetadata from the operator's bucket/object settings."""
        source_namespace = f'gs://{self.operator.source_bucket}'
        inputs = []
        for obj in self.operator.source_objects:
            inputs.append(Dataset(namespace=source_namespace,
                                  name=filepath(obj),
                                  facets={}))
        output = Dataset(
            namespace=f'gs://{self.operator.destination_bucket}',
            name=filepath(self.operator.destination_object),
        )
        log.info("Publishing gcs task metadata for inputs %s and outputs %s",
                 inputs, output)
        return TaskMetadata(task_instance.task_id,
                            inputs=inputs,
                            outputs=[output])

    def extract(self) -> Optional[TaskMetadata]:
        # Nothing to extract before the task completes.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment