Last active
November 16, 2022 15:48
-
-
Save dvu4/164b8ad6608187868c345288aa390eea to your computer and use it in GitHub Desktop.
This script sends an event message to a Service Bus queue when service principals are due to expire within the `expires_within` window; the queue receiver then picks up the message.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import uuid
from datetime import datetime
from typing import Optional

import typer
from azure.servicebus import ServiceBusClient, ServiceBusMessage

from my_api.auth import get_token
from my_api.config import Settings
from my_api.models import AzureOwnedSPResponse, SetSecretResponse
from my_api.service_principals import filter_my_service_principals, create_secret
# Placeholder Service Bus settings used by this script.
# NOTE(review): these look like sample values; real deployments should not
# hard-code the shared access key in source -- confirm where it should come from.
NAMESPACE_NAME = "namespace_name"
SHARED_ACCESS_KEY = "shared_access_key"

# Every fragment containing a placeholder must carry the ``f`` prefix:
# adjacent literals are concatenated, but only f-string fragments are
# interpolated, so a plain second fragment would leave the text
# "{SHARED_ACCESS_KEY}" verbatim in the connection string.
CONNECTION_STRING = (
    f"Endpoint=sb://{NAMESPACE_NAME}.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    f"SharedAccessKey={SHARED_ACCESS_KEY}"
)

QUEUE_NAME = "queue_name"
TOPIC_NAME = "event_name"
SUBSCRIPTION_NAME = "subscription_name"
RULE_NAME = "rule_name"
def create_servicebus_sender(
        connection_string: Optional[str],
        queue_name: Optional[str],
        topic_name: Optional[str],
        servicebus_type: Optional[str] = "queue"):
    """
    Create a Service Bus sender (producer/publisher).

    Args:
        connection_string: Connection string handed to
            ``ServiceBusClient.from_connection_string``.
        queue_name: Queue to send to; only used when ``servicebus_type``
            is ``"queue"``.
        topic_name: Topic to send to; only used when ``servicebus_type``
            is ``"topic"``.
        servicebus_type: Either ``"queue"`` (default) or ``"topic"``.

    Returns:
        The sender returned by ``get_queue_sender`` or ``get_topic_sender``.

    Raises:
        ValueError: If ``servicebus_type`` is neither ``"queue"`` nor
            ``"topic"``.
    """
    # Reject unsupported types before a client is created; otherwise no
    # sender would ever be bound and the function would fail with an
    # unhelpful UnboundLocalError at the return statement.
    if servicebus_type not in ("queue", "topic"):
        raise ValueError(
            f"Unsupported servicebus_type {servicebus_type!r}; expected 'queue' or 'topic'")

    servicebus_client = ServiceBusClient.from_connection_string(conn_str=connection_string)
    if servicebus_type == "queue":
        return servicebus_client.get_queue_sender(queue_name=queue_name)
    return servicebus_client.get_topic_sender(topic_name=topic_name)
def create_servicebus_receiver(
        connection_string: Optional[str],
        queue_name: Optional[str],
        topic_name: Optional[str],
        subscription_name: Optional[str],
        servicebus_type: Optional[str] = "queue"):
    """
    Create a Service Bus receiver (consumer/subscriber).

    Args:
        connection_string: Connection string handed to
            ``ServiceBusClient.from_connection_string``.
        queue_name: Queue to receive from; only used when
            ``servicebus_type`` is ``"queue"``.
        topic_name: Topic to receive from; only used when
            ``servicebus_type`` is ``"topic"``.
        subscription_name: Subscription on ``topic_name``; only used when
            ``servicebus_type`` is ``"topic"``.
        servicebus_type: Either ``"queue"`` (default) or ``"topic"``.

    Returns:
        The receiver returned by ``get_queue_receiver`` or
        ``get_subscription_receiver``.

    Raises:
        ValueError: If ``servicebus_type`` is neither ``"queue"`` nor
            ``"topic"``.
    """
    # Reject unsupported types before a client is created; otherwise no
    # receiver would ever be bound and the function would fail with an
    # unhelpful UnboundLocalError at the return statement.
    if servicebus_type not in ("queue", "topic"):
        raise ValueError(
            f"Unsupported servicebus_type {servicebus_type!r}; expected 'queue' or 'topic'")

    servicebus_client = ServiceBusClient.from_connection_string(conn_str=connection_string)
    if servicebus_type == "queue":
        return servicebus_client.get_queue_receiver(queue_name=queue_name)
    return servicebus_client.get_subscription_receiver(
        topic_name=topic_name,
        subscription_name=subscription_name)
def main(expires_within: Optional[int] = 60,
         keyvault_name: Optional[str] = None) -> None:
    """
    Publish a rotate-secret event per owned service principal, then drain the queue.

    The service principals come from ``filter_my_service_principals``; one
    ``ServiceBusMessage`` is built for the first credential of each entry,
    all messages are sent to ``QUEUE_NAME``, and the queue is then read back
    and every received message is completed.

    Args:
        expires_within: Forwarded unchanged to
            ``filter_my_service_principals`` as ``expires_within``.
        keyvault_name: Value placed in the ``keyvault_name`` field of each
            message payload.
    """
    config = Settings()
    access_token = get_token(config)

    # NOTE(review): a create_secret(access_token, keyvault_name, display_name)
    # call used to sit here. Both names were undefined at that point and its
    # result was overwritten by the next statement, so it could only raise;
    # it was removed. Re-add it where the secret is actually rotated if needed.
    response: AzureOwnedSPResponse = filter_my_service_principals(
        access_token,
        config.client_id,
        expires_within=expires_within)

    correlation_id = str(uuid.uuid4())
    # %S is the portable seconds directive; %s is a platform-specific
    # extension (epoch seconds) that is not supported everywhere.
    session_id = datetime.now().strftime('%Y%m%d%H%M%S')

    messages = []
    for sp in response.ownedSPDetails:
        # Entries without any credential have nothing to rotate; skipping
        # them avoids an IndexError on the [0] lookup.
        if not sp.credentialDetails:
            continue
        credential = sp.credentialDetails[0]
        # content_type advertises JSON, so the body must be real JSON rather
        # than the Python repr produced by str(dict).
        # default=str covers a non-JSON-native expiryDate (e.g. a datetime)
        # -- TODO confirm the actual type of expiryDate.
        payload = json.dumps(
            {"display_name": sp.spDisplayName,
             "key_id": credential.keyId,
             "expiry_date": credential.expiryDate,
             "keyvault_name": keyvault_name},
            default=str)
        messages.append(ServiceBusMessage(
            payload,
            subject="RotateSecretEventType",
            content_type="application/json",
            session_id=session_id,
            partition_key=session_id,
            correlation_id=correlation_id))

    # SERVICEBUS QUEUE
    # create servicebus queue sender
    servicebus_sender = create_servicebus_sender(
        connection_string=CONNECTION_STRING,
        queue_name=QUEUE_NAME,
        topic_name=TOPIC_NAME,
        servicebus_type="queue")

    # send the messages to the queue
    with servicebus_sender:
        servicebus_sender.send_messages(messages)

    # create servicebus queue receiver
    servicebus_receiver = create_servicebus_receiver(
        connection_string=CONNECTION_STRING,
        queue_name=QUEUE_NAME,
        topic_name=TOPIC_NAME,
        subscription_name=SUBSCRIPTION_NAME,
        servicebus_type="queue")

    # receive messages from the publisher and settle each one
    with servicebus_receiver:
        received_messages = servicebus_receiver.receive_messages(max_wait_time=5)
        for message in received_messages:
            servicebus_receiver.complete_message(message)
# Script entry point: hand main to typer only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    typer.run(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment