Skip to content

Instantly share code, notes, and snippets.

@prabdeb
Last active July 10, 2020 11:07
Show Gist options
  • Save prabdeb/1054d92d772a2fa96cb7d4aadcadbbd3 to your computer and use it in GitHub Desktop.
Save prabdeb/1054d92d772a2fa96cb7d4aadcadbbd3 to your computer and use it in GitHub Desktop.
Unit Testing with Azure Event Hub Receive Event
class MyEventHub:
'''Class to interact with Event Hubs'''
def __init__(self, checkpoint_storage_connection_str, checkpoint_storage_container_name,
eh_name, eh_connection_str, eh_consumer_group):
self.logger = logging.getLogger('my.eventhub')
try:
checkpoint_store = BlobCheckpointStore.from_connection_string(
checkpoint_storage_connection_str, checkpoint_storage_container_name)
consumer_client = EventHubConsumerClient.from_connection_string(
eh_connection_str,
eh_consumer_group,
eventhub_name=eh_name,
checkpoint_store=checkpoint_store,
)
except Exception as ex:
self.logger.error('error while initiating eventhub - %s, - %s', eh_name, ex)
self.consumer_client = consumer_client
def receive_events(self, receive_duration=15):
'''Receive events from EH'''
received_events = []
# Some logic of filtering Event Hub
def on_event(partition_context, event):
# Some logic to accept a valid event
# or droping the event if invalid
received_events.append({"message": event.body_as_str(encoding='UTF-8'),
"properties": event.properties})
partition_context.update_checkpoint(event)
self.logger.debug('consumer will keep receiving for %d seconds, start time is %f.', receive_duration, time.time())
try:
thread = threading.Thread(
target=self.consumer_client.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1",
}
)
thread.start()
time.sleep(receive_duration)
self.consumer_client.close()
thread.join()
except Exception as ex:
self.logger.warning('error while receiving events for - %s, - %s', self.eh_name, ex)
finally:
self.logger.debug('consumer has stopped receiving, end time is %f.', time.time())
self.logger.debug('consumer has stopped receiving, end time is %f and total events are - %d', time.time(), len(received_events))
return received_events
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment