Skip to content

Instantly share code, notes, and snippets.

@elijahr
Created September 28, 2022 09:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save elijahr/7594c732c4b893d35a85511ae31ad5da to your computer and use it in GitHub Desktop.
Save elijahr/7594c732c4b893d35a85511ae31ad5da to your computer and use it in GitHub Desktop.
Script for determining if the currently running CircleCI node is the last running node
#!/usr/bin/env python3
"""Script for determining if the currently running CircleCI node is the last
running node.
Useful for when some action should be performed after parallel jobs have completed with errors.
A `CIRCLE_TOKEN` environment variable must be present.
The API token can be created at https://app.circleci.com/settings/user/tokens
and placed in an environment variable in your Organization Settings under "Contexts".
Can be used like:
- run:
name: Final node step
when: always # even run if preceding step has error
command: |
set -euo pipefail
if ./bin/is-final-node.py; then
echo "This is the final running node; all other parallel jobs have completed or failed."
fi
This is a workaround for CircleCI workflow jobs not allowing fan-in after any errors have occurred in fan-out.
This is one of the most popular feature requests for CircleCI but there is no solution thus far.
Feature request: https://circleci.canny.io/cloud-feature-requests/p/option-to-allow-failures-in-fan-inout-workflow
"""
import http.client
import json
import os
import sys
current_node = int(os.environ["CIRCLE_NODE_INDEX"])
circle_token = os.environ["CIRCLE_TOKEN"]
job_id = os.environ["CIRCLE_BUILD_NUM"]
connection = http.client.HTTPSConnection("circleci.com")
path = "/api/v2/project/gh/styleseat/styleseat/job/" + job_id
headers = {
"Content-type": "application/json",
"Accept": "application/json",
"Circle-Token": circle_token,
}
connection.request("GET", path, headers=headers)
response = connection.getresponse()
content = response.read().decode()
assert response.status == 200, content
runs = json.loads(content)["parallel_runs"]
pending_nodes = [r["index"] for r in runs if r["index"] != current_node and r["status"] != "success"]
if pending_nodes:
node_csv = ", ".join(map(str, pending_nodes))
print(f"remaining nodes: {node_csv}", file=sys.stderr)
sys.exit(1)
else:
print(f"final node: YES", file=sys.stderr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment