@mik-laj
Created February 24, 2020 17:41
import functools
import logging
import time
import sys
from airflow.configuration import conf
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.state import State
# Replace the script's own directory at the front of sys.path with the current working directory.
sys.path.pop(0)
sys.path.insert(0, '')
print(sys.path)
#
#
# class CountQueries(object):
#     def __init__(self):
#         self.count = 0
#
#     def __enter__(self):
#         from sqlalchemy import event
#         from airflow.settings import engine
#         event.listen(engine, "after_cursor_execute", self.after_cursor_execute)
#         return None
#
#     def after_cursor_execute(self, *args, **kwargs):
#         self.count += 1
#
#     def __exit__(self, type, value, traceback):
#         from sqlalchemy import event
#         from airflow.settings import engine
#         event.remove(engine, "after_cursor_execute", self.after_cursor_execute)
#         print('Query count: ', self.count)
#
#
# count_queries = CountQueries
import contextlib
import traceback


@contextlib.contextmanager
def trace_queries():
    """Log the duration and a trimmed (non-SQLAlchemy) call stack of every query run in this context."""
    from sqlalchemy import event
    from airflow.settings import engine

    log = logging.getLogger("sql")

    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        conn.info.setdefault('query_start_time', []).append(time.time())

    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        total = time.time() - conn.info['query_start_time'].pop()
        file_names = [
            f"{f.filename}:{f.name}:{f.lineno}"
            for f in traceback.extract_stack()
            if 'sqlalchemy' not in f.filename
        ]
        file_name = file_names[-1] if file_names else ""
        stack = [
            f
            for f in traceback.extract_stack()
            if 'sqlalchemy' not in f.filename
        ]
        stack_info = " > ".join([f"{f.filename.rpartition('/')[-1]}:{f.name}:{f.lineno}" for f in stack][-7:])
        log.info(
            "%.5f | %s | %s | %s",
            total, file_name, stack_info,
            ""
            # statement.replace("\n", " ")
        )

    event.listen(engine, "before_cursor_execute", before_cursor_execute)
    event.listen(engine, "after_cursor_execute", after_cursor_execute)
    try:
        yield
    finally:
        event.remove(engine, "before_cursor_execute", before_cursor_execute)
        event.remove(engine, "after_cursor_execute", after_cursor_execute)
DAG_FILE = "/files/dags/50_dag_5_dummy_tasks.py"
log = logging.getLogger("airflow.processor")
from airflow.jobs.scheduler_job import DagFileProcessor, SchedulerJob
processor = DagFileProcessor([], log)
@contextlib.contextmanager
def pyspy():
    """Record a py-spy flame graph of the current process for the duration of the context."""
    import os
    import datetime
    import signal

    pid = str(os.getpid())
    suffix = datetime.datetime.now().isoformat()
    filename = f'/files/pyspy/flame-{suffix}-{pid}.html'
    pyspy_pid = os.spawnlp(
        os.P_NOWAIT, 'sudo', 'sudo', 'py-spy', 'record', '--idle', '-o', filename, '-p', pid
    )
    try:
        yield
    finally:
        os.kill(pyspy_pid, signal.SIGINT)
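# Hypothetical usage sketch: this assumes py-spy is installed, sudo works non-interactively,
# and /files/pyspy/ exists; the flame graph covers everything executed inside the context.
#
# with pyspy():
#     processor.process_file(DAG_FILE, None, pickle_dags=False)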
def benchmark(retry_count=5):
    def benchmark_dec(f):
        @functools.wraps(f)
        def wrap(*args, **kwargs):
            r = []
            for i in range(retry_count):
                time1 = time.time()
                f(*args, **kwargs)
                time2 = time.time()
                diff = (time2 - time1) * 1000.0
                r.append(diff)
                # print('Retry %d took %0.3f ms' % (i, diff))
            print('Average took %0.3f ms' % (sum(r) / retry_count))
        return wrap
    return benchmark_dec
def retry(retry_count=5):
    def retry_dec(f):
        @functools.wraps(f)
        def wrap(*args, **kwargs):
            for i in range(retry_count):
                f(*args, **kwargs)
        return wrap
    return retry_dec
def timing(retry_count=1):
    def timing_dec(fn):
        @functools.wraps(fn)
        def wrap(*args, **kwargs):
            r = []
            time1 = time.time()
            fn(*args, **kwargs)
            time2 = time.time()
            diff = (time2 - time1) * 1000.0
            r.append(diff)
            print('Retry took %0.3f ms' % (diff / retry_count))
        return wrap
    return timing_dec
def count_queries(fn):
    @functools.wraps(fn)
    def wrap(*args, **kwargs):
        fn.query_count = 0

        def after_cursor_execute(*args, **kwargs):
            fn.query_count += 1

        from sqlalchemy import event
        from airflow.settings import engine
        event.listen(engine, "after_cursor_execute", after_cursor_execute)
        try:
            return fn(*args, **kwargs)
        finally:
            print("Query count: ", fn.query_count)
            event.remove(engine, "after_cursor_execute", after_cursor_execute)
    return wrap
@contextlib.contextmanager
def profiled(print_callers=False):
    import cProfile
    import io
    import pstats

    pr = cProfile.Profile()
    pr.enable()
    yield
    pr.disable()
    s = io.StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
    if print_callers:
        ps.print_callers()
    else:
        ps.print_stats()
    print(s.getvalue())
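# Hypothetical usage sketch: `profiled` is defined but never enabled in this script; wrapping a
# call like this would print a cProfile report sorted by cumulative time (or by callers, if requested).
#
# with profiled(print_callers=False):
#     processor.process_file(DAG_FILE, None, pickle_dags=False)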
RETRY = 10


def file_processing():
    from airflow.utils.session import create_session

    with create_session() as session:
        from airflow.models import DagModel
        (
            session.query(DagModel)
            .filter(DagModel.is_paused.is_(True))
            .update({"is_paused": False}, synchronize_session=False)
        )
        session.commit()

        from airflow.models import TaskInstance
        session.query(TaskInstance).delete()
        from airflow.models import DagRun
        session.query(DagRun).delete()

        # Outermost timing(RETRY) reports the average over the RETRY repetitions made by
        # retry(RETRY); the inner timing() and count_queries report per-run duration and query count.
        @timing(RETRY)
        @retry(RETRY)
        @timing()
        @count_queries
        def slow_case():
            # for _ in range(100):
            #     conf.getboolean('scheduler', 'USE_JOB_SCHEDULE')
            # with trace_queries():
            # with pyspy():
            from airflow.models import DagRun
            session.query(DagRun).delete()
            from airflow.models import TaskInstance
            session.query(TaskInstance).delete()
            session.commit()

            # with trace_queries():
            processor.process_file(DAG_FILE, None, pickle_dags=False)

        slow_case()


file_processing()
# def scheduler_job():
#     from airflow.utils.session import create_session
#     with create_session() as session:
#         from airflow.models import DagModel
#         (
#             session.query(DagModel)
#             .filter(DagModel.is_paused.is_(True))
#             .update({"is_paused": False}, synchronize_session=False)
#         )
#         session.commit()
#         from airflow.models import TaskInstance
#         session.query(TaskInstance).delete()
#         from airflow.models import DagRun
#         session.query(DagRun).delete()
#         from airflow.models import DagTag
#         session.query(DagTag).delete()
#         from airflow.models import DagModel
#         session.query(DagModel).delete()
#         from airflow.utils.db import add_default_pool_if_not_exists
#         add_default_pool_if_not_exists()
#         session.commit()
#
#         # @timing(RETRY)
#         # @retry(RETRY)
#         @timing()
#         # @count_queries
#         def slow_case():
#             print("Start Scheduler")
#             from airflow.models import DagRun
#             session.query(DagRun).delete()
#             from airflow.models import TaskInstance
#             session.query(TaskInstance).delete()
#             from tests.test_utils.mock_executor import MockExecutor
#
#             executor = MockExecutor()
#             # with trace_queries():
#             SchedulerJob(subdir=DAG_FILE, do_pickle=False, num_runs=3, executor=executor).run()
#
#         slow_case()
#
# scheduler_job()
# def creating_dag_run():
#     from airflow.models import DAG
#     from airflow.models import DagRun
#
#     dag = DAG(dag_id="test_dag", schedule_interval="@once", start_date=days_ago(2))
#     DummyOperator(task_id=f"task", dag=dag)
#     dag.sync_to_db()
#
#
#     @retry(RETRY)
#     def benchmark(dag):
#
#         def setup():
#             from airflow.utils.session import create_session
#             with create_session() as session:
#                 session.query(DagRun).delete()
#
#         @timing()
#         @count_queries
#         def action():
#             # with trace_queries():
#             # dag_runs = DagRun.find(dag_id="test_dag", state=State.RUNNING)
#             for i in range(1, 1000+1):
#                 dag.create_dagrun(
#                     run_id=f"dag-{i}",
#                     state=State.RUNNING
#                 )
#             # print("dag_runs=", dag_runs)
#             # processor.create_dag_run(dag, dag_runs)
#         setup()
#         action()
#     benchmark(dag)
#
#
# creating_dag_run()
# def file_processing():
#     from airflow.utils.session import create_session
#     from airflow.models import DagBag
#     dagbag = DagBag(DAG_FILE, include_examples=False)
#     for dag in dagbag.dags.values():
#         dag.sync_to_db()
#
#     with create_session() as session:
#         from airflow.models import DagModel, TaskInstance, DagRun
#         (
#             session.query(DagModel)
#             .filter(DagModel.is_paused.is_(True))
#             .update({"is_paused": False}, synchronize_session=False)
#         )
#         with trace_queries():
#             session.query(TaskInstance).delete()
#             session.query(DagRun).delete()
#
#         session.commit()
#
#         # @timing(RETRY)
#         # @retry(RETRY)
#         @timing()
#         @count_queries
#         def slow_case():
#             # for _ in range(100):
#             #     conf.getboolean('scheduler', 'USE_JOB_SCHEDULE')
#             with trace_queries():
#                 processor._process_dags(dagbag, dagbag.dags.values())
#
#         slow_case()
#
# file_processing()