Skip to content

Instantly share code, notes, and snippets.

@prabdeb
Last active July 21, 2020 12:12
Show Gist options
  • Save prabdeb/be47a770c80c38284a6c00afdcac3c9d to your computer and use it in GitHub Desktop.
Save prabdeb/be47a770c80c38284a6c00afdcac3c9d to your computer and use it in GitHub Desktop.
Unit Testing with Azure Event Hub Receive Event
import unittest
from unittest.mock import MagicMock, patch

from azure.eventhub import EventData
class TestMethods(unittest.TestCase):
    """Unit tests for ``MyEventHub`` with the Azure SDK factories patched."""

    # ``patch`` decorators are applied bottom-up, so the mock for
    # ``BlobCheckpointStore.from_connection_string`` is injected first and the
    # mock for ``EventHubConsumerClient.from_connection_string`` second.
    @patch('azure.eventhub.EventHubConsumerClient.from_connection_string')
    @patch('azure.eventhub.extensions.checkpointstoreblob.BlobCheckpointStore.from_connection_string')
    def test_eventhub_receive_events(self,
                                     mock_blob_checkpoint_store,
                                     mock_event_hub_consumer_client):
        """Test Case 1: Receive events using MyEventHub.

        Installs a fake ``receive`` on the patched consumer-client factory,
        builds a ``MyEventHub`` and checks that ``receive_events`` returns an
        empty list.
        """

        def receive(on_event, **kwargs):
            """Invoke ``on_event`` once with a mocked context and one event."""
            # MagicMock creates ``update_checkpoint`` on first access, so the
            # callback can checkpoint without any explicit setup here.
            partition_context = MagicMock()
            on_event(partition_context, EventData("Mock"))

        # NOTE(review): the fake is attached to the patched
        # ``from_connection_string`` itself, not to its ``return_value``. If
        # MyEventHub calls ``receive`` on the object returned by the factory,
        # this fake is never invoked -- confirm against MyEventHub before
        # relying on the empty-list expectation below.
        mock_event_hub_consumer_client.receive = receive

        eh_obj = MyEventHub("foo", "bar", "hi", "there", "blah")
        received_events = eh_obj.receive_events(receive_duration=1)

        self.assertEqual([], received_events)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment