Last active
July 23, 2022 01:10
-
-
Save Nogbit/b4cec1a3bef002f4422030f0b063d1b3 to your computer and use it in GitHub Desktop.
SCC Article Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import uuid | |
import functions_framework | |
import google.cloud.logging | |
from google.cloud import securitycenter | |
from google.cloud.securitycenter_v1 import Finding | |
logging_client = google.cloud.logging.Client() | |
logging_client.setup_logging() | |
import logging | |
def creation_date(asset):
    """Return the asset's ``creationTimestamp`` as a UTC ISO-8601 string.

    The value is read from ``asset.asset.resource_properties`` and is
    normalised to UTC with a fixed-width (microsecond) representation, so
    that the returned strings sort lexically in chronological order even
    when the source timestamps carry different UTC offsets.

    Args:
        asset: An object exposing ``asset.resource_properties.get(...)``.

    Returns:
        A string such as ``'2022-07-01T07:30:00.000000+00:00'``.

    Raises:
        ValueError: If the property is missing/empty or is not a valid
            ISO-8601 timestamp.
    """
    raw = asset.asset.resource_properties.get('creationTimestamp')
    if not raw:
        raise ValueError('Asset has no "creationTimestamp" property')

    # ``fromisoformat`` only accepts a trailing "Z" from Python 3.11 on.
    if raw.endswith(('Z', 'z')):
        raw = raw[:-1] + '+00:00'

    parsed = datetime.datetime.fromisoformat(raw)
    if parsed.tzinfo is None:
        # NOTE(review): naive timestamps are assumed to already be UTC --
        # confirm against the values the API actually returns.
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)

    as_utc = parsed.astimezone(datetime.timezone.utc)
    return as_utc.isoformat('T', timespec='microseconds')
def is_ready(asset):
    """Tell whether the asset's ``status`` resource property equals ``'READY'``.

    Args:
        asset: An object exposing ``asset.resource_properties.get(...)``.

    Returns:
        ``True`` when the status is exactly ``'READY'``, otherwise ``False``
        (including when the property is absent).
    """
    properties = asset.asset.resource_properties
    status = properties.get('status')
    return status == 'READY'
def parse_request(request):
    """Extract and validate the scan parameters from an HTTP request.

    The request body must be a JSON object with the keys ``org_id``,
    ``scc_source_name`` and ``image_family``.

    Args:
        request: An object exposing ``get_json(silent=True)``.

    Returns:
        A dict. For every key present in the body the matching entry is set:
        ``org_name`` (``'organizations/<org_id>'``), ``scc_source_name`` and
        ``image_family``. If the request is invalid the dict also holds
        ``error``: ``{'message': str, 'code': int}``, with code 415 when the
        body is not a JSON object and 422 when required keys are missing
        (all missing keys are named in the message).
    """
    data = {}

    try:
        request_json = request.get_json(silent=True)
    except Exception:
        # Treat any parsing failure like an unparseable body instead of
        # leaving ``request_json`` unbound.
        request_json = None

    if not isinstance(request_json, dict):
        data['error'] = {'message': 'Unknown content type, expected JSON', 'code': 415}
        return data

    required = ('org_id', 'scc_source_name', 'image_family')
    missing = [key for key in required if key not in request_json]

    if 'org_id' in request_json:
        data['org_name'] = f'organizations/{request_json["org_id"]}'
    for key in ('scc_source_name', 'image_family'):
        if key in request_json:
            data[key] = request_json[key]

    if missing:
        # Report every missing key at once rather than only the last one.
        names = ', '.join(f'"{key}"' for key in missing)
        noun = 'property' if len(missing) == 1 else 'properties'
        data['error'] = {'message': f'JSON is missing {names} {noun}', 'code': 422}

    return data
def _partition_images(image_assets):
    """Split READY image assets into the newest one and the links of the rest.

    Returns a ``(newest_asset, old_self_links)`` tuple; ``newest_asset`` is
    ``None`` and the set is empty when no image is READY.
    """
    ready = [asset for asset in image_assets if is_ready(asset)]
    if not ready:
        return None, set()

    # Compare parsed datetimes rather than ISO strings so that timestamps
    # carrying different UTC offsets are ordered chronologically.
    newest = max(
        ready,
        key=lambda asset: datetime.datetime.fromisoformat(creation_date(asset)),
    )
    old_links = {
        asset.asset.resource_properties.get('selfLink')
        for asset in ready
        if asset is not newest
    }
    # A missing selfLink must never match a disk that has no sourceImage.
    old_links.discard(None)
    return newest, old_links


@functions_framework.http
def scan(request):
    """Create a security finding for every disk built from a dated golden image.

    The request parameters are validated by ``parse_request``. All READY
    images of the requested family are listed; the most recently created one
    is considered current and every other one "old". Each disk whose
    ``sourceImage`` is one of the old images gets an ``ACTIVE`` finding of
    category ``USING_DATED_GOLDEN_IMAGE`` under the given source.

    Args:
        request: The incoming HTTP request.

    Returns:
        A ``(message, status_code)`` tuple: the validation error and its code
        for an invalid request, 200 when there is nothing to report, or 201
        with the joined finding messages when findings were created.
    """
    data = parse_request(request)
    if 'error' in data:
        error = data['error']
        return error['message'], error['code']

    image_family = data['image_family']
    # The family is embedded in a quoted filter literal below; refuse values
    # that could break out of it.
    if not isinstance(image_family, str) or '"' in image_family or '\\' in image_family:
        return 'Invalid "image_family" value', 422

    client = securitycenter.SecurityCenterClient()

    image_filter = (
        'security_center_properties.resource_type = "google.compute.Image" '
        f'AND resource_properties.family = "{image_family}"'
    )
    image_assets = client.list_assets(
        request={
            'parent': data['org_name'],
            'filter': image_filter,
        }
    )
    newest_image_asset, old_images = _partition_images(image_assets)

    if not newest_image_asset:
        message = f'No images to scan for family {image_family}'
        logging.info(message)
        return message, 200

    # Find all the disk assets: image assets don't have a "users" property,
    # and disk assets do not permit a query by image family.
    disk_filter = 'security_center_properties.resource_type = "google.compute.Disk"'
    disk_assets = client.list_assets(
        request={
            'parent': data['org_name'],
            'filter': disk_filter,
        }
    )

    source_name = data['scc_source_name']
    response = []
    for disk_asset in disk_assets:
        if disk_asset.asset.resource_properties.get('sourceImage') not in old_images:
            continue

        disk_name = disk_asset.asset.security_center_properties.resource_name
        logging.info(f'Found {disk_name} using a dated golden image, creating security finding.')

        finding = Finding(
            state=Finding.State.ACTIVE,
            resource_name=disk_name,
            category='USING_DATED_GOLDEN_IMAGE',
            event_time=datetime.datetime.now(tz=datetime.timezone.utc),
        )
        created_finding = client.create_finding(
            parent=source_name,
            finding_id=uuid.uuid4().hex,
            finding=finding,
        )

        message = f'New security finding created of {created_finding.name}'
        logging.info(message)
        response.append(message)

    return '. '.join(response), 201 if response else 200
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment