Skip to content

Instantly share code, notes, and snippets.

@collado-mike
Last active April 2, 2022 00:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save collado-mike/d1854958b7b1672f5a494933f80b8b58 to your computer and use it in GitHub Desktop.
import logging
from typing import Optional, List
from urllib import parse
from openlineage.airflow.extractors import TaskMetadata
from openlineage.airflow.extractors.base import BaseExtractor
from openlineage.client.run import Dataset
log = logging.getLogger(__name__)
"""
Custom extractors for BigQueryCreateExternalTableOperator and GCSToGCSOperator.
Programmatically define extractor with the following:
os.environ["OPENLINEAGE_EXTRACTOR_BigQueryCreateExternalTableOperator"] = \
'custom_extractor.BigQueryExternalTableExtractor'
os.environ["OPENLINEAGE_EXTRACTOR_GCSToGCSOperator"] = \
'custom_extractor.GcsToGcsExtractor'
set_producer("https://github.com/OpenLineage/OpenLineage/tree/0.0.1/integration/airflow")
or simply set the environment variables
OPENLINEAGE_EXTRACTOR_BigQueryCreateExternalTableOperator=custom_extractor.BigQueryExternalTableExtractor
OPENLINEAGE_EXTRACTOR_GCSToGCSOperator=custom_extractor.GcsToGcsExtractor
"""
def filepath(path):
    """Return the directory portion of *path*, with a leading slash.

    E.g. ``'bucket/dir/file.csv'`` -> ``'/bucket/dir'`` and
    ``'/a/b/c.txt'`` -> ``'/a/b'``.

    Degenerate inputs map to the root ``'/'``: an empty path, a path with
    no ``'/'`` separator (the old code sliced off the last character in
    that case because ``rfind`` returned -1), and a path whose only
    separator is the leading one (the old code returned ``''``, which is
    not a valid dataset name).
    """
    if not path:
        return '/'
    sep = path.rfind('/')
    if sep <= 0:
        # no separator, or only the leading one: directory is the root
        return '/'
    prefix = '' if path.startswith('/') else '/'
    return prefix + path[:sep]
class BigQueryExternalTableExtractor(BaseExtractor):
    """OpenLineage extractor for ``BigQueryCreateExternalTableOperator``.

    Reads the operator's ``table_resource`` after the task has run and
    reports the external table as the output dataset and each source URI
    as an input dataset.
    """

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        return ['BigQueryCreateExternalTableOperator']

    def extract_on_complete(self, task_instance):
        """Build TaskMetadata from the completed operator's table resource."""
        resource = self.operator.table_resource
        table_ref = resource['tableReference']
        # Output: the external table itself, addressed as project.dataset.table.
        output = Dataset(
            namespace="bigquery",
            name="{}.{}.{}".format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId']),
            facets={},
        )
        # Inputs: one dataset per source URI, namespaced by scheme://host.
        inputs = []
        for uri in resource['externalDataConfiguration']['sourceUris']:
            parsed = parse.urlparse(uri)
            inputs.append(
                Dataset(namespace=f'{parsed.scheme}://{parsed.hostname}',
                        name=filepath(parsed.path))
            )
        log.info("Publishing bq task metadata for inputs %s and outputs %s", inputs, output)
        return TaskMetadata(
            task_instance.task_id,
            inputs=inputs,
            outputs=[output],
        )

    def extract(self) -> Optional[TaskMetadata]:
        # Lineage is only available after execution; see extract_on_complete.
        pass
class GcsToGcsExtractor(BaseExtractor):
    """OpenLineage extractor for ``GCSToGCSOperator``.

    After the copy completes, maps each source object to an input dataset
    and the destination object to the output dataset, both namespaced by
    their ``gs://<bucket>`` location.
    """

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        return ['GCSToGCSOperator']

    def extract_on_complete(self, task_instance) -> Optional[TaskMetadata]:
        """Build TaskMetadata from the operator's source/destination objects."""
        op = self.operator
        source_namespace = f'gs://{op.source_bucket}'
        inputs = []
        for source in op.source_objects:
            inputs.append(Dataset(namespace=source_namespace,
                                  name=f'{filepath(source)}',
                                  facets={}))
        output = Dataset(namespace=f'gs://{op.destination_bucket}',
                         name=f'{filepath(op.destination_object)}')
        log.info("Publishing gcs task metadata for inputs %s and outputs %s", inputs, output)
        return TaskMetadata(task_instance.task_id, inputs=inputs, outputs=[output])

    def extract(self) -> Optional[TaskMetadata]:
        # Lineage is only known once the copy has completed; see extract_on_complete.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment