Created
October 6, 2020 23:08
-
-
Save acdha/99347f5caa4fbb92b301d79d7e5b8bf2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Bulk register AWS Accounts with Qualys AssetView | |
Given a list of account IDs, registers them and prints CSV output of the Account | |
ID and External ID in the format expected by the account setup Terraform code. | |
A list of accounts to register can be obtained like this:: | |
$ grep -vf <(csvgrep --invert-match --columns 'External ID' --regex '^$' account-setup/qualys-assetview-accounts.csv | csvcut -c 'Account ID' | sed 1d) \ | |
organizations/organization-accounts.csv \ | |
| csvcut -c "id" \ | |
| sed 1d > unregistered-accounts | |
""" | |
import argparse | |
import random | |
from csv import DictReader | |
from getpass import getpass | |
from urllib.parse import urljoin | |
import requests | |
# Root of the Qualys API host; request paths are joined onto this.
BASE_URL = "https://qualysapi.qg2.apps.qualys.com/"

# Credentials are collected interactively when the module is loaded, i.e.
# before the command-line arguments are parsed further down the script.
# NOTE(review): this means even ``--help`` asks for credentials first.
USERNAME = input("Qualys username: ")
PASSWORD = getpass(f"Qualys password for {USERNAME}: ")
def register_ec2_connector(account_id, account_name, external_id_length=20):
    """Register an AWS account as a Qualys AssetView EC2 connector.

    Posts an ``AwsAssetDataConnector`` create request (VM and PC modules
    activated, all regions enabled) for the ``Role_For_QualysEC2Connector``
    role in *account_id*.  The request is authenticated with the
    module-level ``USERNAME`` / ``PASSWORD`` and sent to ``BASE_URL``.

    Args:
        account_id: AWS account ID used to build the IAM role ARN.
        account_name: Name given to the connector.
        external_id_length: Exact number of decimal digits in the generated
            external ID; must be between 9 and 90.

    Returns:
        The randomly generated external ID (an ``int``) sent with the request.

    Raises:
        ValueError: If *external_id_length* is outside the 9..90 range.
        requests.exceptions.HTTPError: If the API responds with an error
            status (via ``raise_for_status``).
    """
    from xml.sax.saxutils import escape

    # Fail before talking to the API: external IDs have to be between 9 and
    # 90 digits long.
    if not 9 <= external_id_length <= 90:
        raise ValueError(
            f"external_id_length must be between 9 and 90, got {external_id_length!r}"
        )

    # A number with exactly ``external_id_length`` digits lies in
    # [10**(n-1), 10**n - 1].  The external ID guards the cross-account role,
    # so draw it from the OS entropy source rather than the default PRNG.
    external_id = random.SystemRandom().randint(
        10 ** (external_id_length - 1),
        (10 ** external_id_length) - 1,
    )

    # Escape interpolated values so names containing "&" or "<" cannot
    # produce malformed XML.
    payload = f"""
<ServiceRequest>
  <data>
    <AwsAssetDataConnector>
      <name>{escape(str(account_name))}</name>
      <activation>
        <set>
          <ActivationModule>VM</ActivationModule>
          <ActivationModule>PC</ActivationModule>
        </set>
      </activation>
      <arn>arn:aws:iam::{escape(str(account_id))}:role/Role_For_QualysEC2Connector</arn>
      <externalId>{external_id}</externalId>
      <allRegions>true</allRegions>
    </AwsAssetDataConnector>
  </data>
</ServiceRequest>
""".strip()

    resp = requests.post(
        urljoin(BASE_URL, "/qps/rest/2.0/create/am/awsassetdataconnector"),
        auth=(USERNAME, PASSWORD),
        data=payload,
        # requests waits forever by default; bound it so a stalled
        # connection cannot hang a bulk run.
        timeout=60,
    )
    resp.raise_for_status()

    return external_id
def process_accounts(account_ids, account_information):
    """Register each account and emit one ``account_id,external_id`` line.

    Args:
        account_ids: Iterable of AWS account IDs to register.
        account_information: Mapping of account ID to account name; an ID
            missing from it raises ``KeyError``.

    Any exception raised while registering an account propagates and stops
    the run.
    """
    for aws_account in account_ids:
        connector_name = account_information[aws_account]
        new_external_id = register_ec2_connector(aws_account, connector_name)
        # Flush per row so already-registered accounts are recorded even if a
        # later registration fails.
        print(aws_account, new_external_id, sep=",", flush=True)
if __name__ == "__main__":
    # Command-line entry point: read the id -> name mapping from the CSV
    # given by --account-list, then register every account ID passed
    # positionally and print the results as CSV on stdout.
    parser = argparse.ArgumentParser(description=__doc__.strip())
    parser.add_argument(
        "--account-list",
        type=argparse.FileType("r"),
        # The mapping is always read below, so omitting this option used to
        # crash on ``None`` instead of producing a usage error.
        required=True,
        help="CSV file with 'id' and 'account_name' columns",
    )
    parser.add_argument(
        metavar="AWS_ACCOUNT_ID",
        dest="account_ids",
        nargs="+",
        help="One or more account IDs",
    )

    args = parser.parse_args()

    # Close the CSV handle as soon as the mapping has been built.
    with args.account_list as account_file:
        account_list = {
            row["id"]: row["account_name"] for row in DictReader(account_file)
        }

    print('"Account ID","External ID"')

    process_accounts(args.account_ids, account_list)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Bulk register AWS Accounts with Qualys CloudView | |
Given a list of account IDs, registers them and prints CSV output of the Account | |
ID and External ID in the format expected by the account setup Terraform code. | |
A list of accounts to register can be obtained like this:: | |
$ grep -vf <(csvgrep --invert-match --columns 'External ID' --regex '^$' account-setup/qualys-cloudview-accounts.csv | csvcut -c 'Account ID' | sed 1d) \ | |
organizations/organization-accounts.csv \ | |
| csvcut -c "id" \ | |
| sed 1d > unregistered-accounts | |
""" | |
import argparse | |
import random | |
import sys | |
from argparse import ArgumentDefaultsHelpFormatter | |
from csv import DictReader | |
from getpass import getpass | |
from urllib.parse import urljoin | |
import requests | |
def register_ec2_connector(
    account_id,
    account_name,
    *,
    external_id_length=20,
    base_url="",
    username="",
    password="",
):
    """Register an AWS account as a Qualys CloudView connector.

    Posts a JSON connector definition for the ``Role_For_Qualys_CV`` role in
    *account_id* to the CloudView AWS connectors endpoint under *base_url*,
    using HTTP basic authentication.

    Args:
        account_id: AWS account ID used to build the IAM role ARN.
        account_name: Used as both the connector name and its description.
        external_id_length: Exact number of decimal digits in the generated
            external ID; must be between 9 and 90.
        base_url: Base URL of the Qualys API endpoint.
        username: Qualys username.
        password: Qualys password.

    Returns:
        The randomly generated external ID (an ``int``) sent with the request.

    Raises:
        ValueError: If *external_id_length* is outside the 9..90 range.
        requests.exceptions.HTTPError: If the API responds with an error
            status (via ``raise_for_status``).
    """
    # Fail before talking to the API: external IDs have to be between 9 and
    # 90 digits long.
    if not 9 <= external_id_length <= 90:
        raise ValueError(
            f"external_id_length must be between 9 and 90, got {external_id_length!r}"
        )

    # A number with exactly ``external_id_length`` digits lies in
    # [10**(n-1), 10**n - 1].  The external ID guards the cross-account role,
    # so draw it from the OS entropy source rather than the default PRNG.
    external_id = random.SystemRandom().randint(
        10 ** (external_id_length - 1),
        (10 ** external_id_length) - 1,
    )

    payload = {
        "arn": f"arn:aws:iam::{account_id}:role/Role_For_Qualys_CV",
        "description": account_name,
        "externalId": str(external_id),
        "isChinaRegion": False,
        "isGovCloud": False,
        "isPortalConnector": False,
        "name": account_name,
        "pollingFrequency": {"hours": 4, "minutes": 0},
    }

    resp = requests.post(
        urljoin(base_url, "/cloudview-api/rest/v1/aws/connectors"),
        auth=(username, password),
        json=payload,
        # requests waits forever by default; bound it so a stalled
        # connection cannot hang a bulk run.
        timeout=60,
    )
    resp.raise_for_status()

    return external_id
def process_accounts(
    account_ids,
    account_information,
    base_url="",
    username="",
    password="",
):
    """Register each account and emit one ``account_id,external_id`` line.

    Processing is best-effort: an account that cannot be registered is
    reported on stderr and skipped, so one bad entry does not abort the rest
    of the batch.

    Args:
        account_ids: Iterable of AWS account IDs to register.
        account_information: Mapping of account ID to account name.
        base_url: Base URL of the Qualys API endpoint.
        username: Qualys username.
        password: Qualys password.
    """
    for account_id in account_ids:
        # An ID missing from the mapping used to raise KeyError and abort the
        # run after earlier accounts were already registered; skip it the same
        # way API failures are skipped.
        try:
            account_name = account_information[account_id]
        except KeyError:
            print(
                f"Skipping {account_id}: not found in the account list",
                file=sys.stderr,
            )
            continue

        try:
            external_id = register_ec2_connector(
                account_id,
                account_name,
                base_url=base_url,
                username=username,
                password=password,
            )
        except requests.exceptions.HTTPError as exc:
            print(f"Skipping {account_id}: {exc}", file=sys.stderr)
            continue

        # Flush per row so successful registrations are recorded immediately.
        print(account_id, external_id, sep=",", flush=True)
if __name__ == "__main__":
    # Command-line entry point: read the id -> name mapping from the CSV
    # given by --account-list, then register every account ID passed
    # positionally and print the results as CSV on stdout.
    parser = argparse.ArgumentParser(
        description=__doc__.strip(), formatter_class=ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        "--account-list",
        type=argparse.FileType("r"),
        # The mapping is always read below, so omitting this option used to
        # crash on ``None`` instead of producing a usage error.
        required=True,
        help="CSV file with 'id' and 'account_name' columns",
    )
    parser.add_argument(
        metavar="AWS_ACCOUNT_ID",
        dest="account_ids",
        nargs="+",
        help="One or more account IDs",
    )
    parser.add_argument(
        "--base-url",
        metavar="URL",
        help="Base URL for the Qualys API endpoint",
        default="https://qualysguard.qg2.apps.qualys.com/",
    )
    parser.add_argument(
        "--username",
        help="Qualys username",
    )

    args = parser.parse_args()

    # Without --username the script used to ask for the password of "None"
    # and authenticate with a missing user; ask for the name instead.
    username = args.username or input("Qualys username: ")
    password = getpass(f"Qualys password for {username}: ")

    # Close the CSV handle as soon as the mapping has been built.
    with args.account_list as account_file:
        account_list = {
            row["id"]: row["account_name"] for row in DictReader(account_file)
        }

    print('"Account ID","External ID"')

    process_accounts(
        args.account_ids,
        account_list,
        base_url=args.base_url,
        username=username,
        password=password,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment