Skip to content

Instantly share code, notes, and snippets.

View prabdeb's full-sized avatar
:atom:
Concentrating

Prabal Deb prabdeb

:atom:
Concentrating
View GitHub Profile
@prabdeb
prabdeb / azure_analysis_test.py
Last active June 16, 2020 08:49
Unit Testing with Azure SQL Database
class TestMethods(unittest.TestCase):
@patch('pyodbc.connect')
def test_myanalysis_analysis(self,
mock_pyodbc_connect):
'''Test Case 1: Test analysis in MyAnalysis'''
mock_cursor = mock_pyodbc_connect.return_value.cursor.return_value
mock_cursor.description = [
("col_a", "1"), ("col_b", "2"), ("col_c", "3"), ("col_d", "4"), ("col_e", "4")]
mock_cursor.fetchall.return_value = [
("a", "b", "c", "d", "e")]
@prabdeb
prabdeb / azure_analysis.py
Last active June 16, 2020 08:49
Unit Testing with Azure SQL Database
class MyAnalysis:
'''Class for doing some analysis on the items'''
def __init__(self, db_server, db_database, db_username, db_password):
    '''Store the database settings on the instance and create its logger.

    Args:
        db_server: Database server value; stored unchanged as ``self.db_server``.
        db_database: Database name value; stored unchanged as ``self.db_database``.
        db_username: User name value; stored unchanged as ``self.db_username``.
        db_password: Password value; stored unchanged as ``self.db_password``.

    No connection is opened here; the values are only kept on the instance.
    '''
    # Fixed logger name, so every instance shares the 'my.operation' logger.
    self.logger = logging.getLogger('my.operation')
    self.db_server = db_server
    self.db_database = db_database
    self.db_username = db_username
    self.db_password = db_password
def analysis(self):
@prabdeb
prabdeb / receive_event_test.py
Last active July 21, 2020 12:12
Unit Testing with Azure Event Hub Receive Event
# The class exported by azure.eventhub is EventData; "EventDat" raises ImportError.
from azure.eventhub import EventData
class TestMethods(unittest.TestCase):
@patch('azure.eventhub.EventHubConsumerClient.from_connection_string')
@patch('azure.eventhub.extensions.checkpointstoreblob.BlobCheckpointStore.from_connection_string')
def test_eventhub_receive_events(self,
mock_blob_checkpoint_store,
mock_event_hub_consumer_client):
'''Test Case 1: Receive events using MyEventHub'''
def receive(on_event, **kwargs):
@prabdeb
prabdeb / receive_event.py
Last active July 10, 2020 11:07
Unit Testing with Azure Event Hub Receive Event
class MyEventHub:
'''Class to interact with Event Hubs'''
def __init__(self, checkpoint_storage_connection_str, checkpoint_storage_container_name,
eh_name, eh_connection_str, eh_consumer_group):
self.logger = logging.getLogger('my.eventhub')
try:
checkpoint_store = BlobCheckpointStore.from_connection_string(
checkpoint_storage_connection_str, checkpoint_storage_container_name)
consumer_client = EventHubConsumerClient.from_connection_string(
eh_connection_str,
### Keybase proof
I hereby claim:
* I am prabdeb on github.
* I am prabdeb (https://keybase.io/prabdeb) on keybase.
* I have a public key ASDGge4p_qwEvVQ9ct6ipIBzN_H7IkO5AX34mfGs3lT-iAo
To claim this, I am signing this object: