Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save acdha/99347f5caa4fbb92b301d79d7e5b8bf2 to your computer and use it in GitHub Desktop.
Save acdha/99347f5caa4fbb92b301d79d7e5b8bf2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Bulk register AWS Accounts with Qualys AssetView
Given a list of account IDs, registers them and prints CSV output of the Account
ID and External ID in the format expected by the account setup Terraform code.
A list of accounts to register can be obtained like this::
$ grep -vf <(csvgrep --invert-match --columns 'External ID' --regex '^$' account-setup/qualys-assetview-accounts.csv | csvcut -c 'Account ID' | sed 1d) \
organizations/organization-accounts.csv \
| csvcut -c "id" \
| sed 1d > unregistered-accounts
"""
import argparse
import random
from csv import DictReader
from getpass import getpass
from urllib.parse import urljoin
import requests
USERNAME = input("Qualys username: ")
PASSWORD = getpass(f"Qualys password for {USERNAME}: ")
BASE_URL = "https://qualysapi.qg2.apps.qualys.com/"
def register_ec2_connector(account_id, account_name, external_id_length=20):
# External IDs are a random per-account integer which has to be between 9
# and 90 digits:
external_id = random.randint(
10 ** external_id_length,
(10 ** (external_id_length + 1)) - 1,
)
payload = f"""
<ServiceRequest>
<data>
<AwsAssetDataConnector>
<name>{account_name}</name>
<activation>
<set>
<ActivationModule>VM</ActivationModule>
<ActivationModule>PC</ActivationModule>
</set>
</activation>
<arn>arn:aws:iam::{account_id}:role/Role_For_QualysEC2Connector</arn>
<externalId>{external_id}</externalId>
<allRegions>true</allRegions>
</AwsAssetDataConnector>
</data>
</ServiceRequest>
""".strip()
resp = requests.post(
urljoin(BASE_URL, "/qps/rest/2.0/create/am/awsassetdataconnector"),
auth=(USERNAME, PASSWORD),
data=payload,
)
resp.raise_for_status()
return external_id
def process_accounts(account_ids, account_information):
for account_id in account_ids:
account_name = account_information[account_id]
print(
account_id,
register_ec2_connector(account_id, account_name),
sep=",",
flush=True,
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description=__doc__.strip())
parser.add_argument(
"--account-list",
type=argparse.FileType("r"),
help="CSV file with 'id' and 'account_name' columns",
)
parser.add_argument(
metavar="AWS_ACCOUNT_ID",
dest="account_ids",
nargs="+",
help="One or more account IDs",
)
args = parser.parse_args()
account_list = {i["id"]: i["account_name"] for i in DictReader(args.account_list)}
print('"Account ID","External ID"')
process_accounts(args.account_ids, account_list)
#!/usr/bin/env python3
"""
Bulk register AWS Accounts with Qualys CloudView
Given a list of account IDs, registers them and prints CSV output of the Account
ID and External ID in the format expected by the account setup Terraform code.
A list of accounts to register can be obtained like this::
$ grep -vf <(csvgrep --invert-match --columns 'External ID' --regex '^$' account-setup/qualys-cloudview-accounts.csv | csvcut -c 'Account ID' | sed 1d) \
organizations/organization-accounts.csv \
| csvcut -c "id" \
| sed 1d > unregistered-accounts
"""
import argparse
import random
import sys
from argparse import ArgumentDefaultsHelpFormatter
from csv import DictReader
from getpass import getpass
from urllib.parse import urljoin
import requests
def register_ec2_connector(
account_id,
account_name,
*,
external_id_length=20,
base_url="",
username="",
password="",
):
# External IDs are a random per-account integer which has to be between 9
# and 90 digits:
external_id = random.randint(
10 ** external_id_length,
(10 ** (external_id_length + 1)) - 1,
)
payload = {
"arn": f"arn:aws:iam::{account_id}:role/Role_For_Qualys_CV",
"description": account_name,
"externalId": str(external_id),
"isChinaRegion": False,
"isGovCloud": False,
"isPortalConnector": False,
"name": account_name,
"pollingFrequency": {"hours": 4, "minutes": 0},
}
resp = requests.post(
urljoin(base_url, "/cloudview-api/rest/v1/aws/connectors"),
auth=(username, password),
json=payload,
)
resp.raise_for_status()
return external_id
def process_accounts(
account_ids,
account_information,
base_url="",
username="",
password="",
):
for account_id in account_ids:
account_name = account_information[account_id]
try:
external_id = register_ec2_connector(
account_id,
account_name,
base_url=base_url,
username=username,
password=password,
)
except requests.exceptions.HTTPError as exc:
print(f"Skipping {account_id}: {exc}", file=sys.stderr)
continue
print(
account_id,
external_id,
sep=",",
flush=True,
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__.strip(), formatter_class=ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"--account-list",
type=argparse.FileType("r"),
help="CSV file with 'id' and 'account_name' columns",
)
parser.add_argument(
metavar="AWS_ACCOUNT_ID",
dest="account_ids",
nargs="+",
help="One or more account IDs",
)
parser.add_argument(
"--base-url",
metavar="URL",
help="Base URL for the Qualys API endpoint",
default="https://qualysguard.qg2.apps.qualys.com/",
)
parser.add_argument(
"--username",
help="Qualys username",
)
args = parser.parse_args()
password = getpass(f"Qualys password for {args.username}: ")
account_list = {i["id"]: i["account_name"] for i in DictReader(args.account_list)}
print('"Account ID","External ID"')
process_accounts(
args.account_ids,
account_list,
base_url=args.base_url,
username=args.username,
password=password,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment