Last active
July 24, 2025 18:39
-
-
Save mkanoor/3e3cdeb5e61bd2af04d5f6c6170fd618 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Gateway API client for AAP Gateway interactions. | |
| This module provides a client class to interact with the AAP Gateway REST API, | |
| specifically for creating authenticators and mapping configurations. | |
| """ | |
| import requests | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from urllib.parse import urljoin | |
| logger = logging.getLogger(__name__) | |
class GatewayAPIError(Exception):
    """Error raised when an interaction with the Gateway API fails.

    Attributes are set from the constructor arguments:

    * ``message`` - human-readable description; also used as ``str(error)``.
    * ``status_code`` - HTTP status code of the failing response, if any.
    * ``response_data`` - decoded response body of the failing response, if any.
    """

    def __init__(
        self,
        message: str,
        status_code: Optional[int] = None,
        response_data: Optional[Dict] = None,
    ):
        # Hand the message to Exception first so str(error) and error.args
        # carry it, then record the HTTP details for programmatic access.
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.response_data = response_data
class GatewayClient:
    """Client for AAP Gateway REST API interactions.

    Wraps a ``requests.Session`` configured for HTTP Basic Authentication and
    exposes helpers for authenticators, authenticator maps and Gateway
    settings. All failures are reported as ``GatewayAPIError``.

    The client can be used as a context manager; the session is closed on exit.
    """

    def __init__(self, base_url: str, username: str, password: str, skip_verify: bool = False, skip_session_init: bool = False, command=None):
        """Initialize Gateway client.

        Args:
            base_url: Base URL of the AAP Gateway instance. Trailing slashes
                are stripped.
            username: Username for authentication
            password: Password for authentication
            skip_verify: Skip SSL certificate verification
            skip_session_init: Skip initializing the session. Only set to True
                if you are using a base class that doesn't need the
                initialization of the session. ``self.session`` is then None.
            command: The command object. This is used to write output to the
                console; it must expose ``stdout.write`` and
                ``style.SUCCESS`` / ``style.WARNING`` / ``style.ERROR``.
        """
        self.base_url = base_url.rstrip('/')
        self.username = username
        self.password = password
        self.skip_verify = skip_verify
        self.command = command
        self.session_was_not_initialized = skip_session_init

        if skip_session_init:
            self.session = None
        else:
            self.session = requests.Session()

            if skip_verify:
                self.session.verify = False
                # Silence the warning urllib3 emits for unverified HTTPS
                # requests. This filter is installed for the whole process,
                # not only for this session.
                import urllib3

                urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

            # Default headers sent with every request made through the session.
            self.session.headers.update(
                {
                    'User-Agent': 'AWX-Gateway-Migration-Client/1.0',
                    'Accept': 'application/json',
                    'Content-Type': 'application/json',
                }
            )

        # Flipped to True by authenticate(); checked by _ensure_authenticated().
        self._authenticated = False

    @staticmethod
    def _error_from_response(response, summary: str, json_prefix: str = ", Error: ", text_prefix: str = ", Response: ") -> GatewayAPIError:
        """Build a GatewayAPIError describing an unsuccessful response.

        The body is decoded exactly once. Decoding it a second time while
        constructing the exception would raise for non-JSON bodies and hide
        the real status code and body from the caller.

        Args:
            response: The response object (needs ``json()``, ``text`` and
                ``status_code``).
            summary: Leading part of the error message.
            json_prefix: Separator placed before a decoded JSON body.
            text_prefix: Separator placed before the raw text of a body that
                is not valid JSON.

        Returns:
            GatewayAPIError: Carries the status code and, when the body is
            valid JSON, the decoded body as ``response_data``.
        """
        try:
            body = response.json()
        except ValueError:
            # JSON decode errors are ValueError subclasses (including
            # requests.exceptions.JSONDecodeError); this also covers an
            # empty body.
            return GatewayAPIError(f"{summary}{text_prefix}{response.text}", response.status_code, None)
        return GatewayAPIError(f"{summary}{json_prefix}{body}", response.status_code, body)

    @staticmethod
    def _unwrap_results(result: Any) -> Any:
        """Return ``result['results']`` for a paginated payload, else ``result``."""
        if isinstance(result, dict) and 'results' in result:
            return result['results']
        return result

    def _request_json(
        self,
        method: str,
        endpoint: str,
        *,
        success_status: int,
        action: str,
        data: Optional[Dict] = None,
        params: Optional[Dict] = None,
        include_details: bool = True,
    ) -> Any:
        """Send a request and return its decoded JSON body.

        Args:
            method: HTTP method.
            endpoint: API endpoint (without base URL).
            success_status: The only status code treated as success.
            action: Verb phrase used in error messages ("Failed to <action>").
            data: JSON data to send in request body.
            params: Query parameters.
            include_details: Whether the response body is appended to the
                error message and attached as ``response_data`` on failure.

        Returns:
            The decoded JSON body of the successful response.

        Raises:
            GatewayAPIError: If the request fails, the status code differs
                from ``success_status``, or a success body is not valid JSON.
        """
        try:
            response = self._make_request(method, endpoint, data=data, params=params)
            if response.status_code == success_status:
                return response.json()
        except (requests.RequestException, ValueError) as exc:
            raise GatewayAPIError(f"Failed to {action}: {str(exc)}") from exc

        summary = f"Failed to {action}. Status: {response.status_code}"
        if include_details:
            raise self._error_from_response(response, summary)
        raise GatewayAPIError(summary, response.status_code)

    def authenticate(self) -> bool:
        """Authenticate with the Gateway using HTTP Basic Authentication.

        Installs Basic Auth credentials on the session and probes the
        authenticators endpoint to check them.

        Returns:
            bool: True if authentication successful

        Raises:
            GatewayAPIError: If the probe request fails or returns a status
                other than 200 or 401.
        """
        from requests.auth import HTTPBasicAuth

        self.session.auth = HTTPBasicAuth(self.username, self.password)

        # NOTE(review): urljoin with an absolute path discards any path
        # component of base_url - confirm base URLs never carry a prefix.
        test_url = urljoin(self.base_url, '/api/gateway/v1/authenticators/')
        try:
            response = self.session.get(test_url)
        except requests.RequestException as exc:
            raise GatewayAPIError(f"Network error during authentication: {str(exc)}") from exc

        # NOTE(review): 401 is accepted as success, preserved from the
        # original behaviour ("auth is working but might need permissions").
        # HTTP 401 normally means the credentials were rejected - confirm
        # whether 403 was intended here.
        if response.status_code in (200, 401):
            self._authenticated = True
            logger.info("Successfully authenticated with Gateway using Basic Auth")
            return True

        raise self._error_from_response(
            response,
            f"Authentication test failed with status {response.status_code}",
            json_prefix=": ",
            text_prefix=": ",
        )

    def _ensure_authenticated(self):
        """Ensure the client is authenticated, authenticate if needed."""
        if not self._authenticated:
            self.authenticate()

    def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None) -> requests.Response:
        """Make an authenticated request to the Gateway API.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE, etc.)
            endpoint: API endpoint (without base URL)
            data: JSON data to send in request body
            params: Query parameters

        Returns:
            requests.Response: The response object, whatever its status code

        Raises:
            GatewayAPIError: If authentication or the request itself fails
        """
        self._ensure_authenticated()

        # NOTE(review): base_url has no trailing slash, so urljoin replaces
        # the last path segment of a prefixed base_url instead of appending
        # to it - confirm base URLs never carry a path prefix.
        url = urljoin(self.base_url, endpoint.lstrip('/'))
        verb = method.upper()
        try:
            response = self.session.request(method=verb, url=url, json=data, params=params)
        except requests.RequestException as exc:
            raise GatewayAPIError(f"Request failed: {str(exc)}") from exc

        logger.debug("%s %s - Status: %s", verb, url, response.status_code)
        return response

    def create_authenticator(self, authenticator_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new authenticator in Gateway.

        Args:
            authenticator_config: Authenticator configuration dictionary

        Returns:
            dict: Created authenticator data

        Raises:
            GatewayAPIError: If creation fails (any status other than 201)
        """
        result = self._request_json(
            'POST',
            '/api/gateway/v1/authenticators/',
            success_status=201,
            action='create authenticator',
            data=authenticator_config,
        )
        logger.info("Successfully created authenticator: %s", result.get('name', 'Unknown'))
        return result

    def update_authenticator(self, authenticator_id: int, authenticator_config: Dict[str, Any]) -> Dict[str, Any]:
        """Update an existing authenticator in Gateway.

        Args:
            authenticator_id: ID of the authenticator to update
            authenticator_config: Authenticator configuration dictionary

        Returns:
            dict: Updated authenticator data

        Raises:
            GatewayAPIError: If update fails (any status other than 200)
        """
        result = self._request_json(
            'PATCH',
            f'/api/gateway/v1/authenticators/{authenticator_id}/',
            success_status=200,
            action='update authenticator',
            data=authenticator_config,
        )
        logger.info("Successfully updated authenticator: %s", result.get('name', 'Unknown'))
        return result

    def create_authenticator_map(self, authenticator_id: int, mapper_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new authenticator map in Gateway.

        Args:
            authenticator_id: ID of the authenticator to create map for.
                Not used by this method; kept for interface compatibility.
                NOTE(review): presumably mapper_config must itself identify
                the authenticator - verify against callers.
            mapper_config: Mapper configuration dictionary

        Returns:
            dict: Created mapper data

        Raises:
            GatewayAPIError: If creation fails (any status other than 201)
        """
        result = self._request_json(
            'POST',
            '/api/gateway/v1/authenticator_maps/',
            success_status=201,
            action='create authenticator map',
            data=mapper_config,
        )
        logger.info("Successfully created authenticator map: %s", result.get('name', 'Unknown'))
        return result

    def update_authenticator_map(self, mapper_id: int, mapper_config: Dict[str, Any]) -> Dict[str, Any]:
        """Update an existing authenticator map in Gateway.

        Args:
            mapper_id: ID of the authenticator map to update
            mapper_config: Mapper configuration dictionary

        Returns:
            dict: Updated mapper data

        Raises:
            GatewayAPIError: If update fails (any status other than 200)
        """
        result = self._request_json(
            'PATCH',
            f'/api/gateway/v1/authenticator_maps/{mapper_id}/',
            success_status=200,
            action='update authenticator map',
            data=mapper_config,
        )
        logger.info("Successfully updated authenticator map: %s", result.get('name', 'Unknown'))
        return result

    def get_authenticators(self, params: Optional[Dict] = None) -> List[Dict[str, Any]]:
        """Get list of authenticators from Gateway.

        Only the contents of this single response are returned; further pages
        of a paginated listing are not fetched.

        Args:
            params: Optional query parameters

        Returns:
            list: List of authenticator configurations

        Raises:
            GatewayAPIError: If request fails (any status other than 200)
        """
        result = self._request_json(
            'GET',
            '/api/gateway/v1/authenticators/',
            success_status=200,
            action='get authenticators',
            params=params,
            include_details=False,
        )
        return self._unwrap_results(result)

    def get_authenticator_by_slug(self, slug: str) -> Optional[Dict[str, Any]]:
        """Get a specific authenticator by slug.

        Args:
            slug: The authenticator slug to search for

        Returns:
            dict: The authenticator data if found, None otherwise

        Raises:
            GatewayAPIError: If request fails
        """
        try:
            # Filter server-side via the query parameter rather than fetching
            # every authenticator and searching locally.
            matches = self.get_authenticators(params={'slug': slug})
            # Slugs should be unique, so the first match is the answer.
            return matches[0] if matches else None
        except GatewayAPIError:
            raise
        except Exception as exc:
            raise GatewayAPIError(f"Failed to get authenticator by slug: {str(exc)}") from exc

    def get_authenticator_maps(self, authenticator_id: int) -> List[Dict[str, Any]]:
        """Get list of maps for a specific authenticator.

        Only the contents of this single response are returned; further pages
        of a paginated listing are not fetched.

        Args:
            authenticator_id: ID of the authenticator

        Returns:
            list: List of authenticator maps

        Raises:
            GatewayAPIError: If request fails (any status other than 200)
        """
        result = self._request_json(
            'GET',
            f'/api/gateway/v1/authenticators/{authenticator_id}/authenticator_maps/',
            success_status=200,
            action='get authenticator maps',
            include_details=False,
        )
        return self._unwrap_results(result)

    def create_github_authenticator(
        self, name: str, client_id: str, client_secret: str, enabled: bool = True, create_objects: bool = False, remove_users: bool = False
    ) -> Dict[str, Any]:
        """Create a GitHub authenticator with the specified configuration.

        Args:
            name: Name for the authenticator
            client_id: GitHub OAuth App Client ID (sent as ``KEY``)
            client_secret: GitHub OAuth App Client Secret (sent as ``SECRET``)
            enabled: Whether authenticator should be enabled
            create_objects: Whether to create users/orgs/teams automatically
            remove_users: Whether to remove users when they lose access

        Returns:
            dict: Created authenticator data

        Raises:
            GatewayAPIError: If creation fails
        """
        config = {
            "name": name,
            "type": "ansible_base.authentication.authenticator_plugins.github",
            "enabled": enabled,
            "create_objects": create_objects,
            "remove_users": remove_users,
            "configuration": {"KEY": client_id, "SECRET": client_secret},
        }
        return self.create_authenticator(config)

    def update_gateway_setting(self, setting_name: str, setting_value: Any) -> Dict[str, Any]:
        """Update a Gateway setting via the settings API.

        Args:
            setting_name: Name of the setting to update
            setting_value: Value to set for the setting

        Returns:
            dict: The decoded response body when one is present and valid
            JSON, otherwise the ``{setting_name: setting_value}`` payload
            that was sent.

        Raises:
            GatewayAPIError: If update fails (status other than 200 or 204)
        """
        endpoint = '/api/gateway/v1/settings/all/'
        payload = {setting_name: setting_value}

        try:
            response = self._make_request('PUT', endpoint, data=payload)
        except requests.RequestException as exc:
            raise GatewayAPIError(f"Failed to update Gateway setting: {str(exc)}") from exc

        if response.status_code not in (200, 204):
            raise self._error_from_response(response, f"Failed to update Gateway setting. Status: {response.status_code}")

        logger.info("Successfully updated Gateway setting: %s", setting_name)
        if response.content:
            try:
                return response.json()
            except ValueError:
                # Body present but not JSON: fall back to echoing the payload.
                return payload
        return payload

    def get_gateway_setting(self, setting_name: str) -> Any:
        """Get a Gateway setting value via the settings API.

        Fetches all settings and picks the requested one from the result.

        Args:
            setting_name: Name of the setting to retrieve

        Returns:
            Any: The value of the setting, or None if not found

        Raises:
            GatewayAPIError: If request fails (any status other than 200)
        """
        endpoint = '/api/gateway/v1/settings/all/'

        try:
            response = self._make_request('GET', endpoint)
            if response.status_code == 200:
                settings_data = response.json()
                logger.info("Successfully retrieved Gateway settings")
                return settings_data.get(setting_name)
        except (requests.RequestException, ValueError) as exc:
            raise GatewayAPIError(f"Failed to get Gateway setting: {str(exc)}") from exc

        raise self._error_from_response(response, f"Failed to get Gateway settings. Status: {response.status_code}")

    def get_base_url(self) -> str:
        """Get the base URL of the Gateway instance.

        Returns:
            str: The base URL of the Gateway instance (no trailing slash)
        """
        return self.base_url

    def close(self):
        """Close the session and clean up resources."""
        if self.session:
            self.session.close()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit; closes the session, never suppresses errors."""
        self.close()

    def _write_output(self, message, style=None):
        """Write output message if command is available.

        Args:
            message: Text to write to ``command.stdout``.
            style: One of 'success', 'warning' or 'error' to wrap the message
                with the matching ``command.style`` function; any other value
                writes the message unstyled.
        """
        if not self.command:
            return

        style_attr = {'success': 'SUCCESS', 'warning': 'WARNING', 'error': 'ERROR'}.get(style)
        if style_attr is None:
            self.command.stdout.write(message)
        else:
            styled = getattr(self.command.style, style_attr)(message)
            self.command.stdout.write(styled)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment