Skip to content

Instantly share code, notes, and snippets.

@mkanoor
Last active July 24, 2025 19:16
Show Gist options
  • Select an option

  • Save mkanoor/734eac960f466cdd22e37998aecc88d3 to your computer and use it in GitHub Desktop.

Select an option

Save mkanoor/734eac960f466cdd22e37998aecc88d3 to your computer and use it in GitHub Desktop.
"""
Base authenticator migrator class.
This module defines the contract that all specific authenticator migrators must follow.
"""
from urllib.parse import urlparse, parse_qs
from django.conf import settings
from gateway_client import GatewayAPIError
class BaseAuthenticatorMigrator:
"""
Base class for all authenticator migrators.
Defines the contract that all specific authenticator migrators must follow.
"""
# Class-level flag to track if LOGIN_REDIRECT_OVERRIDE was set by any migrator
login_redirect_override_set_by_migrator = False
def __init__(self, gateway_client=None, command=None, force=False):
"""
Initialize the authenticator migrator.
Args:
gateway_client: GatewayClient instance for API calls
command: Optional Django management command instance (for styled output)
force: If True, force migration even if configurations already exist
"""
self.gateway_client = gateway_client
self.command = command
self.force = force
self.encrypted_fields = [
# LDAP Fields
'BIND_PASSWORD',
# The following authenticators all use the same key to store encrypted information:
# Generic OIDC
# RADIUS
# TACACS+
# GitHub OAuth2
# Azure AD OAuth2
# Google OAuth2
'SECRET',
# SAML Fields
'SP_PRIVATE_KEY',
]
def migrate(self):
"""
Main entry point - orchestrates the migration process.
Returns:
dict: Summary of migration results
"""
# Get configuration from AWX/Controller
configs = self.get_controller_config()
if not configs:
self._write_output(f'No {self.get_authenticator_type()} authenticators found to migrate.', 'warning')
return {'created': 0, 'updated': 0, 'unchanged': 0, 'failed': 0, 'mappers_created': 0, 'mappers_updated': 0, 'mappers_failed': 0}
self._write_output(f'Found {len(configs)} {self.get_authenticator_type()} authentication configuration(s).', 'success')
# Process each authenticator configuration
created_authenticators = []
updated_authenticators = []
unchanged_authenticators = []
failed_authenticators = []
for config in configs:
result = self.create_gateway_authenticator(config)
if result['success']:
if result['action'] == 'created':
created_authenticators.append(config)
elif result['action'] == 'updated':
updated_authenticators.append(config)
elif result['action'] == 'skipped':
unchanged_authenticators.append(config)
else:
failed_authenticators.append(config)
# Process mappers for successfully created/updated/unchanged authenticators
mappers_created = 0
mappers_updated = 0
mappers_failed = 0
successful_authenticators = created_authenticators + updated_authenticators + unchanged_authenticators
if successful_authenticators:
self._write_output('\n=== Processing Authenticator Mappers ===', 'success')
for config in successful_authenticators:
mapper_result = self._process_gateway_mappers(config)
mappers_created += mapper_result['created']
mappers_updated += mapper_result['updated']
mappers_failed += mapper_result['failed']
# Authenticators don't have settings, so settings counts are always 0
return {
'created': len(created_authenticators),
'updated': len(updated_authenticators),
'unchanged': len(unchanged_authenticators),
'failed': len(failed_authenticators),
'mappers_created': mappers_created,
'mappers_updated': mappers_updated,
'mappers_failed': mappers_failed,
'settings_created': 0,
'settings_updated': 0,
'settings_unchanged': 0,
'settings_failed': 0,
}
def get_controller_config(self):
"""
Gather configuration from AWX/Controller.
Returns:
list: List of configuration dictionaries
"""
raise NotImplementedError("Subclasses must implement get_controller_config()")
def create_gateway_authenticator(self, config):
"""
Create authenticator in Gateway.
Args:
config: Configuration dictionary from get_controller_config()
Returns:
bool: True if authenticator was created successfully, False otherwise
"""
raise NotImplementedError("Subclasses must implement create_gateway_authenticator()")
def get_authenticator_type(self):
"""
Get the human-readable authenticator type name.
Returns:
str: Authenticator type name for logging
"""
raise NotImplementedError("Subclasses must implement get_authenticator_type()")
def _generate_authenticator_slug(self, auth_type, category):
"""Generate a deterministic slug for an authenticator."""
return f"aap-{auth_type}-{category}".lower()
def submit_authenticator(self, gateway_config, ignore_keys=[], config={}):
"""
Submit an authenticator to Gateway - either create new or update existing.
Args:
gateway_config: Complete Gateway authenticator configuration
ignore_keys: List of configuration keys to ignore during comparison
config: Optional AWX config dict to store result data
Returns:
dict: Result with 'success' (bool), 'action' ('created', 'updated', 'skipped'), 'error' (str or None)
"""
authenticator_slug = gateway_config.get('slug')
if not authenticator_slug:
self._write_output('Gateway config missing slug, cannot submit authenticator', 'error')
return {'success': False, 'action': None, 'error': 'Missing slug'}
try:
# Check if authenticator already exists by slug
existing_authenticator = self.gateway_client.get_authenticator_by_slug(authenticator_slug)
if existing_authenticator:
# Authenticator exists, check if configuration matches
authenticator_id = existing_authenticator.get('id')
configs_match, differences = self._authenticator_configs_match(existing_authenticator, gateway_config, ignore_keys)
if configs_match:
self._write_output(f'‚ö† Authenticator already exists with matching configuration (ID: {authenticator_id})', 'warning')
# Store the existing result for mapper creation
config['gateway_authenticator_id'] = authenticator_id
config['gateway_authenticator'] = existing_authenticator
return {'success': True, 'action': 'skipped', 'error': None}
else:
self._write_output(f'‚ö† Authenticator exists but configuration differs (ID: {authenticator_id})', 'warning')
self._write_output(' Configuration comparison:')
# Log differences between the existing and the new configuration in case of an update
for difference in differences:
self._write_output(f' {difference}')
# Update the existing authenticator
self._write_output(' Updating authenticator with new configuration...')
try:
# Don't include the slug in the update since it shouldn't change
update_config = gateway_config.copy()
if 'slug' in update_config:
del update_config['slug']
result = self.gateway_client.update_authenticator(authenticator_id, update_config)
self._write_output(f'‚úì Successfully updated authenticator with ID: {authenticator_id}', 'success')
# Store the updated result for mapper creation
config['gateway_authenticator_id'] = authenticator_id
config['gateway_authenticator'] = result
return {'success': True, 'action': 'updated', 'error': None}
except GatewayAPIError as e:
self._write_output(f'‚úó Failed to update authenticator: {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return {'success': False, 'action': 'update_failed', 'error': e.message}
else:
# Authenticator doesn't exist, create it
self._write_output('Creating new authenticator...')
# Create the authenticator
result = self.gateway_client.create_authenticator(gateway_config)
self._write_output(f'‚úì Successfully created authenticator with ID: {result.get("id")}', 'success')
# Store the result for potential mapper creation later
config['gateway_authenticator_id'] = result.get('id')
config['gateway_authenticator'] = result
return {'success': True, 'action': 'created', 'error': None}
except GatewayAPIError as e:
self._write_output(f'‚úó Failed to submit authenticator: {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return {'success': False, 'action': 'failed', 'error': e.message}
except Exception as e:
self._write_output(f'‚úó Unexpected error submitting authenticator: {str(e)}', 'error')
return {'success': False, 'action': 'failed', 'error': str(e)}
def _authenticator_configs_match(self, existing_auth, new_config, ignore_keys=[]):
"""
Compare existing authenticator configuration with new configuration.
Args:
existing_auth: Existing authenticator data from Gateway
new_config: New authenticator configuration to be created
ignore_keys: List of configuration keys to ignore during comparison
(e.g., ['CALLBACK_URL'] for auto-generated fields)
Returns:
bool: True if configurations match, False otherwise
"""
# Add encrypted fields to ignore_keys if force flag is not set
# This prevents secrets from being updated unless explicitly forced
effective_ignore_keys = ignore_keys.copy()
if not self.force:
effective_ignore_keys.extend(self.encrypted_fields)
# Keep track of the differences between the existing and the new configuration
# Logging them makes debugging much easier
differences = []
if existing_auth.get('name') != new_config.get('name'):
differences.append(f' name: existing="{existing_auth.get("name")}" vs new="{new_config.get("name")}"')
elif existing_auth.get('type') != new_config.get('type'):
differences.append(f' type: existing="{existing_auth.get("type")}" vs new="{new_config.get("type")}"')
elif existing_auth.get('enabled') != new_config.get('enabled'):
differences.append(f' enabled: existing="{existing_auth.get("enabled")}" vs new="{new_config.get("enabled")}"')
elif existing_auth.get('create_objects') != new_config.get('create_objects'):
differences.append(f' create_objects: existing="{existing_auth.get("create_objects")}" vs new="{new_config.get("create_objects")}"')
elif existing_auth.get('remove_users') != new_config.get('remove_users'):
differences.append(f' create_objects: existing="{existing_auth.get("remove_users")}" vs new="{new_config.get("remove_users")}"')
# Compare configuration section
existing_config = existing_auth.get('configuration', {})
new_config_section = new_config.get('configuration', {})
# Helper function to check if a key should be ignored
def should_ignore_key(config_key):
return config_key in effective_ignore_keys
# Check if all keys in new config exist in existing config with same values
for key, value in new_config_section.items():
if should_ignore_key(key):
continue
if key not in existing_config:
differences.append(f' {key}: existing=<missing> vs new="{value}"')
elif existing_config[key] != value:
differences.append(f' {key}: existing="{existing_config.get(key)}" vs new="{value}"')
# Check if existing config has extra keys that new config doesn't have
# (this might indicate configuration drift), but ignore keys in ignore_keys
for key in existing_config:
if should_ignore_key(key):
continue
if key not in new_config_section:
differences.append(f' {key}: existing="{existing_config.get(key)}" vs new=<missing>')
return len(differences) == 0, differences
def _compare_mapper_lists(self, existing_mappers, new_mappers, ignore_keys=None):
"""
Compare existing and new mapper lists to determine which need updates vs creation.
Args:
existing_mappers: List of existing mapper configurations from Gateway
new_mappers: List of new mapper configurations to be created/updated
ignore_keys: List of keys to ignore during comparison (e.g., auto-generated fields)
Returns:
tuple: (mappers_to_update, mappers_to_create)
mappers_to_update: List of tuples (existing_mapper, new_mapper) for updates
mappers_to_create: List of new_mapper configs that don't match any existing
"""
if ignore_keys is None:
ignore_keys = []
mappers_to_update = []
mappers_to_create = []
for new_mapper in new_mappers:
matched_existing = None
# Try to find a matching existing mapper
for existing_mapper in existing_mappers:
if self._mappers_match_structurally(existing_mapper, new_mapper):
matched_existing = existing_mapper
break
if matched_existing:
# Check if the configuration actually differs (ignoring auto-generated fields)
if not self._mapper_configs_match(matched_existing, new_mapper, ignore_keys):
mappers_to_update.append((matched_existing, new_mapper))
# If configs match exactly, no action needed (mapper is up to date)
else:
# No matching existing mapper found, needs to be created
mappers_to_create.append(new_mapper)
return mappers_to_update, mappers_to_create
def _mappers_match_structurally(self, existing_mapper, new_mapper):
"""
Check if two mappers match structurally (same organization, team, map_type, role).
This identifies if they represent the same logical mapping.
Args:
existing_mapper: Existing mapper configuration from Gateway
new_mapper: New mapper configuration
Returns:
bool: True if mappers represent the same logical mapping
"""
# Compare key structural fields that identify the same logical mapper
structural_fields = ['name']
for field in structural_fields:
if existing_mapper.get(field) != new_mapper.get(field):
return False
return True
def _mapper_configs_match(self, existing_mapper, new_mapper, ignore_keys=None):
"""
Compare mapper configurations to check if they are identical.
Args:
existing_mapper: Existing mapper configuration from Gateway
new_mapper: New mapper configuration
ignore_keys: List of keys to ignore during comparison
Returns:
bool: True if configurations match, False otherwise
"""
if ignore_keys is None:
ignore_keys = []
# Helper function to check if a key should be ignored
def should_ignore_key(config_key):
return config_key in ignore_keys
# Compare all mapper fields except ignored ones
all_keys = set(existing_mapper.keys()) | set(new_mapper.keys())
for key in all_keys:
if should_ignore_key(key):
continue
existing_value = existing_mapper.get(key)
new_value = new_mapper.get(key)
if existing_value != new_value:
return False
return True
def _process_gateway_mappers(self, config):
"""Process authenticator mappers in Gateway from AWX config - create or update as needed."""
authenticator_id = config.get('gateway_authenticator_id')
if not authenticator_id:
self._write_output(f'No authenticator ID found for {config["category"]}, skipping mappers', 'error')
return {'created': 0, 'updated': 0, 'failed': 0}
category = config['category']
org_mappers = config.get('org_mappers', [])
team_mappers = config.get('team_mappers', [])
role_mappers = config.get('role_mappers', [])
allow_mappers = config.get('allow_mappers', [])
all_new_mappers = org_mappers + team_mappers + role_mappers + allow_mappers
if len(all_new_mappers) == 0:
self._write_output(f'No mappers to process for {category} authenticator')
return {'created': 0, 'updated': 0, 'failed': 0}
self._write_output(f'\n--- Processing mappers for {category} authenticator (ID: {authenticator_id}) ---')
self._write_output(f'Organization mappers: {len(org_mappers)}')
self._write_output(f'Team mappers: {len(team_mappers)}')
self._write_output(f'Role mappers: {len(role_mappers)}')
self._write_output(f'Allow mappers: {len(allow_mappers)}')
# Get existing mappers from Gateway
try:
existing_mappers = self.gateway_client.get_authenticator_maps(authenticator_id)
except GatewayAPIError as e:
self._write_output(f'Failed to retrieve existing mappers: {e.message}', 'error')
return {'created': 0, 'updated': 0, 'failed': len(all_new_mappers)}
# Define mapper-specific ignore keys (can be overridden by subclasses)
ignore_keys = self._get_mapper_ignore_keys()
# Compare existing vs new mappers
mappers_to_update, mappers_to_create = self._compare_mapper_lists(existing_mappers, all_new_mappers, ignore_keys)
self._write_output(f'Mappers to create: {len(mappers_to_create)}')
self._write_output(f'Mappers to update: {len(mappers_to_update)}')
created_count = 0
updated_count = 0
failed_count = 0
# Process updates
for existing_mapper, new_mapper in mappers_to_update:
if self._update_single_mapper(existing_mapper, new_mapper):
updated_count += 1
else:
failed_count += 1
# Process creations
for new_mapper in mappers_to_create:
mapper_type = new_mapper.get('map_type', 'unknown')
if self._create_single_mapper(authenticator_id, new_mapper, mapper_type):
created_count += 1
else:
failed_count += 1
# Summary
self._write_output(f'Mappers created: {created_count}, updated: {updated_count}, failed: {failed_count}')
return {'created': created_count, 'updated': updated_count, 'failed': failed_count}
def _get_mapper_ignore_keys(self):
"""
Get list of mapper keys to ignore during comparison.
Can be overridden by subclasses for mapper-specific ignore keys.
Returns:
list: List of keys to ignore (e.g., auto-generated fields)
"""
return ['id', 'authenticator', 'created', 'modified', 'summary_fields', 'modified_by', 'created_by', 'related', 'url']
def _update_single_mapper(self, existing_mapper, new_mapper):
"""Update a single mapper in Gateway.
Args:
existing_mapper: Existing mapper data from Gateway
new_mapper: New mapper configuration to update to
Returns:
bool: True if mapper was updated successfully, False otherwise
"""
try:
mapper_id = existing_mapper.get('id')
if not mapper_id:
self._write_output(' ‚úó Existing mapper missing ID, cannot update', 'error')
return False
# Prepare update config - don't include fields that shouldn't be updated
update_config = new_mapper.copy()
# Remove fields that shouldn't be updated (read-only or auto-generated)
fields_to_remove = ['id', 'authenticator', 'created', 'modified']
for field in fields_to_remove:
update_config.pop(field, None)
# Update the mapper
self.gateway_client.update_authenticator_map(mapper_id, update_config)
mapper_name = new_mapper.get('name', 'Unknown')
self._write_output(f' ‚úì Updated mapper: {mapper_name}', 'success')
return True
except GatewayAPIError as e:
mapper_name = new_mapper.get('name', 'Unknown')
self._write_output(f' ‚úó Failed to update mapper "{mapper_name}": {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return False
except Exception as e:
mapper_name = new_mapper.get('name', 'Unknown')
self._write_output(f' ‚úó Unexpected error updating mapper "{mapper_name}": {str(e)}', 'error')
return False
def _create_single_mapper(self, authenticator_id, mapper_config, mapper_type):
"""Create a single mapper in Gateway."""
try:
# Update the mapper config with the correct authenticator ID
mapper_config = mapper_config.copy() # Don't modify the original
mapper_config['authenticator'] = authenticator_id
# Create the mapper
self.gateway_client.create_authenticator_map(authenticator_id, mapper_config)
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ‚úì Created {mapper_type} mapper: {mapper_name}', 'success')
return True
except GatewayAPIError as e:
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ‚úó Failed to create {mapper_type} mapper "{mapper_name}": {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return False
except Exception as e:
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ‚úó Unexpected error creating {mapper_type} mapper "{mapper_name}": {str(e)}', 'error')
return False
def get_social_org_map(self, authenticator_setting_name):
"""
Get social auth organization map with fallback to global setting.
Args:
authenticator_setting_name: Name of the authenticator-specific organization map setting
(e.g., 'SOCIAL_AUTH_GITHUB_ORGANIZATION_MAP')
Returns:
dict: Organization mapping configuration, with fallback to global setting
"""
# Try authenticator-specific setting first
authenticator_map = getattr(settings, authenticator_setting_name, None)
if authenticator_map:
return authenticator_map
# Fall back to global setting
global_map = getattr(settings, 'SOCIAL_AUTH_ORGANIZATION_MAP', {})
return global_map
def get_social_team_map(self, authenticator_setting_name):
"""
Get social auth team map with fallback to global setting.
Args:
authenticator_setting_name: Name of the authenticator-specific team map setting
(e.g., 'SOCIAL_AUTH_GITHUB_TEAM_MAP')
Returns:
dict: Team mapping configuration, with fallback to global setting
"""
# Try authenticator-specific setting first
authenticator_map = getattr(settings, authenticator_setting_name, None)
if authenticator_map:
return authenticator_map
# Fall back to global setting
global_map = getattr(settings, 'SOCIAL_AUTH_TEAM_MAP', {})
return global_map
def handle_login_override(self, config, valid_login_urls):
"""
Handle LOGIN_REDIRECT_OVERRIDE setting for this authenticator.
This method checks if the login_redirect_override from the config matches
any of the provided valid_login_urls. If it matches, it updates the
LOGIN_REDIRECT_OVERRIDE setting in Gateway with the new authenticator's
URL and sets the class flag to indicate it was handled.
Args:
config: Configuration dictionary containing:
- login_redirect_override: The current LOGIN_REDIRECT_OVERRIDE value
- gateway_authenticator: The created/updated authenticator info
valid_login_urls: List of URL patterns to match against
"""
login_redirect_override = config.get('login_redirect_override')
if not login_redirect_override:
return
# Check if the login_redirect_override matches any of the provided valid URLs
url_matches = False
parsed_redirect = urlparse(login_redirect_override)
for valid_url in valid_login_urls:
parsed_valid = urlparse(valid_url)
# Compare path: redirect path should match or contain the valid path at proper boundaries
if parsed_redirect.path == parsed_valid.path:
path_matches = True
elif parsed_redirect.path.startswith(parsed_valid.path):
# Ensure the match is at a path boundary (followed by '/' or end of string)
next_char_pos = len(parsed_valid.path)
if next_char_pos >= len(parsed_redirect.path) or parsed_redirect.path[next_char_pos] in ['/', '?']:
path_matches = True
else:
path_matches = False
else:
path_matches = False
# Compare query: if valid URL has query params, they should be present in redirect URL
query_matches = True
if parsed_valid.query:
# Parse query parameters for both URLs
valid_params = parse_qs(parsed_valid.query, keep_blank_values=True)
redirect_params = parse_qs(parsed_redirect.query, keep_blank_values=True) if parsed_redirect.query else {}
# All valid URL query params must be present in redirect URL with same values
query_matches = all(param in redirect_params and redirect_params[param] == values for param, values in valid_params.items())
if path_matches and query_matches:
url_matches = True
break
if not url_matches:
return
# Extract the created authenticator from config
gateway_authenticator = config.get('gateway_authenticator')
if not gateway_authenticator:
return
sso_login_url = gateway_authenticator.get('sso_login_url')
if not sso_login_url:
return
# Update LOGIN_REDIRECT_OVERRIDE with the new Gateway URL
gateway_base_url = self.gateway_client.get_base_url()
urlparse_gw = urlparse(gateway_base_url)
complete_url = parsed_redirect._replace(scheme=urlparse_gw.scheme, path=sso_login_url, netloc=urlparse_gw.netloc).geturl()
self._write_output(f'Updating LOGIN_REDIRECT_OVERRIDE to: {complete_url}')
self.gateway_client.update_gateway_setting('LOGIN_REDIRECT_OVERRIDE', complete_url)
# Set the class-level flag to indicate LOGIN_REDIRECT_OVERRIDE was handled by a migrator
BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator = True
def _write_output(self, message, style=None):
"""Write output message if command is available."""
if self.command:
if style == 'success':
self.command.stdout.write(self.command.style.SUCCESS(message))
elif style == 'warning':
self.command.stdout.write(self.command.style.WARNING(message))
elif style == 'error':
self.command.stdout.write(self.command.style.ERROR(message))
else:
self.command.stdout.write(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment