Skip to content

Instantly share code, notes, and snippets.

@alxthm
Last active August 12, 2022 20:34
Show Gist options
  • Save alxthm/b62eceb1e47f0320e5bd82a67f33e042 to your computer and use it in GitHub Desktop.
Save alxthm/b62eceb1e47f0320e5bd82a67f33e042 to your computer and use it in GitHub Desktop.
Python script to transform meltano schedules from a json format into a crontab file
import json
import logging
import os
import shlex
import sys
from pathlib import Path
from typing import List
"""
Transform meltano schedules from a json format into a crontab file.
Example:
$ meltano schedule list --format=json | python make_crontab.py
To set environment variables (such as CRONTAB_MELTANO_EXECUTABLE) before running
the script, first load your `.env` file:
$ export $(grep -v '^#' .env | xargs)
"""
# Root of the meltano project: the directory one level above the one that
# contains this script.
# The script path is made absolute *before* walking up the parents: with a
# relative `__file__` such as "make_crontab.py" (what `python make_crontab.py`
# gives on Python < 3.9), `Path(__file__).parents[1]` raises an IndexError.
PROJECT_HOME = Path(__file__).absolute().parents[1]
# The meltano executable that you want to use in the crontab.
# Note: crontab does not use the $PATH that you have in your regular shell.
#
# Examples:
# - 'meltano' is fine if it is on the '/usr/local/bin:/usr/bin:/bin' path (it is probably not)
# - '/home/my_user/.local/bin/meltano' if you installed meltano with pipx
# - '/usr/local/bin/docker compose exec meltano-ui meltano' if you are using docker
#
# Read from the CRONTAB_MELTANO_EXECUTABLE environment variable, defaulting to 'meltano'.
MELTANO_EXECUTABLE = os.environ.get("CRONTAB_MELTANO_EXECUTABLE", "meltano")
def read_schedules_from_stdin():
    """
    Parse the JSON document piped on stdin and split its schedules by kind.

    The schedules are looked up under the top-level "schedules" key, then
    under its "job" and "elt" keys.

    :return: a `(job_schedules, elt_schedules)` tuple; a kind that is missing
        from the input yields an empty list.
    """
    payload = json.loads(sys.stdin.read())
    schedules_by_kind = payload.get("schedules", {})
    return schedules_by_kind.get("job", []), schedules_by_kind.get("elt", [])
def get_crontab_entries(job_schedules: List[dict], elt_schedules: List[dict]) -> dict:
    """
    Build one crontab line per supported interval from the meltano schedules.

    Only the `@hourly`, `@daily`, `@weekly`, `@monthly` and `@yearly` intervals
    are supported; a schedule with any other interval is skipped and reported
    with a warning. All the commands sharing an interval are chained with `&&`
    into a single crontab line: job schedules first, then elt schedules.

    :param job_schedules: job schedules; each one needs an "interval" key and
        a "job" dict with a "name" key.
    :param elt_schedules: elt schedules; each one needs an "interval" key, plus
        "elt_args" (strings, joined with spaces) when the interval is supported
        or "name" (used in the warning) when it is not.
    :return: dict mapping each interval that has at least one command to its
        crontab line; empty when no schedule could be converted.
    """
    # These crontab intervals slightly differ from the meltano ones, so that cron jobs
    # are not executed at the same time
    entry_intervals = {
        "@hourly": "30 * * * *",  # every hour at minute 30
        "@daily": "0 0 * * *",  # every day at 00:00
        "@weekly": "0 1 * * 0",  # every week on sunday at 01:00
        "@monthly": "0 2 1 * *",  # every first day of the month at 02:00
        "@yearly": "0 3 1 1 *",  # every january 1st at 03:00
    }
    commands_by_interval = {interval: [] for interval in entry_intervals}

    # Convert elt and job schedules into proper commands.
    # Skipped schedules are logged as warnings: the script never configures
    # logging, so anything below WARNING would be dropped and the schedule
    # would silently go missing from the crontab.
    for schedule in job_schedules:
        interval = schedule["interval"]
        job_name = schedule["job"]["name"]
        if interval not in commands_by_interval:
            logging.warning(
                "Schedule for job %s with interval %s "
                "was not added to the crontab entries.",
                job_name,
                interval,
            )
            continue
        commands_by_interval[interval].append(f"{MELTANO_EXECUTABLE} run {job_name}")

    for schedule in elt_schedules:
        interval = schedule["interval"]
        if interval not in commands_by_interval:
            logging.warning(
                "ELT schedule %s with interval %s "
                "was not added to the crontab entries.",
                schedule["name"],
                interval,
            )
            continue
        elt_args = " ".join(schedule["elt_args"])
        commands_by_interval[interval].append(f"{MELTANO_EXECUTABLE} elt {elt_args}")

    # Gather commands together by interval, and run them one after the other
    crontab_entries = {}
    for interval, commands in commands_by_interval.items():
        if not commands:
            continue
        # Quote the project path so that `cd` still works when it contains
        # spaces or other shell metacharacters (a plain path is left as is).
        project_home = shlex.quote(str(PROJECT_HOME))
        chained_commands = " && ".join(commands)
        crontab_entries[interval] = (
            f"{entry_intervals[interval]} (cd {project_home} && {chained_commands}) "
            f"2>&1 | /usr/bin/logger -t meltano"
        )
    return crontab_entries
def make_crontab_file(crontab_entries: dict):
    """
    Render the full crontab file content.

    :param crontab_entries: dict mapping an interval label to its crontab line;
        each pair is rendered as a `# <interval>` comment followed by the line.
    :return: the crontab content as a string: a header describing the crontab
        columns, the SHELL, PATH and PROJECT_HOME variables, then the entries.
    """
    sections = [
        f"\n# {label}\n{line}\n" for label, line in crontab_entries.items()
    ]
    rendered_entries = "\n".join(sections)

    file_layout = """
# .---------------- minute (0 - 59)
# | .------------- hour (0 - 23)
# | | .---------- day of month (1 - 31)
# | | | .------- month (1 - 12) OR jan,feb,mar,apr ...
# | | | | .---- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat
# | | | | |
# * * * * * [username] command to be executed
SHELL=/bin/bash
PATH=/usr/local/bin:/usr/bin:/bin
PROJECT_HOME={project_home}
# Schedules are grouped by interval, and the output (both stdout and stderr) of
# the processes are redirected to the logger unix utility
{entries}
"""
    return file_layout.format(entries=rendered_entries, project_home=PROJECT_HOME)
def main():
    """Read the meltano schedules from stdin and print the crontab to stdout."""
    crontab_entries = get_crontab_entries(*read_schedules_from_stdin())
    print(make_crontab_file(crontab_entries))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment