Skip to content

Instantly share code, notes, and snippets.

@rjriel
Last active June 3, 2022 00:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rjriel/2ba676b75b9f7c5660de44500c94baf1 to your computer and use it in GitHub Desktop.
Save rjriel/2ba676b75b9f7c5660de44500c94baf1 to your computer and use it in GitHub Desktop.
Replay to a Temporary Queue
# Create a python3 virtual environment
python3 -m venv venv
# Activate the virtual env
source venv/bin/activate
# Install dependencies
pip install -r requirements.txt
# The sample code assumes a broker running in your local environment.
# If your broker runs elsewhere, uncomment the lines below and replace
# the < > placeholders with your actual values.
# export SOLACE_HOST=<host>
# export SOLACE_VPN=<message vpn>
# export SOLACE_USERNAME=<client username>
# export SOLACE_PASSWORD=<client password>
# Run samples
python temporary_subscriber.py
autopep8==1.6.0
certifi==2020.12.5
chardet==4.0.0
idna==2.10
pycodestyle==2.8.0
requests==2.25.1
solace-pubsubplus==1.3.0
toml==0.10.2
urllib3==1.26.4
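# Consumer that binds to an exclusive durable queue.
# Assumes the queue exists on the broker and is holding messages
# (the receiver below is also configured with CREATE_ON_START for a missing queue).
# Note: create the queue with a topic subscription.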
# See https://docs.solace.com/Solace-PubSub-Messaging-APIs/API-Developer-Guide/Adding-Topic-Subscriptio.htm for more details
from datetime import datetime
import os
import platform
import time
from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, ServiceEvent
from solace.messaging.resources.queue import Queue
from solace.messaging.config.retry_strategy import RetryStrategy
from solace.messaging.config.replay_strategy import ReplayStrategy
from solace.messaging.config.missing_resources_creation_configuration import MissingResourcesCreationStrategy
from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver
from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError
from solace.messaging.resources.topic_subscription import TopicSubscription
# Disable stdout buffer on Windows.
# NOTE(review): Python reads PYTHONUNBUFFERED at interpreter startup, so
# setting it here presumably only affects child processes - confirm.
if platform.system() == "Windows":
    os.environ["PYTHONUNBUFFERED"] = "1"
# Handle received messages
class MessageHandlerImpl(MessageHandler):
    """Message callback that prints the destination and payload of each message."""

    def on_message(self, message: InboundMessage):
        """Print the destination name and payload of ``message``.

        The payload is read as a string first. When that returns ``None``
        the bytes payload is used instead and decoded before printing.
        """
        # Read the string payload once; fall back to bytes only if it is absent.
        payload = message.get_payload_as_string()
        if payload is None:
            payload = message.get_payload_as_bytes()

        # Decode a binary payload (bytes or bytearray) so it prints as text.
        if isinstance(payload, (bytes, bytearray)):
            print(f"Received a message of type: {type(payload)}. Decoding to string")
            payload = payload.decode()

        topic = message.get_destination_name()
        print("\n" + f"Received message on: {topic}")
        print("\n" + f"Message payload: {payload} \n")
        # Uncomment to print the whole message object:
        # print("\n" + f"Message dump: {message} \n")
# Inner classes for error handling
class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener):
    """Listener that prints every reconnection and service-interruption event."""

    @staticmethod
    def _report(label, event):
        """Print the callback name followed by the event's cause and message."""
        print(f"\n{label}")
        print(f"Error cause: {event.get_cause()}")
        print(f"Message: {event.get_message()}")

    def on_reconnected(self, e: ServiceEvent):
        """Report a reconnected event."""
        self._report("on_reconnected", e)

    def on_reconnecting(self, e: "ServiceEvent"):
        """Report a reconnection-attempt event."""
        self._report("on_reconnecting", e)

    def on_service_interrupted(self, e: "ServiceEvent"):
        """Report a service-interruption event."""
        self._report("on_service_interrupted", e)
# Broker configuration. Each value comes from the environment when set and
# non-empty, otherwise the local default is used.
# Note: other properties could be passed in this dictionary as well.
broker_props = {
    "solace.messaging.transport.host": os.getenv('SOLACE_HOST') or "tcp://localhost:55555,tcp://localhost:55554",
    "solace.messaging.service.vpn-name": os.getenv('SOLACE_VPN') or "default",
    "solace.messaging.authentication.scheme.basic.username": os.getenv('SOLACE_USERNAME') or "default",
    "solace.messaging.authentication.scheme.basic.password": os.getenv('SOLACE_PASSWORD') or "default",
}

# Build a messaging service with a reconnection strategy of 20 retries over an interval of 3 seconds.
# Note: the reconnection strategy could also be configured through the broker properties object.
messaging_service = (
    MessagingService.builder()
    .from_properties(broker_props)
    .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20, 3000))
    .build()
)

# Blocking connect thread
messaging_service.connect()
print(f'Messaging Service connected? {messaging_service.is_connected}')

# Register one handler object for all three kinds of service events.
service_handler = ServiceEventHandler()
messaging_service.add_reconnection_listener(service_handler)
messaging_service.add_reconnection_attempt_listener(service_handler)
messaging_service.add_service_interruption_listener(service_handler)
# Queue name.
# NOTE: This assumes that a queue template exists with the name filter matching temp-training/>
queue_name = "temp-training/browse-game-scans"
durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name)

# Bind the name before the try block so the `finally` clause can test it
# even when building the receiver raises before the assignment happens.
persistent_receiver: "PersistentMessageReceiver | None" = None

try:
    # The topic we are subscribing to is to get all games scanned in all stores in the APAC region
    topic_sub = [TopicSubscription.of("gameco/retail/apac/*/game/*/scanned")]
    replay_strategy = ReplayStrategy.all_messages()

    # Build a receiver and bind it to the queue, requesting creation of the
    # queue on start if it is missing and a replay of all messages.
    persistent_receiver = (
        messaging_service.create_persistent_message_receiver_builder()
        .with_missing_resources_creation_strategy(MissingResourcesCreationStrategy.CREATE_ON_START)
        .with_subscriptions(topic_sub)
        .with_message_replay(replay_strategy)
        .with_message_auto_acknowledgement()
        .build(durable_exclusive_queue)
    )
    persistent_receiver.start()

    # Callback for received messages
    persistent_receiver.receive_async(MessageHandlerImpl())
    print(f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')

    # Keep the main thread alive until the user presses Ctrl+C.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print('\nKeyboardInterrupt received')
# Handle API exception
except PubSubPlusClientError as exception:
    print(f'\nMake sure queue {queue_name} exists on broker!')
    # Show the underlying error too; the hint above is only one possible cause.
    print(f'Error: {exception}')
finally:
    # Stop the receiver only if it was actually built and is still running.
    if persistent_receiver is not None and persistent_receiver.is_running():
        print('\nTerminating receiver')
        persistent_receiver.terminate(grace_period=0)
    print('\nDisconnecting Messaging Service')
    messaging_service.disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment