Skip to content

Instantly share code, notes, and snippets.

@brownhash
Created December 31, 2020 14:30
Show Gist options
  • Save brownhash/6f37d46d110a68314ef8773e18a6786f to your computer and use it in GitHub Desktop.
Save brownhash/6f37d46d110a68314ef8773e18a6786f to your computer and use it in GitHub Desktop.
Fetch the logs of a specific task in a specific DAG from Airflow.
# NOTE: This works only when the executor is CeleryExecutor
import os
import requests
import json
import urllib.parse
class AirflowLog(object):
    """Fetch the log of one task instance from an Airflow webserver.

    NOTE: per the original author, this only works when Airflow runs with
    the CeleryExecutor.  The endpoint used is
    ``admin/airflow/get_logs_with_metadata`` (presumably the legacy
    Flask-Admin web UI of Airflow 1.x -- confirm for your deployment).
    """

    def __init__(self, airflow_addr, dag_id, task_id, execution_date, try_number='1'):
        """Store the coordinates of the task instance whose log to fetch.

        :param airflow_addr: base URL of the Airflow webserver, e.g.
            ``http://host:8080``; a trailing slash is optional.
        :param dag_id: id of the DAG.
        :param task_id: id of the task within the DAG.
        :param execution_date: execution date string of the DAG run; it is
            URL-quoted here and stored quoted in ``self.execution_date``.
        :param try_number: which attempt's log to fetch (default ``'1'``).
        """
        self.airflow_addr = airflow_addr
        self.dag_id = dag_id
        self.task_id = task_id
        # Stored pre-quoted for backward compatibility with code that reads
        # this attribute; _build_url therefore must not quote it again.
        self.execution_date = urllib.parse.quote(execution_date)
        self.try_number = try_number

    def _build_url(self, log_format):
        """Return the full ``get_logs_with_metadata`` URL for *log_format*."""
        def q(value):
            # safe='' so '/', '&', '=', '?' or spaces inside an id cannot
            # corrupt the query string.
            return urllib.parse.quote(str(value), safe='')

        # Plain string joining instead of os.path.join: URLs always use '/',
        # whereas os.path.join would insert a backslash on Windows.
        base = self.airflow_addr.rstrip('/')
        query = 'dag_id={}&task_id={}&execution_date={}&metadata=null&format={}&try_number={}'.format(
            q(self.dag_id),
            q(self.task_id),
            self.execution_date,
            q(log_format),
            q(self.try_number),
        )
        return '{}/admin/airflow/get_logs_with_metadata?{}'.format(base, query)

    def get_logs(self, log_format='file', timeout=None):
        """Download the task log.

        :param log_format: ``'file'`` to get the raw response text, or
            ``'json'`` to get the response body decoded with ``json.loads``.
        :param timeout: optional timeout forwarded to ``requests.get``;
            ``None`` (the default) keeps the previous wait-forever behavior.
        :returns: ``str`` for ``'file'``; the decoded JSON value for ``'json'``.
        :raises ValueError: if *log_format* is not ``'file'`` or ``'json'``.
        """
        if log_format not in ('file', 'json'):
            # ValueError subclasses Exception, so existing
            # ``except Exception`` handlers keep working.
            raise ValueError('invalid log format: {}, use file|json'.format(log_format))
        resp = requests.get(self._build_url(log_format), timeout=timeout)
        # NOTE(review): the HTTP status is not checked; an error response
        # body is returned (or JSON-decoded) as-is.
        if log_format == 'json':
            return json.loads(resp.text)
        return resp.text
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment