Skip to content

Instantly share code, notes, and snippets.

View tameem92's full-sized avatar
💻
Building stuff.

Tameem Iftikhar tameem92

💻
Building stuff.
View GitHub Profile
CONFIG = [
{
'id': 1,
'name': 'background',
'directory': 'Background',
'required': True,
'rarity_weights': None,
},
{
'id': 2,
VERSION='v1.0.3'
# DAG Factory
factory = DAGFactory(config.environment)
process_invoices_dag = factory.create('ProcessInvoices', config.process_invoices).build()
process_messages_dag = factory.create('ProcessMessages', config.process_messages).build()
def create_main_dag(dag_id, org_config):
.
├── projects
│   ├── dev
│   │   ├── config
│   │   └── dags
│   │   ├── process_invoices.py
│   │   └── process_messages.py
│   └── prod
│   ├── configs
│   └── dags
from airflow.models import Variable
# Fetch your basic Airflow variables.
# These are loaded by CI/CD, or could come from a secrets backend such as
# AWS Secrets Manager or Google Cloud Secret Manager.
environment = Variable.get('environment', deserialize_json=True)
controller = Variable.get('controller', deserialize_json=True)
# Fetch dynamic configs from some API (whatever you use)
├── README.md
├── dag_factory
│   ├── dag_factory.py
│   ├── process_invoices.py
│   └── process_messages.py
└── projects
├── dev
│   ├── config
│   └── dags
│   ├── config.py
from datetime import datetime, timedelta
import os
from airflow.models import DAG, Variable
from airflow.contrib.kubernetes.secret import Secret
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
# Custom slack notifications
from utils.slack import task_fail_slack_alert
from utils.slack import pipeline_success_slack_alert
from dag_factory.process_invoices import ProcessInvoices
from dag_factory.process_messages import ProcessMessages
class DAGFactory():
"""The DAG Factory Class"""
def __init__(self, environment_config=None):
self.environment_config = environment_config
def create(self, dag_type, config: list):
VERSION='v1.0.3'
# DAG Factory
factory = DAGFactory(config.environment)
process_invoices_dag = factory.create('ProcessInvoices', config.process_invoices).build()
process_messages_dag = factory.create('ProcessMessages', config.process_messages).build()
with DAG(dag_id=f'main_{VERSION}',
default_args=default_args,
for x in range(1, 500):
# This is our best attempt to prevent a deadlock
# when running multiple parallel tests.
time.sleep(0.5)
runTest(x)
"""
Many Qt functions are not thread-safe. If you use function callbacks,
then even if you put locks around all of YOUR draw calls, you will still
segfault, because Qt's main event loop is still running and using
resources without locks.
"""
from multiprocessing.pool import ThreadPool
import sys
from threading import Lock