@kmcquade
Last active September 25, 2019 17:55
OPA help code
package common
# Supply:
# - a resource, taken from resource_changes[i]
# - a key, at any nesting depth, relative to resource_changes[i].change.after
# - a desired value.
# Determine whether the value at that key matches the desired value.
resource_key_value_matches(resource, key, desired_value) {
# bind the supplied key to the local variable k
k := key
# check if path exists and is not false
resource.change.after[k]
# check if value for key is desired_value
desired_value == resource.change.after[k]
}
package common
# ---------------------------------------------------------------------------------------------------------------------
# resource_key_value_matches
# ---------------------------------------------------------------------------------------------------------------------
test_resource_key_value_matches {
s3_bucket_plan := { "format_version": "0.1", "terraform_version": "0.12.2", "planned_values": "", "resource_changes": [ { "address": "aws_s3_bucket.profile_picture_storage", "mode": "managed", "type": "aws_s3_bucket", "name": "profile_picture_storage", "provider_name": "aws", "change": { "actions": [ "create" ], "before": null, "after": { "acl": "private", "bucket_prefix": "profile-picture-storage", "cors_rule": [], "force_destroy": false, "lifecycle_rule": [], "logging": [], "object_lock_configuration": [], "policy": null, "replication_configuration": [], "server_side_encryption_configuration": [ { "rule": [ { "apply_server_side_encryption_by_default": [ { "kms_master_key_id": null, "sse_algorithm": "AES256" } ] } ] } ], "tags": { "ApplicationRole": "FileStorage", "Owner": "UserEngagement", "Project": "ProfileUploadService" }, "versioning": [ { "enabled": true, "mfa_delete": false } ], "website": [] }, "after_unknown": { "acceleration_status": true, "arn": true, "bucket": true, "bucket_domain_name": true, "bucket_regional_domain_name": true, "cors_rule": [], "hosted_zone_id": true, "id": true, "lifecycle_rule": [], "logging": [], "object_lock_configuration": [], "region": true, "replication_configuration": [], "request_payer": true, "server_side_encryption_configuration": [ { "rule": [ { "apply_server_side_encryption_by_default": [ {} ] } ] } ], "tags": {}, "versioning": [ {} ], "website": [], "website_domain": true, "website_endpoint": true } } } ] }
resource_key_value_matches(s3_bucket_plan.resource_changes[0], "server_side_encryption_configuration[0].rule[0].apply_server_side_encryption_by_default[0].sse_algorithm", "AES256")
}

Code I'm using in seeking help from the OPA Slack channel.

Here's the idea.

For the resource_key_value_matches function, I want to let users specify a path within a resource at any nesting depth.

For an example call, see the common_test.rego file.

resource_key_value_matches(s3_bucket_plan.resource_changes[0], "server_side_encryption_configuration[0].rule[0].apply_server_side_encryption_by_default[0].sse_algorithm", "AES256")

The function won't know how many levels deep it should go until the user supplies it.

I didn't expect it to work as-is, but I wanted to show what I'm aiming for. I'm hoping someone from the Slack channel can help with this.

Please let me know if you have any ideas. Thank you!!
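
One possible direction, offered as a sketch rather than a confirmed answer: pass the path as an array of keys and indices instead of a dotted string, and let the `walk` built-in unify against it. The array-path signature and the name `resource_path_value_matches` are assumptions made for illustration; the dotted-string form in the call above would need to be parsed into such an array first. It uses the same pre-1.0 Rego syntax as the rest of this gist.

```rego
package common

# Sketch only. `path` is an array of object keys and array indices,
# relative to resource.change.after. Hypothetical name and signature,
# not the dotted-string form used in the original call.
resource_path_value_matches(resource, path, desired_value) {
    # walk yields every [path, value] pair in the document; unifying it
    # with a fixed path and value succeeds only if that exact pair exists.
    walk(resource.change.after, [path, desired_value])
}

test_resource_path_value_matches {
    resource := {"change": {"after": {"server_side_encryption_configuration": [{"rule": [{"apply_server_side_encryption_by_default": [{"sse_algorithm": "AES256"}]}]}]}}}
    path := ["server_side_encryption_configuration", 0, "rule", 0, "apply_server_side_encryption_by_default", 0, "sse_algorithm"]
    resource_path_value_matches(resource, path, "AES256")
    not resource_path_value_matches(resource, path, "aws:kms")
}
```

Because `walk` compares values rather than testing truthiness, this form should also match a desired value of `false` or `null`, which the `resource.change.after[k]` existence check would reject.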

Run the unit test

opa test -v .

Test in interactive shell

opa run ./plan-file-for-reference.json

Plan file

You can view the sample plan file for reference. It's the JSON file in this gist.

package play
#
# resource_key_value_matches(resource, key, desired_value) {
# # check if key assigned to variable k exists
# k := key
# # check if path exists and is not false
# resource.change.after[k]
# # check if value for key is desired_value
# desired_value == resource.change.after[k]
# }
#
# test_resource_key_value_matches {
# s3_bucket_plan := resource_key_value_matches(s3_bucket_plan.resource_changes[0], "server_side_encryption_configuration[0].rule[0].apply_server_side_encryption_by_default[0].sse_algorithm", "AES256")
# }
# Matching example
s3_bucket_encryption_aes256 {
# Some s3 bucket change..
resource := s3_bucket_changes[_]
# After the plan applies..
new_resource := resource.change.after
# Must have AES256 encryption by default
"AES256" == new_resource.server_side_encryption_configuration[0].rule[0].apply_server_side_encryption_by_default[0].sse_algorithm
}
# Negation example (probably more useful since it catches invalid buckets)
s3_buckets_without_aes256[name] {
# Any resource that has a type "aws_s3_bucket"
resource := s3_bucket_changes[_]
# After the plan applies..
new_resource := resource.change.after
# Does *not* use AES256
"AES256" != new_resource.server_side_encryption_configuration[0].rule[0].apply_server_side_encryption_by_default[0].sse_algorithm
# Add it to the list of buckets missing the encryption
name := resource.name
}
#### Aggregate/top-level rules, probably managed centrally and queried directly
# Makes it really easy to query `allow` to get a boolean, or `deny` to get a list of errors;
# alternatively, build an object or something similar to parse.
deny[msg] {
count(s3_buckets_without_aes256) > 0
msg := sprintf("bucket %s has an invalid encryption algorithm", [s3_buckets_without_aes256[_]])
}
allow {
# Allow only when no deny messages were produced
count(deny) == 0
}
#### Helpers - probably put them in separate package
#### and import them as needed
# Probably put this into a shared place for s3 related policies
s3_bucket_changes[r] {
resource := resource_changes[_]
resource.type == "aws_s3_bucket"
r := resource
}
# `resource_changes` builds a set of all resource change objects
# found in the input document. This does not do anything with modules.
# This should be shared with/imported by pretty much all the other modules
resource_changes[r] {
r := input.resource_changes[_]
}
{
"format_version": "0.1",
"terraform_version": "0.12.2",
"planned_values": "",
"resource_changes": [
{
"address": "aws_s3_bucket.profile_picture_storage",
"mode": "managed",
"type": "aws_s3_bucket",
"name": "profile_picture_storage",
"provider_name": "aws",
"change": {
"actions": ["create"],
"before": null,
"after": {
"acl": "private",
"bucket_prefix": "profile-picture-storage",
"cors_rule": [],
"force_destroy": false,
"lifecycle_rule": [],
"logging": [],
"object_lock_configuration": [],
"policy": null,
"replication_configuration": [],
"server_side_encryption_configuration": [
{
"rule": [
{
"apply_server_side_encryption_by_default": [
{ "kms_master_key_id": null, "sse_algorithm": "AES256" }
]
}
]
}
],
"tags": {
"ApplicationRole": "FileStorage",
"Owner": "UserEngagement",
"Project": "ProfileUploadService"
},
"versioning": [{ "enabled": true, "mfa_delete": false }],
"website": []
},
"after_unknown": {
"acceleration_status": true,
"arn": true,
"bucket": true,
"bucket_domain_name": true,
"bucket_regional_domain_name": true,
"cors_rule": [],
"hosted_zone_id": true,
"id": true,
"lifecycle_rule": [],
"logging": [],
"object_lock_configuration": [],
"region": true,
"replication_configuration": [],
"request_payer": true,
"server_side_encryption_configuration": [
{ "rule": [{ "apply_server_side_encryption_by_default": [{}] }] }
],
"tags": {},
"versioning": [{}],
"website": [],
"website_domain": true,
"website_endpoint": true
}
}
}
]
}
@kmcquade
Author

Response from Patrick East:

For nested resources check out the snippet on open-policy-agent/opa#1772 for an example of a helper that retrieves all the resources for a given plan regardless of depth. From there you can write rules to match the resource type/address/whatever
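
The snippet from that issue is not reproduced here. As an independent sketch of the same idea, the `walk` built-in can collect resource objects at any module depth. It assumes the resources live in `resources` arrays under `input.planned_values` (root module and child modules), as in Terraform's JSON plan output; the rule name `planned_resources` is hypothetical.

```rego
package play

# Hypothetical helper: every resource under input.planned_values,
# regardless of how deeply its module is nested.
planned_resources[r] {
    # Visit every [path, value] pair in planned_values
    walk(input.planned_values, [path, r])
    # Keep values whose path ends in ["resources", <index>]
    i := count(path) - 2
    path[i] == "resources"
    is_object(r)
}
```

In the sample plan in this gist `planned_values` is an empty string, so this set would be empty there. A resource attribute that happens to be named `resources` and holds objects would also match, so a real helper may need a stricter check.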

@kmcquade
Author

Patrick East 12:26 PM

hokay, so here is an example showing how someone can write rules based on that type of input document (I moved the resource change json into input, you can always shift that around) https://play.openpolicyagent.org/p/QETiP7epmU

the only part someone like the security architects would need to worry about is the rules that define the like business logic (eg "some resource with X attribute has Y criteria to be valid")

the other stuff you would put into shared modules

In theory if

s3_buckets_without_aes256[name] {
    # Any resource that has a type "aws_s3_bucket"
    resource := s3_bucket_changes[_]
    
    # After the plan applies..
    new_resource := resource.change.after
    
    # Does *not* use AES256
    "AES256" != new_resource.server_side_encryption_configuration[0].rule[0].apply_server_side_encryption_by_default[0].sse_algorithm
    
    # Add it to the list of buckets missing the encryption
    name := resource.name
}

was still too much you might be able to code-gen them from a like more simple document to make that type of rego rule... but from what I've seen, the TF plans and rules have lots of these simple ones, and an equal amount of more complex ones that require like parsing JSON embedded in fields and other junk
so having access to the rule to make those kind of special case ones is, IMO, helpful
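
As a rough illustration of the "simpler document" idea, and not something taken from the playground link, the simple checks could be driven by a list of requirements and one generic rule. The names `required_values`, `violations`, and `path_has_value` are hypothetical, and the path is assumed to be an array of keys and indices relative to `change.after`.

```rego
package play

# Hypothetical requirements document; in practice this might be loaded as data.
required_values = [
    {
        "type": "aws_s3_bucket",
        "path": ["server_side_encryption_configuration", 0, "rule", 0, "apply_server_side_encryption_by_default", 0, "sse_algorithm"],
        "value": "AES256",
    },
]

# One message per resource that fails a requirement for its type.
violations[msg] {
    req := required_values[_]
    resource := input.resource_changes[_]
    resource.type == req.type
    not path_has_value(resource.change.after, req.path, req.value)
    msg := sprintf("%s: expected %v at %v", [resource.address, req.value, req.path])
}

# True when `obj` holds exactly `value` at `path`.
path_has_value(obj, path, value) {
    walk(obj, [path, value])
}
```

Because the check uses `not`, a bucket with no encryption block at all is also reported. A direct `!=` comparison is undefined when the path is missing, so it would skip such a bucket. The more complex special cases mentioned above would still need hand-written rules.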
