Skip to content

Instantly share code, notes, and snippets.

@mkanoor
Last active July 24, 2025 19:12
Show Gist options
  • Select an option

  • Save mkanoor/bedd7de83603276f7f56741e4b16cacc to your computer and use it in GitHub Desktop.

Select an option

Save mkanoor/bedd7de83603276f7f56741e4b16cacc to your computer and use it in GitHub Desktop.
import sys
import os
sys.path.append("/tmp")
from django.core.management.base import BaseCommand
from urllib.parse import urlparse, urlunparse
from saml_migrator import SAMLMigrator
from settings_migrator import SettingsMigrator
from gateway_client import GatewayClient, GatewayAPIError
class Command(BaseCommand):
help = 'Import existing auth provider configurations to AAP Gateway via API requests'
def add_arguments(self, parser):
parser.add_argument('--basic-auth', action='store_true', help='Use HTTP Basic Authentication between Controller and Gateway')
parser.add_argument(
'--skip-all-authenticators',
action='store_true',
help='Skip importing all authenticators [GitHub, OIDC, SAML, Azure AD, LDAP, RADIUS, TACACS+, Google OAuth2]',
)
parser.add_argument('--skip-oidc', action='store_true', help='Skip importing generic OIDC authenticators')
parser.add_argument('--skip-github', action='store_true', help='Skip importing GitHub authenticator')
parser.add_argument('--skip-ldap', action='store_true', help='Skip importing LDAP authenticators')
parser.add_argument('--skip-ad', action='store_true', help='Skip importing Azure AD authenticator')
parser.add_argument('--skip-saml', action='store_true', help='Skip importing SAML authenticator')
parser.add_argument('--skip-radius', action='store_true', help='Skip importing RADIUS authenticator')
parser.add_argument('--skip-tacacs', action='store_true', help='Skip importing TACACS+ authenticator')
parser.add_argument('--skip-google', action='store_true', help='Skip importing Google OAuth2 authenticator')
parser.add_argument('--skip-settings', action='store_true', help='Skip importing settings')
parser.add_argument('--force', action='store_true', help='Force migration even if configurations already exist')
def handle(self, *args, **options):
# Read Gateway connection parameters from environment variables
gateway_base_url = os.getenv('GATEWAY_BASE_URL')
gateway_user = os.getenv('GATEWAY_USER')
gateway_password = os.getenv('GATEWAY_PASSWORD')
gateway_skip_verify = os.getenv('GATEWAY_SKIP_VERIFY', '').lower() in ('true', '1', 'yes', 'on')
skip_all_authenticators = False
skip_saml = False
skip_settings = False
force = False
basic_auth = True
management_command_validation_errors = []
# If the management command isn't called with all parameters needed to talk to Gateway, consider
# it a dry-run and exit cleanly
if not gateway_base_url and basic_auth:
management_command_validation_errors.append('- GATEWAY_BASE_URL: Base URL of the AAP Gateway instance')
if (not gateway_user or not gateway_password) and basic_auth:
management_command_validation_errors.append('- GATEWAY_USER: Username for AAP Gateway authentication')
management_command_validation_errors.append('- GATEWAY_PASSWORD: Password for AAP Gateway authentication')
if len(management_command_validation_errors) > 0:
self.stdout.write(self.style.WARNING('Missing required environment variables:'))
for validation_error in management_command_validation_errors:
self.stdout.write(self.style.WARNING(f"{validation_error}"))
self.stdout.write(self.style.WARNING('- GATEWAY_SKIP_VERIFY: Skip SSL certificate verification (optional)'))
sys.exit(0)
resource_api_client = None
response = None
if basic_auth:
self.stdout.write(self.style.SUCCESS('HTTP Basic Auth: true'))
self.stdout.write(self.style.SUCCESS(f'Gateway Base URL: {gateway_base_url}'))
self.stdout.write(self.style.SUCCESS(f'Gateway User: {gateway_user}'))
self.stdout.write(self.style.SUCCESS('Gateway Password: *******************'))
self.stdout.write(self.style.SUCCESS(f'Skip SSL Verification: {gateway_skip_verify}'))
else:
resource_api_client = create_api_client()
resource_api_client.verify_https = not gateway_skip_verify
response = resource_api_client.get_service_metadata()
parsed_url = urlparse(resource_api_client.base_url)
resource_api_client.base_url = urlunparse((parsed_url.scheme, parsed_url.netloc, '/', '', '', ''))
self.stdout.write(self.style.SUCCESS('Gateway Service Token: true'))
self.stdout.write(self.style.SUCCESS(f'Gateway Base URL: {resource_api_client.base_url}'))
self.stdout.write(self.style.SUCCESS(f'Gateway JWT User: {resource_api_client.jwt_user_id}'))
self.stdout.write(self.style.SUCCESS(f'Gateway JWT Expiration: {resource_api_client.jwt_expiration}'))
self.stdout.write(self.style.SUCCESS(f'Skip SSL Verification: {not resource_api_client.verify_https}'))
self.stdout.write(self.style.SUCCESS(f'Connection Validated: {response.status_code == 200}'))
# Create Gateway client and run migrations
try:
self.stdout.write(self.style.SUCCESS('\n=== Connecting to Gateway ==='))
pre_gateway_client = None
if basic_auth:
self.stdout.write(self.style.SUCCESS('\n=== With Basic HTTP Auth ==='))
pre_gateway_client = GatewayClient(
base_url=gateway_base_url, username=gateway_user, password=gateway_password, skip_verify=gateway_skip_verify, command=self
)
else:
self.stdout.write(self.style.SUCCESS('\n=== With Service Token ==='))
pre_gateway_client = GatewayClientSVCToken(resource_api_client=resource_api_client, command=self)
with pre_gateway_client as gateway_client:
self.stdout.write(self.style.SUCCESS('Successfully connected to Gateway'))
# Initialize migrators
migrators = []
if not skip_all_authenticators:
if not skip_saml:
migrators.append(SAMLMigrator(gateway_client, self, force=force))
if not migrators:
self.stdout.write(self.style.WARNING('No authentication configurations found to migrate.'))
if not skip_settings:
migrators.append(SettingsMigrator(gateway_client, self, force=force))
else:
self.stdout.write(self.style.WARNING('Settings migration will not execute.'))
# Run migrations
total_results = {
'created': 0,
'updated': 0,
'unchanged': 0,
'failed': 0,
'mappers_created': 0,
'mappers_updated': 0,
'mappers_failed': 0,
'settings_created': 0,
'settings_updated': 0,
'settings_unchanged': 0,
'settings_failed': 0,
}
if not migrators:
self.stdout.write(self.style.WARNING('NO MIGRATIONS WILL EXECUTE.'))
else:
for migrator in migrators:
self.stdout.write(self.style.SUCCESS(f'\n=== Migrating {migrator.get_authenticator_type()} Configurations ==='))
result = migrator.migrate()
self._print_export_summary(migrator.get_authenticator_type(), result)
# Accumulate results - handle missing keys gracefully
for key in total_results:
total_results[key] += result.get(key, 0)
# Overall summary
self.stdout.write(self.style.SUCCESS('\n=== Migration Summary ==='))
self.stdout.write(f'Total authenticators created: {total_results["created"]}')
self.stdout.write(f'Total authenticators updated: {total_results["updated"]}')
self.stdout.write(f'Total authenticators unchanged: {total_results["unchanged"]}')
self.stdout.write(f'Total authenticators failed: {total_results["failed"]}')
self.stdout.write(f'Total mappers created: {total_results["mappers_created"]}')
self.stdout.write(f'Total mappers updated: {total_results["mappers_updated"]}')
self.stdout.write(f'Total mappers failed: {total_results["mappers_failed"]}')
self.stdout.write(f'Total settings created: {total_results["settings_created"]}')
self.stdout.write(f'Total settings updated: {total_results["settings_updated"]}')
self.stdout.write(f'Total settings unchanged: {total_results["settings_unchanged"]}')
self.stdout.write(f'Total settings failed: {total_results["settings_failed"]}')
except GatewayAPIError as e:
self.stdout.write(self.style.ERROR(f'Gateway API Error: {e.message}'))
if e.status_code:
self.stdout.write(self.style.ERROR(f'Status Code: {e.status_code}'))
if e.response_data:
self.stdout.write(self.style.ERROR(f'Response: {e.response_data}'))
return
except Exception as e:
self.stdout.write(self.style.ERROR(f'Unexpected error during migration: {str(e)}'))
return
def _print_export_summary(self, config_type, result):
"""Print a summary of the export results."""
self.stdout.write(f'\n--- {config_type} Export Summary ---')
if config_type in ['GitHub', 'OIDC', 'SAML', 'Azure AD', 'LDAP', 'RADIUS', 'TACACS+', 'Google OAuth2']:
self.stdout.write(f'Authenticators created: {result.get("created", 0)}')
self.stdout.write(f'Authenticators updated: {result.get("updated", 0)}')
self.stdout.write(f'Authenticators unchanged: {result.get("unchanged", 0)}')
self.stdout.write(f'Authenticators failed: {result.get("failed", 0)}')
self.stdout.write(f'Mappers created: {result.get("mappers_created", 0)}')
self.stdout.write(f'Mappers updated: {result.get("mappers_updated", 0)}')
self.stdout.write(f'Mappers failed: {result.get("mappers_failed", 0)}')
if config_type == 'Settings':
self.stdout.write(f'Settings created: {result.get("settings_created", 0)}')
self.stdout.write(f'Settings updated: {result.get("settings_updated", 0)}')
self.stdout.write(f'Settings unchanged: {result.get("settings_unchanged", 0)}')
self.stdout.write(f'Settings failed: {result.get("settings_failed", 0)}')
Command().handle()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment