--- !RTFMetadata
entity_name_product: <redacted>.intl_dk
snowflake_table: $snowflake_db.<redacted>.intl_dk
snowflake_select_condition: |
  select b.uuid as user_uuid, count(1) as row_count
  from $snowflake_db.<redacted>.intl_dk a
  join $snowflake_db.<redacted>.EMEA_USER b on trim(a.requester_email) = trim(b.login_email)
  where trim(login_email) <> 'gdpr_forgotten'
    and b.uuid in ($rtf_users) and a.country_key in (4, 35)
  group by b.uuid
  union all
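The select conditions use `$name` placeholders (`$snowflake_db`, `$rtf_users`), which map directly onto Python's `string.Template` syntax. A minimal sketch of resolving them before execution, with a placeholder schema name and values standing in for the redacted identifiers:

from string import Template

select_condition = """
select b.uuid as user_uuid, count(1) as row_count
from $snowflake_db.some_schema.intl_dk a
join $snowflake_db.some_schema.EMEA_USER b
  on trim(a.requester_email) = trim(b.login_email)
where b.uuid in ($rtf_users) and a.country_key in (4, 35)
group by b.uuid
"""

# safe_substitute leaves any unknown $placeholders untouched instead of raising
sql = Template(select_condition).safe_substitute(
    snowflake_db="ANALYTICS_DB",      # assumed database name
    rtf_users="'uuid-1', 'uuid-2'",   # quoted UUIDs of pending RTF requests
)
print(sql)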
[DEFAULT]
pipeline_name = wf_rtf
retention = 0.0001
; the variables below are read dynamically by the spark-driver from global_config.ini
;s3_bucket = s3://bucket-id_env
;s3_pipeline_bucket= s3://bucket-id_env/pipeline
rtf_pipeline_path = ${s3_pipeline_bucket}/${pipeline_name}
gdpr_rtf_requests_incr = ${rtf_pipeline_path}/working/gdpr/rtf_user_requests_incr
gdpr_rtf_requests_backup = ${rtf_pipeline_path}/backup/rtf_user_requests_incr
gdpr_rtf_requests_processed = ${s3_bucket}/target/bastlevel_low/gdpr/rtf_user_requests
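The ${...} references in this section resolve with configparser's ExtendedInterpolation. A minimal sketch, assuming the section is saved as wf_rtf.ini and that the spark-driver injects s3_bucket / s3_pipeline_bucket at runtime (filename and bucket values here are placeholders):

from configparser import ConfigParser, ExtendedInterpolation

# ExtendedInterpolation resolves the ${...} references used above
config = ConfigParser(interpolation=ExtendedInterpolation())
config.read("wf_rtf.ini")  # assumed filename for the section above

# s3_bucket / s3_pipeline_bucket are injected at runtime, so set them
# before reading the derived paths (values here are placeholders)
config["DEFAULT"]["s3_bucket"] = "s3://bucket-id_env"
config["DEFAULT"]["s3_pipeline_bucket"] = "s3://bucket-id_env/pipeline"

print(config["DEFAULT"]["gdpr_rtf_requests_incr"])
# -> s3://bucket-id_env/pipeline/wf_rtf/working/gdpr/rtf_user_requests_incr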
from dataclasses import dataclass

import yaml


@dataclass(order=True)
class RTFMetadata(yaml.YAMLObject):
    """A :class:`dataclass` representing one RTF metadata document.

    Subclassing :class:`yaml.YAMLObject` registers the ``!RTFMetadata`` tag,
    so PyYAML constructs instances of this class directly when loading.
    """

    yaml_tag = "!RTFMetadata"
    entity_name_product: str
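With the tag registered, loading the metadata is direct; a minimal sketch, assuming the `!RTFMetadata` documents shown earlier are saved as rtf_metadata.yaml (the filename is an assumption):

import yaml

# full_load_all yields one constructed RTFMetadata instance per
# `--- !RTFMetadata` document, via the tag registered by yaml.YAMLObject
with open("rtf_metadata.yaml") as f:  # assumed filename
    entities = list(yaml.full_load_all(f))

for meta in entities:
    print(meta.entity_name_product)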
"""
* Author: Pritam
* Sources: gdpr.rtf_user_requests, and tables configured in RTF metadata
* Targets: GDPR Level High tables as configured in RTF metadata
* Airflow DAG:
* Airflow Task:
* Change log:
* Jira: Description: Date
--- !RTFMetadata
entity_name_product: dwh.emea_dim_user
snowflake_table: $snowflake_db.PROD.USER
snowflake_select_condition: |
  select uuid as user_uuid, count(*) as row_count
  from $snowflake_db.PROD.USER
  where uuid in ($rtf_users)
  group by uuid
snowflake_update_condition: |
  update
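Running the resolved select condition to check per-user row counts might look like the following; a minimal sketch using the Snowflake Python connector, where every connection parameter, the database name, and the UUIDs are placeholders, not the pipeline's real values:

import snowflake.connector

# all connection parameters are placeholders, not the pipeline's real values
conn = snowflake.connector.connect(
    account="my_account",
    user="rtf_service",
    password="***",
    warehouse="RTF_WH",
)
# the select condition after $-placeholder substitution (placeholder values)
resolved_sql = """
select uuid as user_uuid, count(*) as row_count
from ANALYTICS_DB.PROD.USER
where uuid in ('uuid-1', 'uuid-2')
group by uuid
"""
try:
    with conn.cursor() as cur:
        cur.execute(resolved_sql)
        for user_uuid, row_count in cur.fetchall():
            print(user_uuid, row_count)  # non-zero counts mean data still present
finally:
    conn.close()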
import asyncio
from abc import ABC, abstractmethod
from dependencies.retry import retry_call
from dependencies.core_service import CoreService
from dependencies.messagebus.mbus_producer import MBusProducer
from dependencies.rtf_helper_functions import (
    clock,
    write_string_to_s3_in_txt_object,
)
from abc import ABC, abstractmethod
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from dependencies.rtf_helper_functions import RTFMetadata
class CoreService(ABC):
    """The core abstract service; each backing service (e.g., S3 or Snowflake)
    provides its own concrete implementation.
    """