Skip to content

Instantly share code, notes, and snippets.

@mathewmoon
Created November 8, 2021 19:05
Show Gist options
  • Save mathewmoon/b9d66c10c7d02a5576d41c1e92c9b273 to your computer and use it in GitHub Desktop.
Save mathewmoon/b9d66c10c7d02a5576d41c1e92c9b273 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from json import dumps
from logging import (
basicConfig as loggerConfig,
getLogger
)
from os import environ
from re import (
compile as re_compile,
sub
)
from sys import stdout
import backoff
from botocore.exceptions import ClientError
from boto3 import client
import kopf
# boto3 clients are created once, at import time, and shared by every handler.
STS = client("sts")
SQS = client("sqs")
# AWS account id of the calling identity; resolved with an STS call at import.
ACCOUNT = STS.get_caller_identity()["Account"]
# Cluster name used in queue names, tags and application role ARNs.
# NOTE(review): this is None when the CLUSTER_NAME env var is unset, which
# would render as the literal text "None" in the f-strings below — confirm
# the variable is always provided by the deployment.
CLUSTER_NAME = environ.get("CLUSTER_NAME")
# Default ARN prefix that "user" principals must match (see USER_ROLE_PREFIX).
DEFAULT_ROLE_PREFIX = f"arn:aws:iam::{ACCOUNT}:role/aws-reserved/sso.amazonaws.com"
# ARN prefix prepended to "application" principals when building the policy.
APPLICATION_ROLE_PREFIX = f"arn:aws:iam::{ACCOUNT}:role/k8s/{CLUSTER_NAME}"
# KMS key applied when the resource does not set KmsMasterKeyId itself.
DEFAULT_KMS_KEY = "alias/aws/sqs"
# The ROLE_PREFIX env var overrides the default user role prefix.
USER_ROLE_PREFIX = environ.get("ROLE_PREFIX", DEFAULT_ROLE_PREFIX)
# Matches every character that is NOT a space, ASCII letter, digit, "_" or "-";
# get_queue_name replaces each match with "_".
NAME_SUB_REGEX = re_compile("[^ a-zA-Z0-9_-]")
# Root logger level comes from LOG_LEVEL (default INFO); output goes to stdout.
getLogger().setLevel(environ.get("LOG_LEVEL", "INFO"))
loggerConfig(stream=stdout)
@kopf.on.update("sqsqueue")
@kopf.on.create("sqsqueue")
def create_update_handler(spec, **kwargs):
    """Reconcile an ``sqsqueue`` resource when it is created or updated.

    The queue name is derived from the resource's namespace and name. If that
    name already resolves to a queue URL the queue's attributes are updated in
    place; otherwise a new queue is created.

    Args:
        spec: The resource spec passed in by kopf.
        **kwargs: Remaining kopf handler arguments; ``namespace`` and ``name``
            are read from here.
    """
    namespace = kwargs["namespace"]
    queue_name = get_queue_name(spec, namespace, kwargs["name"])
    queue_url = get_queue_url(queue_name)

    # No URL means the queue does not exist yet.
    if not queue_url:
        create_queue(spec, queue_name, namespace)
        return

    update_queue(spec, queue_url)
@kopf.on.delete("sqsqueue")
def delete_handler(spec, **kwargs):
    """Delete the SQS queue backing an ``sqsqueue`` resource, if it exists.

    Deletion is best-effort: any failure from the delete call is logged with
    its traceback and not re-raised, so the handler still returns normally.

    Args:
        spec: The resource spec passed in by kopf.
        **kwargs: Remaining kopf handler arguments; ``namespace`` and ``name``
            are read from here.
    """
    queue_name = get_queue_name(spec, kwargs["namespace"], kwargs["name"])
    queue_url = get_queue_url(queue_name)
    if not queue_url:
        # Nothing to delete: the queue name does not resolve to a URL.
        return

    try:
        SQS.delete_queue(QueueUrl=queue_url)
    except Exception as e:
        # ClientError is a subclass of Exception, so listing it separately
        # (as the handler used to) added nothing.
        getLogger().exception(e)
@backoff.on_exception(backoff.expo, SQS.exceptions.QueueDeletedRecently, max_time=60)
def create_queue(spec, queue_name, namespace):
    """Create the SQS queue for a resource, tagging it with its k8s origin.

    Args:
        spec: The resource spec; converted to queue attributes by
            ``format_attributes``.
        queue_name: Name of the queue to create.
        namespace: Kubernetes namespace, stored in the ``k8s.namespace`` tag.

    Raises:
        SQS.exceptions.QueueDeletedRecently: Re-raised so that the ``backoff``
            decorator retries with exponential delay for up to 60 seconds.

    Every other error is logged and swallowed. When the queue name already
    exists, the existing queue's attributes are updated instead.
    """
    try:
        SQS.create_queue(
            QueueName=queue_name,
            Attributes=format_attributes(spec),
            tags={
                "k8s.namespace": namespace,
                "k8s.cluster": CLUSTER_NAME,
            },
        )
    except SQS.exceptions.QueueDeletedRecently:
        # Logger.warn is a deprecated alias; warning() is the supported name.
        getLogger().warning(f"Queue {queue_name} was deleted recently. Backing off")
        # Bare raise keeps the original traceback for the backoff decorator.
        raise
    except SQS.exceptions.QueueNameExists:
        # The name is taken: fall back to updating the existing queue.
        queue_url = get_queue_url(queue_name)
        update_queue(spec, queue_url)
    except ClientError as e:
        getLogger().error(e)
    except Exception as e:
        getLogger().exception(e)
def get_queue_name(spec, namespace, name):
    """Build the SQS queue name for a resource.

    The name has the form ``k8s-<cluster>-<namespace>_<name>``, where the
    ``k8s-<cluster>-<namespace>`` prefix has every character matched by
    ``NAME_SUB_REGEX`` replaced with ``_``. A ``.fifo`` suffix is appended when
    ``spec["queueAttributes"]["FifoQueue"]`` is truthy.

    Args:
        spec: The resource spec; must contain ``queueAttributes.FifoQueue``.
        namespace: Kubernetes namespace of the resource.
        name: Kubernetes name of the resource (used as-is).

    Returns:
        The queue name as a string.
    """
    is_fifo = spec["queueAttributes"]["FifoQueue"]
    prefix = NAME_SUB_REGEX.sub("_", f"k8s-{CLUSTER_NAME}-{namespace}")
    suffix = ".fifo" if is_fifo else ""
    return f"{prefix}_{name}{suffix}"
def format_attributes(spec):
    """Convert a resource spec into the ``Attributes`` mapping sent to SQS.

    Every non-``None`` entry of ``spec["queueAttributes"]`` (except a
    ``permissions`` key) is turned into a string: values that are already
    strings are passed through unchanged, everything else is JSON-encoded
    (``True`` -> ``"true"``, ``30`` -> ``"30"``, dicts -> JSON documents).

    The access policy built from ``spec["permissions"]`` and ``spec["owner"]``
    is stored under ``Policy``, and ``KmsMasterKeyId`` falls back to
    ``DEFAULT_KMS_KEY`` when the spec does not set it.

    Args:
        spec: The resource spec; must contain ``queueAttributes``,
            ``permissions`` and ``owner`` (with ``principal`` and
            ``principalType``).

    Returns:
        dict mapping attribute names to string values.
    """
    attributes = {
        # json.dumps on a str would wrap it in literal double quotes
        # ('alias/x' -> '"alias/x"'), so strings must be passed as they are.
        key: value if isinstance(value, str) else dumps(value)
        for key, value in spec["queueAttributes"].items()
        if value is not None and key != "permissions"
    }

    owner = spec["owner"]
    policy = get_queue_policy(
        spec["permissions"],
        owner["principal"],
        owner["principalType"],
    )
    attributes["Policy"] = dumps(policy)
    attributes.setdefault("KmsMasterKeyId", DEFAULT_KMS_KEY)
    return attributes
def update_queue(spec, queue_url):
    """Apply the attributes derived from ``spec`` to an existing queue.

    Failures are logged, never raised: a ``ClientError`` is logged as a plain
    error, any other exception is logged together with its traceback.

    Args:
        spec: The resource spec; converted by ``format_attributes``.
        queue_url: URL of the queue to update.
    """
    log = getLogger()
    try:
        # Built inside the try block so that a malformed spec is logged too.
        new_attributes = format_attributes(spec)
        SQS.set_queue_attributes(QueueUrl=queue_url, Attributes=new_attributes)
    except ClientError as err:
        log.error(err)
    except Exception as err:
        log.exception(err)
def _principals_for(permissions, permission, formatters):
    """Group the principals granted ``permission`` by principal type.

    Returns a dict with one list per key of ``formatters``; each principal is
    rendered by the formatter registered for its ``principalType``. Entries
    with an unknown principal type are ignored.
    """
    grouped = {kind: [] for kind in formatters}
    for entry in permissions:
        kind = entry["principalType"]
        # The type is checked first so that entries of an unknown type are
        # skipped without looking at their other keys.
        if kind in formatters and entry["permission"] == permission:
            grouped[kind].append(formatters[kind](entry["principal"]))
    return grouped


def _user_statement(sid, actions, user_ids, user_role_prefix):
    """Build a statement that grants ``actions`` to callers matched by user id."""
    return {
        "Sid": sid,
        "Effect": "Allow",
        "Principal": "*",
        "Action": actions,
        "Resource": "*",
        "Condition": {
            "StringLike": {
                "aws:userid": user_ids,
                "aws:PrincipalArn": f"{user_role_prefix}/*"
            }
        }
    }


def _role_statement(sid, actions, grouped):
    """Build a statement that grants ``actions`` to role/app/account principals."""
    return {
        "Sid": sid,
        "Effect": "Allow",
        "Principal": {
            "AWS": [
                *grouped["role"],
                *grouped["application"],
                *grouped["awsAccount"]
            ]
        },
        "Action": actions,
        "Resource": "*"
    }


def get_queue_policy(
    permissions,
    owner,
    owner_type,
    *,
    account=None,
    application_role_prefix=None,
    user_role_prefix=None,
):
    """Build the SQS access policy document for a queue.

    Args:
        permissions: Iterable of mappings with the keys ``principal``,
            ``principalType`` (``user``, ``role``, ``application`` or
            ``awsAccount``) and ``permission`` (``send`` or ``receive``).
            ``None`` is treated as "no permissions".
        owner: Principal that gets send, receive and purge rights.
        owner_type: ``application`` (owner is appended to the application role
            prefix), ``user`` (owner is matched through ``aws:userid``), or
            anything else (owner is used verbatim as the AWS principal).
        account: Account id for the account-root statement. Defaults to the
            module-level ``ACCOUNT``.
        application_role_prefix: ARN prefix for application roles. Defaults to
            the module-level ``APPLICATION_ROLE_PREFIX``.
        user_role_prefix: ARN prefix that user callers must match. Defaults to
            the module-level ``USER_ROLE_PREFIX``.

    Returns:
        dict with ``Version`` and ``Statement``. Statements are emitted in the
        order: user send, user receive, role receive, role send (each only
        when it has at least one principal), then the account-root statement
        and the owner statement.
    """
    if account is None:
        account = ACCOUNT
    if application_role_prefix is None:
        application_role_prefix = APPLICATION_ROLE_PREFIX
    if user_role_prefix is None:
        user_role_prefix = USER_ROLE_PREFIX
    permissions = permissions or ()

    formatters = {
        "user": lambda principal: f"*:{principal}",
        "role": lambda principal: principal,
        "application": lambda principal: f"{application_role_prefix}/{principal}",
        # Account principals need the full ARN, including the leading "arn:"
        # (same form as the account-root statement below).
        "awsAccount": lambda principal: f"arn:aws:iam::{principal}:root",
    }
    senders = _principals_for(permissions, "send", formatters)
    receivers = _principals_for(permissions, "receive", formatters)

    send_actions = [
        "sqs:GetQueueUrl",
        "sqs:SendMessage",
        "sqs:GetQueueAttributes",
    ]
    receive_actions = [
        "sqs:GetQueueUrl",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
    ]
    # Sorted so the rendered policy is identical from one run to the next.
    k8s_owner_actions = sorted({
        *send_actions,
        *receive_actions,
        "sqs:PurgeQueue"
    })

    owner_policy = {
        "Sid": "owner_statement",
        "Effect": "Allow",
        "Principal": {
            "AWS": f"arn:aws:iam::{account}:root"
        },
        "Action": "SQS:*",
        "Resource": "*"
    }

    k8s_owner_policy = {
        "Sid": "QueueOwnerFullControl",
        "Effect": "Allow",
        "Principal": {},
        "Action": k8s_owner_actions,
        "Resource": "*"
    }
    if owner_type == "application":
        k8s_owner_policy["Principal"]["AWS"] = f"{application_role_prefix}/{owner}"
    elif owner_type == "user":
        # Users are matched by user id plus role prefix, not by a principal ARN.
        k8s_owner_policy["Principal"]["AWS"] = "*"
        k8s_owner_policy["Condition"] = {
            "StringLike": {
                "aws:userid": f"*:{owner}",
                "aws:PrincipalArn": f"{user_role_prefix}/*"
            }
        }
    else:
        k8s_owner_policy["Principal"]["AWS"] = owner

    statements = []
    if senders["user"]:
        statements.append(_user_statement(
            "SendMessagesViaUser", send_actions, senders["user"], user_role_prefix
        ))
    if receivers["user"]:
        statements.append(_user_statement(
            "ReceiveMessagesViaUser", receive_actions, receivers["user"], user_role_prefix
        ))
    if receivers["role"] or receivers["application"] or receivers["awsAccount"]:
        statements.append(_role_statement(
            "ReceiveMessagesViaRole", receive_actions, receivers
        ))
    if senders["role"] or senders["application"] or senders["awsAccount"]:
        statements.append(_role_statement(
            "SendMessagesViaRole", send_actions, senders
        ))
    statements.append(owner_policy)
    statements.append(k8s_owner_policy)

    return {
        "Version": "2008-10-17",
        "Statement": statements
    }
def get_queue_url(queue_name):
    """Look up the URL of an SQS queue by name.

    Args:
        queue_name: Name of the queue to resolve.

    Returns:
        The queue URL, or ``None`` when SQS reports that the queue does not
        exist. Any other error from the lookup propagates to the caller.
    """
    try:
        return SQS.get_queue_url(QueueName=queue_name)["QueueUrl"]
    except SQS.exceptions.QueueDoesNotExist:
        return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment