Last active
July 10, 2020 11:07
-
-
Save prabdeb/1054d92d772a2fa96cb7d4aadcadbbd3 to your computer and use it in GitHub Desktop.
Unit Testing with Azure Event Hub Receive Event
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MyEventHub:
    '''Class to interact with Event Hubs'''

    def __init__(self, checkpoint_storage_connection_str, checkpoint_storage_container_name,
                 eh_name, eh_connection_str, eh_consumer_group):
        '''Create the checkpoint store and the Event Hub consumer client.

        Args:
            checkpoint_storage_connection_str: connection string handed to
                BlobCheckpointStore.from_connection_string.
            checkpoint_storage_container_name: blob container name handed to
                BlobCheckpointStore.from_connection_string.
            eh_name: Event Hub name; also kept on the instance for log messages.
            eh_connection_str: connection string handed to
                EventHubConsumerClient.from_connection_string.
            eh_consumer_group: consumer group handed to the consumer client.

        Construction is best effort: any failure is logged and
        ``self.consumer_client`` is left as ``None`` instead of raising.
        '''
        self.logger = logging.getLogger('my.eventhub')
        # Stored so that receive_events can name the hub in its warnings.
        self.eh_name = eh_name
        # Always defined, so a failed construction cannot surface later as an
        # unrelated UnboundLocalError / AttributeError.
        self.consumer_client = None
        try:
            checkpoint_store = BlobCheckpointStore.from_connection_string(
                checkpoint_storage_connection_str, checkpoint_storage_container_name)
            self.consumer_client = EventHubConsumerClient.from_connection_string(
                eh_connection_str,
                eh_consumer_group,
                eventhub_name=eh_name,
                checkpoint_store=checkpoint_store,
            )
        except Exception as ex:
            self.logger.error('error while initiating eventhub - %s, - %s', eh_name, ex)

    def receive_events(self, receive_duration=15):
        '''Receive events from EH

        Runs the consumer client's ``receive`` in a background thread for
        ``receive_duration`` seconds, then closes the client and waits for
        the thread to finish.

        Args:
            receive_duration: number of seconds to keep receiving.

        Returns:
            A list of dicts, one per received event, with the keys
            ``"message"`` (event body decoded as UTF-8) and ``"properties"``.
            The list is empty when no client is available or nothing arrived.
        '''
        received_events = []
        consumer_client = getattr(self, 'consumer_client', None)
        if consumer_client is None:
            # __init__ could not build the client; keep the best-effort contract.
            self.logger.warning('error while receiving events for - %s, - %s',
                                self.eh_name, 'consumer client is not initialised')
            return received_events

        # Some logic of filtering Event Hub
        def on_event(partition_context, event):
            # Some logic to accept a valid event
            # or dropping the event if invalid
            received_events.append({"message": event.body_as_str(encoding='UTF-8'),
                                    "properties": event.properties})
            partition_context.update_checkpoint(event)

        self.logger.debug('consumer will keep receiving for %d seconds, start time is %f.',
                          receive_duration, time.time())
        # receive() is run in its own thread so this method can stop it after
        # the requested duration by closing the client.
        thread = threading.Thread(
            target=consumer_client.receive,
            kwargs={
                "on_event": on_event,
                # "-1" asks the SDK to read from the beginning of the stream
                # when no checkpoint exists (see azure-eventhub documentation).
                "starting_position": "-1",
            }
        )
        try:
            thread.start()
            time.sleep(receive_duration)
        except Exception as ex:
            self.logger.warning('error while receiving events for - %s, - %s', self.eh_name, ex)
        finally:
            # Always release the client, even if starting or sleeping failed,
            # so the receiver thread is not left running.
            try:
                consumer_client.close()
                # is_alive() is False for a thread that never started, which
                # avoids the RuntimeError join() would raise in that case.
                if thread.is_alive():
                    thread.join()
            except Exception as ex:
                self.logger.warning('error while receiving events for - %s, - %s', self.eh_name, ex)
        self.logger.debug('consumer has stopped receiving, end time is %f and total events are - %d',
                          time.time(), len(received_events))
        return received_events
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment