Skip to content

Instantly share code, notes, and snippets.

@lukeorland
Created March 30, 2023 19:39
Show Gist options
  • Save lukeorland/cf21724b35f406417e51e8869e41696c to your computer and use it in GitHub Desktop.
Save lukeorland/cf21724b35f406417e51e8869e41696c to your computer and use it in GitHub Desktop.
Prefect Cloud 1.0 - fetch logs for flow run
"""
Retrieve all the logs for a flow run.
E.g.
FLOW_RUN_ID=adff99c1-7dcf-40c1-bd1d-1c026ad930cb \
python \
notes-synced/snippets/fetch_flow_run_logs.py \
print_concatenated_log_messages \
$FLOW_RUN_ID \
> logs-flow-run-id-$FLOW_RUN_ID.txt
"""
import json
import sys
from typing import List

import fire
import prefect
# Maximum number of log records requested per GraphQL call (page size).
LIMIT = 200
# Prefect API client used for every GraphQL request in this script; it is
# constructed once, at import time.
# NOTE(review): credentials (e.g. a Prefect Cloud API key) are presumably
# picked up from the local Prefect configuration -- confirm.
CLIENT = prefect.client.Client()
def fetch_some_logs(flow_run_id: str, limit: int, offset: int) -> List[dict]:
    """Fetch one page of log records for a flow run.

    Args:
        flow_run_id: ID of the flow run whose logs are requested.
        limit: Maximum number of log records to return.
        offset: Number of matching records to skip before the first
            returned one.

    Returns:
        The list of log dicts found under ``data.log`` in the GraphQL
        response. The list is empty when ``offset`` is past the last record.

    Raises:
        ValueError, TypeError: If ``limit`` or ``offset`` cannot be converted
            to ``int``.
        Exception: Whatever ``CLIENT.graphql`` raises when called with
            ``raise_on_error=True`` (``fetch_all_logs`` expects
            ``prefect.exceptions.ClientError``).
    """
    # json.dumps yields a double-quoted, fully escaped string literal, so a
    # stray quote or backslash in the ID cannot change the query's structure.
    flow_run_id_literal = json.dumps(str(flow_run_id))

    # limit/offset pagination is only well defined over a stable ordering, so
    # sort explicitly and break ties on the primary key.
    # NOTE(review): assumes the `log` type exposes a `timestamp` field and
    # Hasura-style `order_by`, as in the Prefect 1.x schema -- confirm.
    query = """
        query {
            log (
                where: {
                    flow_run_id: {_eq: %s}
                }
                order_by: [{timestamp: asc}, {id: asc}]
                limit: %d
                offset: %d
            ) {
                created
                flow_run_id
                id
                info
                is_audit_log
                is_loaded_from_archive
                level
                message
                name
                object_id
                object_table
                task_run_id
            }
        }""" % (flow_run_id_literal, int(limit), int(offset))

    response = CLIENT.graphql(query=query, raise_on_error=True)
    return response["data"]["log"]
def fetch_all_logs(flow_run_id: str) -> List[dict]:
    """Fetch every log record for a flow run, one page at a time.

    Pages of up to ``LIMIT`` records are requested through
    ``fetch_some_logs`` until an empty page is returned.

    Args:
        flow_run_id: ID of the flow run whose logs are requested.

    Returns:
        All log dicts retrieved, in the order the pages were received. If a
        ``prefect.exceptions.ClientError`` interrupts paging, the records
        collected so far are returned and a warning is written to stderr.
    """
    logs: List[dict] = []
    offset = 0
    try:
        while True:
            page = fetch_some_logs(flow_run_id, LIMIT, offset)
            if not page:
                # An empty page means the offset is past the last log.
                break
            logs.extend(page)
            # Advance by what was actually received, so that a server
            # returning fewer than LIMIT rows per page does not make us
            # skip records.
            offset += len(page)
    except prefect.exceptions.ClientError as err:
        # Stay best-effort (return what we have), but do not hide the fact
        # that the result may be incomplete. stderr keeps the warning out of
        # the log text that callers print to stdout.
        print(
            f"warning: stopped fetching logs for flow run {flow_run_id} "
            f"at offset {offset}: {err}",
            file=sys.stderr,
        )
    return logs
def concatenate_log_messages(log_messages: List[dict]) -> str:
    """Join the ``"message"`` value of each log record with newlines.

    Args:
        log_messages: Log records, each a dict with a ``"message"`` key.

    Returns:
        A single string with one message per line and no trailing newline;
        the empty string when ``log_messages`` is empty.
    """
    messages = (entry["message"] for entry in log_messages)
    return "\n".join(messages)
def print_concatenated_log_messages(flow_run_id):
    """Print all log messages of a flow run to stdout, one per line.

    Args:
        flow_run_id: ID of the flow run whose logs are printed.
    """
    all_logs = fetch_all_logs(flow_run_id)
    joined_messages = concatenate_log_messages(all_logs)
    print(joined_messages)
# Script entry point. Fire is called without a component, so the command to
# run is chosen by name on the command line, e.g.
# `print_concatenated_log_messages <flow_run_id>` as in the module docstring.
if __name__ == "__main__":
    fire.Fire()
@lukeorland
Copy link
Author

  • You'll need an API key to auth with Prefect Cloud
  • This script uses the fire Python package.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment