Last active
November 16, 2022 15:49
-
-
Save dvu4/c3ef35a0d20c95e9391df93f3ce7d081 to your computer and use it in GitHub Desktop.
This script sends an event message to a Service Bus topic for each service principal whose credentials expire within the `expires_within` window; the Service Bus subscription then receives those messages.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import uuid
from datetime import datetime
from typing import Optional

import typer
from azure.servicebus import ServiceBusClient, ServiceBusMessage

from my_api.api.service_principals import filter_my_service_principals, create_secret
from my_api.auth import get_token
from my_api.config import Settings
from my_api.models import AzureOwnedSPResponse, SetSecretResponse
# Service Bus settings used by this script.
# NOTE(review): these values look like placeholders -- confirm they are replaced
# (or loaded from configuration) before running against a real namespace.
NAMESPACE_NAME = "namespace_name"
SHARED_ACCESS_KEY = "shared_access_key"
# With implicit string concatenation only the literals that carry the ``f``
# prefix are interpolated, so every piece containing a placeholder needs it.
CONNECTION_STRING = (
    f"Endpoint=sb://{NAMESPACE_NAME}.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    f"SharedAccessKey={SHARED_ACCESS_KEY}"
)
QUEUE_NAME = "queue_name"
TOPIC_NAME = "event_name"
SUBSCRIPTION_NAME = "subscription_name"
RULE_NAME = "rule_name"
def create_servicebus_sender(
        connection_string: Optional[str],
        queue_name: Optional[str],
        topic_name: Optional[str],
        servicebus_type: Optional[str] = "queue"):
    """
    Create servicebus sender (producer/publisher)

    Args:
        connection_string: Connection string passed to
            ``ServiceBusClient.from_connection_string``.
        queue_name: Queue to send to; only used when ``servicebus_type`` is "queue".
        topic_name: Topic to send to; only used when ``servicebus_type`` is "topic".
        servicebus_type: Either "queue" (default) or "topic".

    Returns:
        The sender returned by ``get_queue_sender`` or ``get_topic_sender``.

    Raises:
        ValueError: If ``servicebus_type`` is neither "queue" nor "topic".
    """
    # Validate before building the client: an unsupported type previously fell
    # through both branches and failed with UnboundLocalError at the return.
    if servicebus_type not in ("queue", "topic"):
        raise ValueError(
            f"Unsupported servicebus_type {servicebus_type!r}; expected 'queue' or 'topic'")

    servicebus_client = ServiceBusClient.from_connection_string(conn_str=connection_string)
    if servicebus_type == "queue":
        return servicebus_client.get_queue_sender(queue_name=queue_name)
    return servicebus_client.get_topic_sender(topic_name=topic_name)
def create_servicebus_receiver(
        connection_string: Optional[str],
        queue_name: Optional[str],
        topic_name: Optional[str],
        subscription_name: Optional[str],
        servicebus_type: Optional[str] = "queue"):
    """
    Create servicebus receiver (consumer/subscriber)

    Args:
        connection_string: Connection string passed to
            ``ServiceBusClient.from_connection_string``.
        queue_name: Queue to receive from; only used when ``servicebus_type`` is "queue".
        topic_name: Topic to receive from; only used when ``servicebus_type`` is "topic".
        subscription_name: Subscription on ``topic_name``; only used when
            ``servicebus_type`` is "topic".
        servicebus_type: Either "queue" (default) or "topic".

    Returns:
        The receiver returned by ``get_queue_receiver`` or
        ``get_subscription_receiver``.

    Raises:
        ValueError: If ``servicebus_type`` is neither "queue" nor "topic".
    """
    # Validate before building the client: an unsupported type previously fell
    # through both branches and failed with UnboundLocalError at the return.
    if servicebus_type not in ("queue", "topic"):
        raise ValueError(
            f"Unsupported servicebus_type {servicebus_type!r}; expected 'queue' or 'topic'")

    servicebus_client = ServiceBusClient.from_connection_string(conn_str=connection_string)
    if servicebus_type == "queue":
        return servicebus_client.get_queue_receiver(queue_name=queue_name)
    return servicebus_client.get_subscription_receiver(
        topic_name=topic_name,
        subscription_name=subscription_name)
def main(expires_within: Optional[int] = 60, keyvault_name: Optional[str] = None) -> None:
    """
    Publish a rotation event for each expiring service principal, then receive them.

    Fetches a token, asks ``filter_my_service_principals`` for the owned service
    principals matching ``expires_within``, sends one ``RotateSecretEventType``
    message per principal to the topic, and finally receives a batch of messages
    from the subscription (waiting up to 5 seconds) and completes each one.

    Args:
        expires_within: Forwarded to ``filter_my_service_principals`` as its
            ``expires_within`` argument.
        keyvault_name: Value placed in each message's ``keyvault_name`` field.
            The name was referenced but never defined before, so it is now an
            optional parameter; it is serialized as JSON ``null`` when omitted.
    """
    config = Settings()
    access_token = get_token(config)

    # NOTE(review): an earlier create_secret(...) call was dropped here. It read
    # names that were not bound at that point, so it always raised before doing
    # any work, and its result was overwritten immediately. Confirm that secret
    # creation belongs to the subscriber side.
    response: AzureOwnedSPResponse = filter_my_service_principals(
        access_token,
        config.client_id,
        expires_within=expires_within)

    correlation_id = str(uuid.uuid4())
    # %S (seconds) yields a fixed 14-character timestamp; lowercase %s is a
    # platform-specific extension and is not portable.
    session_id = datetime.now().strftime('%Y%m%d%H%M%S')

    messages = []
    for sp in response.ownedSPDetails:
        # Nothing to report for a principal without credentials; indexing [0]
        # would raise IndexError.
        if not sp.credentialDetails:
            continue
        credential = sp.credentialDetails[0]
        # content_type is application/json, so emit real JSON rather than a
        # Python dict repr. default=str keeps non-JSON-native values (e.g. a
        # datetime expiry) serializable, as str() on the dict did before.
        body = json.dumps(
            {"display_name": sp.spDisplayName,
             "key_id": credential.keyId,
             "expiry_date": credential.expiryDate,
             "keyvault_name": keyvault_name},
            default=str)
        messages.append(ServiceBusMessage(
            body,
            subject="RotateSecretEventType",
            content_type="application/json",
            session_id=session_id,
            partition_key=session_id,
            correlation_id=correlation_id))

    # SERVICEBUS TOPIC/SUBSCRIPTION
    # create servicebus topic sender
    servicebus_sender = create_servicebus_sender(
        connection_string=CONNECTION_STRING,
        queue_name=QUEUE_NAME,
        topic_name=TOPIC_NAME,
        servicebus_type="topic")

    # send the messages to the topic
    with servicebus_sender:
        servicebus_sender.send_messages(messages)

    # create servicebus subscription receiver
    servicebus_receiver = create_servicebus_receiver(
        connection_string=CONNECTION_STRING,
        queue_name=QUEUE_NAME,
        topic_name=TOPIC_NAME,
        subscription_name=SUBSCRIPTION_NAME,
        servicebus_type="topic")

    # receive messages in the subscriber and settle each one
    with servicebus_receiver:
        received_messages = servicebus_receiver.receive_messages(max_wait_time=5)
        for message in received_messages:
            servicebus_receiver.complete_message(message)
# Script entry point: hand main to typer when the file is executed directly.
if __name__ == "__main__":
    typer.run(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment