Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created February 1, 2022 09:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anna-geller/2c6e1860555d6d8773f54926cd4a3637 to your computer and use it in GitHub Desktop.
Save anna-geller/2c6e1860555d6d8773f54926cd4a3637 to your computer and use it in GitHub Desktop.
from prefect import Flow, Parameter, task, Task
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.engine.results import S3Result
from datetime import datetime
from prefect.executors import LocalDaskExecutor
from dateutil.relativedelta import relativedelta
# Identity of this flow and the infrastructure it is deployed to.
CID = "c022"
FLOW_NAME = "europe-monthly-lotame-datamart"
REGION = "eu-west-1"
CLUSTER = "aqfer-prod-eks-Ireland"
TASK_TAGS = [CID + "-" + FLOW_NAME]

# S3 key template for checkpointed task results. "{task_run_id}" is kept as a
# literal placeholder for Prefect to fill in when the result is written.
LOCATION = "/".join(("results", CID, FLOW_NAME, "{task_run_id}.prefect"))
S3_RESULT = S3Result(bucket="com.aqfer.prod.prefect", location=LOCATION)
@task
def reduce_map():
    """No-op task used as a fan-in point between mapped stages.

    The flow calls it with a mapped task in ``upstream_tasks`` and then makes
    the next mapped stage depend on it via ``set_upstream``. It does no work
    and returns None.
    """
    return None
@task(result=S3_RESULT)
def compute_input_args(event_month):
    """Derive the date-based job arguments for one target month.

    Args:
        event_month: Target month as a ``"YYYYMM"`` string, or ``None`` to use
            the month before the current local date.

    Returns:
        dict of strings:
          - ``"event_month"``: the target month, ``"YYYYMM"``.
          - ``"prev_five_months"``: ``"{m0,m1,m2,m3,m4}"`` -- the target month
            and the four months before it, newest first.
          - ``"cg_dates"``: ``"{...}"`` listing the 14 days before the target
            month (newest first, ``"YYYYMMDD"``), then ``"YYYYMM*"`` for the
            target month itself, then the first 14 days of the following
            month (oldest first, ``"YYYYMMDD"``). Presumably consumed as a
            glob brace-expansion pattern -- TODO confirm with the job config.
          - ``"prev_three_months"``: ``"{m0,m1,m2}"`` -- the target month and
            the two months before it, newest first.

    Raises:
        ValueError: if ``event_month`` is a string not matching ``"YYYYMM"``.
    """
    month_format = "%Y%m"
    day_format = "%Y%m%d"

    def brace_list(items):
        # "{a,b,c}" -- same output as "{{{}}}".format(",".join(items)).
        return "{" + ",".join(items) + "}"

    if event_month is None:
        # Default to the previous month (local time of the machine running
        # the task).
        target = datetime.now() + relativedelta(months=-1)
    else:
        target = datetime.strptime(event_month, month_format)

    months = [
        (target + relativedelta(months=-offset)).strftime(month_format)
        for offset in range(5)
    ]

    first_of_month = target.replace(day=1)
    first_of_next_month = first_of_month + relativedelta(months=1)

    # 14 days before the month (newest first), the whole month as a wildcard,
    # then the first 14 days of the next month (oldest first).
    cg_dates = [
        (first_of_month + relativedelta(days=-offset)).strftime(day_format)
        for offset in range(1, 15)
    ]
    cg_dates.append(months[0] + "*")
    cg_dates.extend(
        (first_of_next_month + relativedelta(days=offset)).strftime(day_format)
        for offset in range(14)
    )

    return {
        "event_month": months[0],
        "prev_five_months": brace_list(months),
        "cg_dates": brace_list(cg_dates),
        "prev_three_months": brace_list(months[:3]),
    }
# Country codes the per-country Lakeview jobs are mapped over, in map order.
COUNTRIES = (
    "AT", "BE", "CH", "CZ", "DE", "DK", "ES", "FI", "FR", "GB", "GR",
    "HU", "IE", "IT", "NL", "NO", "PL", "PT", "RO", "SE", "SK",
)


@task(result=S3_RESULT)
def generate_arg_map(event_month, prev_three_months):
    """Build one argument dict per country for the mapped per-country jobs.

    Args:
        event_month: Target month string, copied into every dict.
        prev_three_months: Brace-list month string, copied into every dict.

    Returns:
        list of dicts, one per entry of ``COUNTRIES`` in that order, each with
        keys ``"event_month"``, ``"country"`` and ``"prev_three_months"``.
    """
    return [
        {
            "event_month": event_month,
            "country": country,
            "prev_three_months": prev_three_months,
        }
        for country in COUNTRIES
    ]
class LakeviewRunJobTask(Task):
    """Prefect task describing one Lakeview job run.

    The constructor stores the job settings on the instance. ``run`` is only a
    placeholder in this file: it accepts the ``args`` value the flow passes in
    and returns None.

    Args:
        cid: CID string the job belongs to (this file passes ``CID``).
        job: Name of the Lakeview job to run.
        cluster: Cluster name the job runs on.
        poll_interval: Presumably seconds between job-status polls -- TODO
            confirm once ``run`` is implemented.
        name: Prefect task name; forwarded to ``Task.__init__``.
        timeout: Forwarded to ``Task.__init__``, which treats it as the
            task-run timeout in seconds.
        skip: Stored as ``self.skip``; not read by the placeholder ``run``.
        concurrent: Stored as ``self.concurrent``; not read by the placeholder
            ``run``.
        *args, **kwargs: Forwarded to ``prefect.Task`` (this file passes
            ``result`` and ``task_run_name``).
    """

    def __init__(
        self,
        cid=None,
        job=None,
        cluster=None,
        poll_interval=None,
        name=None,
        timeout=None,
        skip=False,
        concurrent=False,
        *args,
        **kwargs
    ):
        # ``name`` and ``timeout`` are also Task constructor arguments. Forward
        # them so Task.__init__ handles them itself (e.g. defaulting a missing
        # name to the class name) instead of overwriting self.name/self.timeout
        # afterwards with possibly-None values.
        super().__init__(*args, name=name, timeout=timeout, **kwargs)
        self.cid = cid
        self.job = job
        self.cluster = cluster
        self.poll_interval = poll_interval
        self.skip = skip
        self.concurrent = concurrent

    def run(self, args=None) -> int:
        """Placeholder for submitting and polling the Lakeview job.

        Args:
            args: The value passed as ``args=`` when this task is called or
                mapped in the flow. Prefect binds call keywords against this
                signature, so the parameter must exist; ``task_run_name``
                templates such as ``"...-{args[country]}"`` also read it.

        Returns:
            None for now; the ``int`` annotation presumably describes the
            intended result of a real implementation.
        """
# Settings shared by every Lakeview job task in this flow.
_POLL_INTERVAL = 300
_DEFAULT_TIMEOUT = 10800


def _lakeview_job(job, name, *, timeout=_DEFAULT_TIMEOUT, skip=False, concurrent=False):
    """Create a LakeviewRunJobTask with this flow's shared settings.

    Every task gets ``cid=CID``, ``cluster=CLUSTER``, a 300-second
    ``poll_interval`` and ``result=S3_RESULT``. Tasks created with
    ``concurrent=True`` are the ones the flow maps over generate_arg_map's
    per-country dicts, so they also get ``task_run_name`` set to
    ``"<name>-{args[country]}"`` to label each mapped run with its country;
    other tasks pass ``task_run_name=None`` (Prefect's default).
    """
    return LakeviewRunJobTask(
        cid=CID,
        job=job,
        cluster=CLUSTER,
        poll_interval=_POLL_INTERVAL,
        name=name,
        timeout=timeout,
        skip=skip,
        concurrent=concurrent,
        result=S3_RESULT,
        task_run_name=name + "-{args[country]}" if concurrent else None,
    )


# Whole-month jobs (run once per flow run).
collate_monthly_crossdevice = _lakeview_job(
    job="europe-collate-monthly-crossdevice",
    name="collate-monthly-crossdevice",
    skip=True,
)
collate_quarterly_crossdevice = _lakeview_job(
    job="europe-collate-quarterly-crossdevice",
    name="collate-quarterly-crossdevice",
    skip=True,
)
import_adform_lotame = _lakeview_job(
    job="europe-import-adform-lotame",
    name="import-adform-lotame",
    skip=True,
)

# Per-country jobs (mapped over the country argument dicts).
collate_monthly_lotame = _lakeview_job(
    job="europe-collate-monthly-lotame",
    name="collate-monthly-lotame",
    concurrent=True,
    skip=True,
)
collate_quarterly_lotame_cookie = _lakeview_job(
    job="europe-collate-quarterly-lotcookie",
    name="collate-quarterly-lotame-cookie",
    concurrent=True,
    skip=True,
)
collate_quarterly_lotame_mobile = _lakeview_job(
    job="europe-collate-quarterly-lotmobile",
    name="collate-quarterly-lotame-mobile",
    concurrent=True,
    skip=True,
)
collate_quarterly_crdlotcookie = _lakeview_job(
    job="europe-collate-quarterly-crdlotcookie",
    name="collate-quarterly-crdlotcookie",
    timeout=18000,
    concurrent=True,
)
collate_quarterly_crdlotmobile = _lakeview_job(
    job="europe-collate-quarterly-crdlotmobile",
    name="collate-quarterly-crdlotmobile",
    timeout=18000,
    concurrent=True,
)
# Build the flow DAG. Each task call inside the `with Flow(...)` context adds a
# task to the flow, so keep the statement order as-is.
with Flow(FLOW_NAME) as flow:
    # Optional "YYYYMM" target month; None lets compute_input_args pick the
    # previous month.
    event_month = Parameter("event_month", default=None)
    a = compute_input_args(event_month)

    # Whole-month jobs: t2 runs after t1; t3 has no declared upstream task.
    t1 = collate_monthly_crossdevice(args=a)
    t2 = collate_quarterly_crossdevice(args=a, upstream_tasks=[t1])
    t3 = import_adform_lotame(args=a)

    # Per-country stages: each .map() runs one task per dict in arg_map.
    arg_map = generate_arg_map(a["event_month"], a["prev_three_months"])
    t4 = collate_monthly_lotame.map(args=arg_map)
    # reduce_map is a single no-op task taking the whole mapped stage as
    # upstream; the next mapped stage depends on it via set_upstream, which
    # presumably makes every child of that stage wait for all children of the
    # previous one.
    r1 = reduce_map(upstream_tasks=[t4])
    t5 = collate_quarterly_lotame_cookie.map(args=arg_map)
    t5.set_upstream(r1)
    r2 = reduce_map(upstream_tasks=[t5])
    t6 = collate_quarterly_lotame_mobile.map(args=arg_map)
    t6.set_upstream(r2)
    r3 = reduce_map(upstream_tasks=[t6])
    # The two cross-device stages both wait on r3 and do not depend on each other.
    t7 = collate_quarterly_crdlotcookie.map(args=arg_map)
    t8 = collate_quarterly_crdlotmobile.map(args=arg_map)
    t7.set_upstream(r3)
    t8.set_upstream(r3)
    # NOTE(review): no per-country stage (t4-t8) depends on t1, t2 or t3, so
    # they may run concurrently with the whole-month jobs -- confirm intended.
# Deployment settings: the flow source is fetched from GitHub, runs as a
# Kubernetes job on agents labelled with REGION, and executes tasks with a
# local Dask executor.
flow.storage = GitHub(
    repo="aqfer/product-deployments",
    path="datalake/cids/{}/flows/{}.py".format(CID, FLOW_NAME),
    access_token_secret="GITHUB_ACCESS_TOKEN",
)
flow.run_config = KubernetesRun(labels=[REGION])
flow.executor = LocalDaskExecutor()

if __name__ == "__main__":
    # Render the DAG only when this file is run directly. Prefect executes
    # this file to load the flow (registration, GitHub storage on the agent),
    # and an unguarded visualize() would run there too, where graphviz or a
    # display may not be available.
    flow.visualize()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment