import functools
import logging
import time
import sys

from airflow.configuration import conf
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.state import State

sys.path.pop(0)
sys.path.insert(0, '')
print(sys.path)
#
#
# class CountQueries(object):
#     def __init__(self):
#         self.count = 0
#
#     def __enter__(self):
#         from sqlalchemy import event
#         from airflow.settings import engine
#         event.listen(engine, "after_cursor_execute", self.after_cursor_execute)
#         return None
#
#     def after_cursor_execute(self, *args, **kwargs):
#         self.count += 1
#
#     def __exit__(self, type, value, traceback):
#         from sqlalchemy import event
#         from airflow.settings import engine
#         event.remove(engine, "after_cursor_execute", self.after_cursor_execute)
#         print('Query count: ', self.count)
#
#
# count_queries = CountQueries
import contextlib
import traceback


@contextlib.contextmanager
def trace_queries():
    """Log the duration and calling stack of every SQL query while the block runs."""
    from sqlalchemy import event
    from airflow.settings import engine

    log = logging.getLogger("sql")

    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        conn.info.setdefault('query_start_time', []).append(time.time())

    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        total = time.time() - conn.info['query_start_time'].pop()
        # Drop SQLAlchemy-internal frames so the log points at caller code.
        stack = [
            f
            for f in traceback.extract_stack()
            if 'sqlalchemy' not in f.filename
        ]
        file_name = f"{stack[-1].filename}:{stack[-1].name}:{stack[-1].lineno}" if stack else ""
        stack_info = " > ".join(
            [f"{f.filename.rpartition('/')[-1]}:{f.name}:{f.lineno}" for f in stack][-7:]
        )
        log.info(
            "%.5f | %s | %s | %s",
            total, file_name, stack_info,
            ""
            # statement.replace("\n", " ")
        )

    event.listen(engine, "before_cursor_execute", before_cursor_execute)
    event.listen(engine, "after_cursor_execute", after_cursor_execute)
    try:
        yield
    finally:
        event.remove(engine, "before_cursor_execute", before_cursor_execute)
        event.remove(engine, "after_cursor_execute", after_cursor_execute)
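
# A minimal usage sketch for trace_queries(); it assumes an initialized Airflow
# database connection so the listeners have queries to observe (session and
# DagRun here are illustrative names):
#
#     with trace_queries():
#         session.query(DagRun).count()  # any ORM call; logged with timing and stack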
DAG_FILE = "/files/dags/50_dag_5_dummy_tasks.py"

log = logging.getLogger("airflow.processor")

from airflow.jobs.scheduler_job import DagFileProcessor, SchedulerJob

processor = DagFileProcessor([], log)
@contextlib.contextmanager
def pyspy():
    """Record a py-spy flame graph of the current process while the block runs."""
    import os
    import datetime
    import signal

    pid = str(os.getpid())
    suffix = datetime.datetime.now().isoformat()
    filename = f'/files/pyspy/flame-{suffix}-{pid}.html'
    pyspy_pid = os.spawnlp(
        os.P_NOWAIT, 'sudo', 'sudo', 'py-spy', 'record', '--idle', '-o', filename, '-p', pid
    )
    try:
        yield
    finally:
        os.kill(pyspy_pid, signal.SIGINT)
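
# Usage sketch for pyspy(); it assumes the py-spy binary is installed,
# passwordless sudo is available, and the /files/pyspy directory exists:
#
#     with pyspy():
#         processor.process_file(DAG_FILE, None, pickle_dags=False)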
def benchmark(retry_count=5):
    """Run the wrapped function ``retry_count`` times and print the average wall time."""
    def benchmark_dec(f):
        @functools.wraps(f)
        def wrap(*args, **kwargs):
            r = []
            for i in range(retry_count):
                time1 = time.time()
                f(*args, **kwargs)
                time2 = time.time()
                diff = (time2 - time1) * 1000.0
                r.append(diff)
                # print('Retry %d took %0.3f ms' % (i, diff))
            print('Average took %0.3f ms' % (sum(r) / retry_count))
        return wrap
    return benchmark_dec
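
# Usage sketch for the benchmark decorator (the workload name is illustrative):
#
#     @benchmark(retry_count=10)
#     def case():
#         ...  # the workload under test
#
#     case()  # prints the average wall time over 10 runs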
def retry(retry_count=5):
    """Run the wrapped function ``retry_count`` times."""
    def retry_dec(f):
        @functools.wraps(f)
        def wrap(*args, **kwargs):
            for i in range(retry_count):
                f(*args, **kwargs)
        return wrap
    return retry_dec


def timing(retry_count=1):
    """Print the wall time of one call, divided by ``retry_count``.

    When stacked directly outside ``retry(retry_count)``, this yields the
    average time per run.
    """
    def timing_dec(fn):
        @functools.wraps(fn)
        def wrap(*args, **kwargs):
            time1 = time.time()
            fn(*args, **kwargs)
            time2 = time.time()
            diff = (time2 - time1) * 1000.0
            print('Average took %0.3f ms' % (diff / retry_count))
        return wrap
    return timing_dec
def count_queries(fn):
    """Print the number of SQL queries executed by the wrapped function."""
    @functools.wraps(fn)
    def wrap(*args, **kwargs):
        fn.query_count = 0

        def after_cursor_execute(*args, **kwargs):
            fn.query_count += 1

        from sqlalchemy import event
        from airflow.settings import engine
        event.listen(engine, "after_cursor_execute", after_cursor_execute)
        try:
            return fn(*args, **kwargs)
        finally:
            print("Query count: ", fn.query_count)
            event.remove(engine, "after_cursor_execute", after_cursor_execute)
    return wrap
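
# Note on stacking: in file_processing() below, slow_case is wrapped as
# timing(RETRY)(retry(RETRY)(timing()(count_queries(slow_case)))), so each run
# prints its own query count and wall time, and the outer pair prints the
# average wall time across RETRY runs.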
@contextlib.contextmanager
def profiled(print_callers=False):
    """Profile the block with cProfile and print stats sorted by cumulative time."""
    import cProfile
    import io
    import pstats

    pr = cProfile.Profile()
    pr.enable()
    try:
        yield
    finally:
        pr.disable()
        s = io.StringIO()
        ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
        if print_callers:
            ps.print_callers()
        else:
            ps.print_stats()
        print(s.getvalue())
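
# Usage sketch for profiled():
#
#     with profiled(print_callers=True):
#         processor.process_file(DAG_FILE, None, pickle_dags=False)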
RETRY = 10


def file_processing():
    from airflow.utils.session import create_session

    with create_session() as session:
        from airflow.models import DagModel
        (
            session.query(DagModel)
            .filter(DagModel.is_paused.is_(True))
            .update({"is_paused": False}, synchronize_session=False)
        )
        session.commit()
        from airflow.models import TaskInstance
        session.query(TaskInstance).delete()
        from airflow.models import DagRun
        session.query(DagRun).delete()

        @timing(RETRY)
        @retry(RETRY)
        @timing()
        @count_queries
        def slow_case():
            # for _ in range(100):
            #     conf.getboolean('scheduler', 'USE_JOB_SCHEDULE')
            # with trace_queries():
            # with pyspy():
            from airflow.models import DagRun
            session.query(DagRun).delete()
            from airflow.models import TaskInstance
            session.query(TaskInstance).delete()
            session.commit()
            # with trace_queries():
            processor.process_file(DAG_FILE, None, pickle_dags=False)

        slow_case()


file_processing()
# def scheduler_job():
#     from airflow.utils.session import create_session
#     with create_session() as session:
#         from airflow.models import DagModel
#         (
#             session.query(DagModel)
#             .filter(DagModel.is_paused.is_(True))
#             .update({"is_paused": False}, synchronize_session=False)
#         )
#         session.commit()
#         from airflow.models import TaskInstance
#         session.query(TaskInstance).delete()
#         from airflow.models import DagRun
#         session.query(DagRun).delete()
#         from airflow.models import DagTag
#         session.query(DagTag).delete()
#         from airflow.models import DagModel
#         session.query(DagModel).delete()
#         from airflow.utils.db import add_default_pool_if_not_exists
#         add_default_pool_if_not_exists()
#         session.commit()
#
#         # @timing(RETRY)
#         # @retry(RETRY)
#         @timing()
#         # @count_queries
#         def slow_case():
#             print("Start Scheduler")
#             from airflow.models import DagRun
#             session.query(DagRun).delete()
#             from airflow.models import TaskInstance
#             session.query(TaskInstance).delete()
#             from tests.test_utils.mock_executor import MockExecutor
#
#             executor = MockExecutor()
#             # with trace_queries():
#             SchedulerJob(subdir=DAG_FILE, do_pickle=False, num_runs=3, executor=executor).run()
#
#         slow_case()
#
# scheduler_job()
# def creating_dag_run():
#     from airflow.models import DAG
#     from airflow.models import DagRun
#
#     dag = DAG(dag_id="test_dag", schedule_interval="@once", start_date=days_ago(2))
#     DummyOperator(task_id="task", dag=dag)
#     dag.sync_to_db()
#
#     @retry(RETRY)
#     def benchmark(dag):
#
#         def setup():
#             from airflow.utils.session import create_session
#             with create_session() as session:
#                 session.query(DagRun).delete()
#
#         @timing()
#         @count_queries
#         def action():
#             # with trace_queries():
#             # dag_runs = DagRun.find(dag_id="test_dag", state=State.RUNNING)
#             for i in range(1, 1000 + 1):
#                 dag.create_dagrun(
#                     run_id=f"dag-{i}",
#                     state=State.RUNNING
#                 )
#             # print("dag_runs=", dag_runs)
#             # processor.create_dag_run(dag, dag_runs)
#
#         setup()
#         action()
#
#     benchmark(dag)
#
# creating_dag_run()
# def file_processing():
#     from airflow.utils.session import create_session
#     from airflow.models import DagBag
#     dagbag = DagBag(DAG_FILE, include_examples=False)
#     for dag in dagbag.dags.values():
#         dag.sync_to_db()
#
#     with create_session() as session:
#         from airflow.models import DagModel, TaskInstance, DagRun
#         (
#             session.query(DagModel)
#             .filter(DagModel.is_paused.is_(True))
#             .update({"is_paused": False}, synchronize_session=False)
#         )
#         with trace_queries():
#             session.query(TaskInstance).delete()
#             session.query(DagRun).delete()
#
#         session.commit()
#
#         # @timing(RETRY)
#         # @retry(RETRY)
#         @timing()
#         @count_queries
#         def slow_case():
#             # for _ in range(100):
#             #     conf.getboolean('scheduler', 'USE_JOB_SCHEDULE')
#             with trace_queries():
#                 processor._process_dags(dagbag, dagbag.dags.values())
#
#         slow_case()
#
# file_processing()