
View api.json
{ "data": { "project": { "id": "gid://gitlab/Project/", "terraformStates": { "count": 1, "nodes": [ { "id": "gid://gitlab/Terraform::State/", "name": "", "lockedAt": null, "updatedAt": "2022-08-02T19:55:26Z", "deletedAt": null, "lockedByUser": null, "latestVersion": { "id": "gid://gitlab/Terraform::StateVersion/", "downloadPath": "/api/v4/projects//terraform/state//versions/0", "serial": 0, "updatedAt": "2022-08-02T19:55:26Z", "createdByUser": null, "job": null, "__typename": "TerraformStateVersion" }, "__typename": "TerraformState" } ], "pageInfo": { "hasNextPage": false, "hasPreviousPage": false, "startCursor": "", "endCursor": "", "__typename": "PageInfo" }, "__typename": "TerraformStateConnection" },
View post.json
POST https://{{GITLAB-HOST}}/api/graphql { "operationName": "getStates", "variables": { "projectPath": "sefi/tf-demo", "first": 50, "after": null, "last": null, "before": null }, "query": "query getStates($projectPath: ID!, $first: Int, $last: Int, $before: String, $after: String) {\n project(fullPath: $projectPath) {\n id\n terraformStates(first: $first, last: $last, before: $before, after: $after) {\n count\n nodes {\n ...State\n __typename\n }\n pageInfo {\n ...PageInfo\n __typename\n }\n __typename\n }\n __typename\n }\n}\n\nfragment State on TerraformState {\n id\n name\n lockedAt\n updatedAt\n deletedAt\n lockedByUser {\n ...User\n __typename\n }\n latestVersion {\n ...StateVersion\n __typename\n }\n __typename\n}\n\nfragment User on User {\n id\n avatarUrl\n name\n username\n webUrl\n __typename\n}\n\nfragment StateVersion on TerraformStateVersion {\n id\n downloadPath\n serial\n updatedAt\n createdByUser {\n ...User\n __typename\n }\n job {\n id\n detailedStatus {\n id\n detailsPath\n group\n icon\n l
View error_handler.py
def _handle_event(event_detail_type: str, event_detail: dict) -> None:
try:
handle_task(event_detail_type, event)
except ExecutionSyncError: # Raised if this is a "task_completed" event, and "task_started" hasn't arrive yet
if event.retry_count < MAX_RETRY_COUNT:
logger.info('Events are not in sync, sending this event to the retry queue '
f'(retry_count={event.retry_count})')
event.retry_count += 1
message = {
DETAIL_TYPE: event_detail_type,
View retry.py
# Cap on how many times one event may be re-queued. The error-handler snippet
# on this page compares event.retry_count against a constant of this name, so
# presumably this is that bound; a cap keeps an event that never gets back in
# sync from cycling through the retry queue indefinitely.
MAX_RETRY_COUNT = 7
# NOTE(review): not referenced in the visible code; presumably the delay, in
# seconds, applied to a message sent to the retry queue. Confirm at the call site.
DELAY_SECONDS = 5
@exception_handler
def handler(event, context):
if 'Records' in event: # SQS
records = event.get('Records', [])
logger.info(f'Received events from SQS with {len(records)} records')
View test.py
def test_task_completed_arrived_before_task_started():
# First of all, send a task_completed event
_send_task_completed_event()
# Now wait a bit, and send a task_started event
time.sleep(3)
_send_task_started_event()
completed = False
with time_limit(MAX_RETRY_COUNT * 10): # Limit the time the test can run
View context-manager.py
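@contextlib.contextmanager
def time_limit(seconds):
    """Abort the enclosed ``with`` body with ``TimeoutError`` after *seconds*.

    A SIGALRM timer is armed on entry. On exit, whether the body finished
    normally or raised, the timer is cancelled and the SIGALRM handler that
    was installed before entry is put back, so a stale alarm cannot fire in
    unrelated code later on.

    Args:
        seconds: Whole number of seconds handed to ``signal.alarm``.

    Raises:
        TimeoutError: Inside the ``with`` body, if the alarm fires first.

    Note:
        SIGALRM exists only on Unix, and ``signal.signal`` may only be called
        from the main thread of the main interpreter.
    """
    def raise_timeout(signum, frame):
        raise TimeoutError('Timed out!')

    previous_handler = signal.signal(signal.SIGALRM, raise_timeout)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Cancel first so the alarm cannot fire while the handler is swapped.
        signal.alarm(0)
        # signal.signal() returns None when the old handler was not installed
        # from Python; such a handler cannot be re-installed from here.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)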
View context-var.py
"""
data_layer.py - how to use the credentials
"""
from decorators import get_dynamodb_session_keys
def some_data_access_method():
    """Open the DynamoDB table through a session built from the stored keys.

    The keyword arguments for ``boto3.Session`` come from
    ``get_dynamodb_session_keys()``, imported from ``decorators``.
    """
    # NOTE(review): the tenant restriction rests entirely on these keys.
    # If the helper ever returned an empty mapping, boto3.Session() would fall
    # back to its default credential chain, which is not tenant-scoped.
    # Confirm the helper fails closed when no keys have been set.
    tenant_session_keys = get_dynamodb_session_keys()
    tenant_session = boto3.Session(**tenant_session_keys)
    table = tenant_session.resource("dynamodb").Table(TABLE_NAME)
    # From here on, 'table' is accessed under the tenant_id restrictions.
View decorator.py
"""
decorators.py - creating the credentials
"""
import functools
from contextvars import ContextVar, copy_context
dynamodb_session_keys = ContextVar("dynamodb_session_keys", default=None)
def dynamodb_tenant_isolation(func):
@functools.wraps(func)
View import.py
import boto3
def generate_credentials(event):
tenant_id = extract_tenant_from_auth_header(event)
dynamic_policy = generate_dynamodb_policy(tenant_id)
sts_client = boto3.client("sts")
assumed_role = sts_client.assume_role(
RoleArn="arn:aws:iam::<account-id>:role/DynamodbRoleToAssume",
RoleSessionName="<name-to-identify-the-assumed-role-session>",
Policy=dynamic_policy,
View policy.py
def generate_dynamodb_policy(tenant_id):
return {
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Query"],
"Resource": [
f"arn:aws:dynamodb:<region>:<account-id>:table/TableName",
f"arn:aws:dynamodb:<region>:<account-id>:table/TableName/index/*",
],