Skip to content

Instantly share code, notes, and snippets.

@dvu4
Last active November 16, 2022 15:48
Show Gist options
  • Save dvu4/164b8ad6608187868c345288aa390eea to your computer and use it in GitHub Desktop.
Save dvu4/164b8ad6608187868c345288aa390eea to your computer and use it in GitHub Desktop.
This script sends an event message to a Service Bus queue for each service principal whose credentials expire within the `expires_within` window; the queue receiver then picks up the message.
import uuid
from datetime import datetime
from typing import Optional
import typer
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from my_api.service_principals import filter_my_service_principals, create_secret
from my_api.auth import get_token
from my_api.config import Settings
from my_api.models import AzureOwnedSPResponse, SetSecretResponse
# Placeholder Service Bus settings used by main() below.
# NOTE(review): these are dummy values; real keys should not be committed to
# source control — load them from configuration or a secret store instead.
NAMESPACE_NAME = "namespace_name"
SHARED_ACCESS_KEY = "shared_access_key"
# Every fragment that contains a placeholder must itself be an f-string:
# implicit concatenation does not propagate the f-prefix, so a plain second
# fragment would leave the literal text "{SHARED_ACCESS_KEY}" in the result.
CONNECTION_STRING = (
    f"Endpoint=sb://{NAMESPACE_NAME}.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    f"SharedAccessKey={SHARED_ACCESS_KEY}"
)
QUEUE_NAME = "queue_name"
TOPIC_NAME = "event_name"
SUBSCRIPTION_NAME = "subscription_name"
RULE_NAME = "rule_name"
def create_servicebus_sender(
        connection_string: Optional[str],
        queue_name: Optional[str],
        topic_name: Optional[str],
        servicebus_type: Optional[str] = "queue"):
    """
    Create servicebus sender (producer/publisher)

    Args:
        connection_string: Connection string passed to
            ServiceBusClient.from_connection_string.
        queue_name: Queue to send to; only used when servicebus_type is "queue".
        topic_name: Topic to send to; only used when servicebus_type is "topic".
        servicebus_type: Either "queue" (default) or "topic".

    Returns:
        The sender returned by get_queue_sender or get_topic_sender. The
        caller is expected to close it (main() uses it as a context manager).

    Raises:
        ValueError: If servicebus_type is neither "queue" nor "topic".
    """
    # Validate first: an unsupported type used to fall through both branches
    # and fail with UnboundLocalError on return, after a client had already
    # been created and then abandoned.
    if servicebus_type not in ("queue", "topic"):
        raise ValueError(
            f"Unsupported servicebus_type {servicebus_type!r}; "
            "expected 'queue' or 'topic'")

    # NOTE(review): the client created here is never closed explicitly —
    # confirm that closing the returned sender is sufficient.
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=connection_string)
    if servicebus_type == "queue":
        return servicebus_client.get_queue_sender(queue_name=queue_name)
    return servicebus_client.get_topic_sender(topic_name=topic_name)
def create_servicebus_receiver(
        connection_string: Optional[str],
        queue_name: Optional[str],
        topic_name: Optional[str],
        subscription_name: Optional[str],
        servicebus_type: Optional[str] = "queue"):
    """
    Create servicebus receiver (consumer/subscriber)

    Args:
        connection_string: Connection string passed to
            ServiceBusClient.from_connection_string.
        queue_name: Queue to receive from; only used when servicebus_type is
            "queue".
        topic_name: Topic to receive from; only used when servicebus_type is
            "topic".
        subscription_name: Subscription on the topic; only used when
            servicebus_type is "topic".
        servicebus_type: Either "queue" (default) or "topic".

    Returns:
        The receiver returned by get_queue_receiver or
        get_subscription_receiver. The caller is expected to close it
        (main() uses it as a context manager).

    Raises:
        ValueError: If servicebus_type is neither "queue" nor "topic".
    """
    # Validate first: an unsupported type used to fall through both branches
    # and fail with UnboundLocalError on return, after a client had already
    # been created and then abandoned.
    if servicebus_type not in ("queue", "topic"):
        raise ValueError(
            f"Unsupported servicebus_type {servicebus_type!r}; "
            "expected 'queue' or 'topic'")

    # NOTE(review): the client created here is never closed explicitly —
    # confirm that closing the returned receiver is sufficient.
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=connection_string)
    if servicebus_type == "queue":
        return servicebus_client.get_queue_receiver(queue_name=queue_name)
    return servicebus_client.get_subscription_receiver(
        topic_name=topic_name,
        subscription_name=subscription_name)
def main(expires_within: Optional[int] = 60,
         keyvault_name: Optional[str] = None) -> None:
    """
    Publish a "RotateSecretEventType" message to the Service Bus queue for each
    service principal returned by filter_my_service_principals, then receive
    from the same queue and complete whatever was received.

    Args:
        expires_within: Forwarded unchanged to filter_my_service_principals as
            the expiry window. The unit is not visible in this file
            (presumably days — TODO confirm).
        keyvault_name: Value written into each message payload under the
            "keyvault_name" key. Defaults to None, which is serialized as
            JSON null.
    """
    # Local import keeps this block self-contained; json is standard library.
    import json

    # NOTE(review): the original started with a call to an undefined
    # get_connection_string() (result unused — CONNECTION_STRING is what the
    # sender/receiver get) and a create_secret(access_token, keyvault_name,
    # display_name) call whose arguments were not defined at that point and
    # whose result was immediately overwritten. Both always raised NameError
    # and have been dropped; if secret rotation is meant to happen here,
    # re-introduce it where a display name is actually available.
    config = Settings()
    access_token = get_token(config)
    response: AzureOwnedSPResponse = filter_my_service_principals(
        access_token,
        config.client_id,
        expires_within=expires_within)

    # One correlation id and one session id are shared by the whole batch.
    correlation_id = str(uuid.uuid4())
    # %S is zero-padded seconds. The original used %s, which is not a
    # documented Python directive (on glibc it expands to epoch seconds), so
    # the 14-character id ended in the first two epoch digits, not the seconds.
    session_id = datetime.now().strftime('%Y%m%d%H%M%S')

    messages = []
    for sp in response.ownedSPDetails:
        # Only the first credential is reported; skip entries that have none
        # rather than failing the whole batch with IndexError.
        if not sp.credentialDetails:
            continue
        credential = sp.credentialDetails[0]
        payload = {"display_name": sp.spDisplayName,
                   "key_id": credential.keyId,
                   "expiry_date": credential.expiryDate,
                   "keyvault_name": keyvault_name}
        # content_type says JSON, so emit real JSON; str(dict) produces a
        # Python repr (single quotes) that JSON parsers reject. default=str
        # mirrors the original's stringification for values json cannot
        # encode natively (the type of expiryDate is not visible here).
        message = ServiceBusMessage(
            json.dumps(payload, default=str),
            subject="RotateSecretEventType",
            content_type="application/json",
            session_id=session_id,
            partition_key=session_id,
            correlation_id=correlation_id)
        messages.append(message)

    # SERVICEBUS QUEUE
    # create servicebus queue sender
    servicebus_sender = create_servicebus_sender(
        connection_string=CONNECTION_STRING,
        queue_name=QUEUE_NAME,
        topic_name=TOPIC_NAME,
        servicebus_type="queue")
    # send the whole batch to the queue; the context manager closes the sender
    with servicebus_sender:
        servicebus_sender.send_messages(messages)

    # create servicebus queue receiver
    servicebus_receiver = create_servicebus_receiver(
        connection_string=CONNECTION_STRING,
        queue_name=QUEUE_NAME,
        topic_name=TOPIC_NAME,
        subscription_name=SUBSCRIPTION_NAME,
        servicebus_type="queue")
    # receive message from publisher
    # NOTE(review): receive_messages is called without max_message_count and
    # without a session id although the messages carry session_id — check
    # the SDK defaults and whether the queue is session-enabled.
    with servicebus_receiver:
        received_messages = servicebus_receiver.receive_messages(max_wait_time=5)
        for message in received_messages:
            # Completing settles the message so it is removed from the queue.
            servicebus_receiver.complete_message(message)
# Script entry point: hand main to typer only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    typer.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment