Skip to content

Instantly share code, notes, and snippets.

@Nogbit
Last active July 23, 2022 01:10
Show Gist options
  • Save Nogbit/b4cec1a3bef002f4422030f0b063d1b3 to your computer and use it in GitHub Desktop.
Save Nogbit/b4cec1a3bef002f4422030f0b063d1b3 to your computer and use it in GitHub Desktop.
SCC Article Python
import datetime
import uuid
import functions_framework
import google.cloud.logging
from google.cloud import securitycenter
from google.cloud.securitycenter_v1 import Finding
logging_client = google.cloud.logging.Client()
logging_client.setup_logging()
import logging
def creation_date(asset):
time = asset.asset.resource_properties.get('creationTimestamp')
dt_aware = datetime.datetime.fromisoformat(time)
return dt_aware.isoformat('T')
def is_ready(asset):
return asset.asset.resource_properties.get('status') == 'READY'
def parse_request(request):
data = {}
try:
request_json = request.get_json(silent=True)
except:
data['error'] = {'message': 'Unknown content type, expected JSON', 'code': 415}
if request_json and 'org_id' in request_json:
data['org_name'] = f'organizations/{request_json["org_id"]}'
else:
data['error'] = {'message': 'JSON is missing "org_id" property', 'code': 422}
if request_json and 'scc_source_name' in request_json:
data['scc_source_name'] = request_json['scc_source_name']
else:
data['error'] = {'message': 'JSON is missing "source_name" property', 'code': 422}
if request_json and 'image_family' in request_json:
data['image_family'] = request_json['image_family']
else:
data['error'] = {'message': 'JSON is missing "image_family" property', 'code': 422}
return data
@functions_framework.http
def scan(request):
data = parse_request(request)
if 'error' in data:
return data['message'], data['code']
client = securitycenter.SecurityCenterClient()
image_filter = f'security_center_properties.resource_type = "google.compute.Image" AND resource_properties.family = "{data["image_family"]}"'
asset_iterator = client.list_assets(
request={
'parent': data['org_name'],
'filter': image_filter
}
)
newest_image_asset = None
old_images = []
# Which is the newest and create a list of all the old ones
for image_asset in asset_iterator:
if not newest_image_asset and is_ready(image_asset):
newest_image_asset = image_asset
else:
if creation_date(image_asset) > creation_date(newest_image_asset) and is_ready(image_asset):
old_images.append(newest_image_asset.asset.resource_properties.get('selfLink'))
newest_image_asset = image_asset
if not newest_image_asset:
message = f'No images to scan for family {data["image_family"]}'
logging.info(message)
return message, 200
else:
# Find all the disk assets, image assets doen't have a "users" property,
# and disks assets do not permit query by image family
disk_filter = f'security_center_properties.resource_type = "google.compute.Disk"'
asset_iterator = client.list_assets(
request={
'parent': data['org_name'],
'filter': disk_filter
}
)
response = []
for disk_asset in asset_iterator:
if disk_asset.asset.resource_properties.get('sourceImage') in old_images:
disk_name = disk_asset.asset.security_center_properties.resource_name
logging.info(f'Found {disk_name} using a dated golden image, creating security finding.')
finding_id = uuid.uuid4().hex
source_name = data['scc_source_name']
finding = Finding(
state = Finding.State.ACTIVE,
resource_name = disk_name,
category = 'USING_DATED_GOLDEN_IMAGE',
event_time = datetime.datetime.now(tz=datetime.timezone.utc),
)
created_finding = client.create_finding(
parent=source_name,
finding_id=finding_id,
finding=finding
)
message = f'New security finding created of {created_finding.name}'
logging.info(message)
response.append(message)
return '. '.join(response), 201 if len(response) > 0 else 200
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment