Created
June 18, 2021 23:44
-
-
Save ademidun/e2d77ef9033256305a91ca15c5712808 to your computer and use it in GitHub Desktop.
Load data from an external file (a CSV file or a Google Sheet) and save it to a database in Django, with tests for the data loader functionality.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import gspread | |
from django.db.models import BooleanField | |
from google.oauth2.service_account import Credentials | |
from django.contrib.postgres.fields import ArrayField | |
from contact.models import Contact | |
class DataLoader:
    """
    Load rows from a CSV file or a Google Sheet and save them to the database.

    The loaded rows are kept in ``self.input_data`` as a list of dictionaries,
    one per row.
    """

    # Prefix that marks an input_file argument as a Google Sheets key rather than a file path.
    GOOGLE_SHEETS_PREFIX = "GOOGLE_SHEETS_ID__"

    def __init__(self, input_file: str):
        """
        Load the rows referenced by input_file into self.input_data.

        To see the expected input type for input_file see:
        helpers/tests/test_data/canada_uni_stem_clubs_instagram.csv

        :param input_file: either a path ending in ".csv" or a string of the form
            "GOOGLE_SHEETS_ID__<sheet key>".
        :raises ValueError: if input_file is non-empty but no rows were loaded
            (an input_file of an unrecognized kind also ends up here).
        """
        self.input_data = []
        if input_file.startswith(self.GOOGLE_SHEETS_PREFIX):
            # Strip the prefix instead of splitting on "__" so that a sheet key
            # which itself contains "__" is not truncated.
            sheets_id = input_file[len(self.GOOGLE_SHEETS_PREFIX):]
            self.input_data = self.get_data_from_google_sheets(sheets_id)
        elif input_file.endswith(".csv"):
            # See https://stackoverflow.com/a/21572244/14874841 for reading in a csv file as a list of dicts.
            self.input_data = self.get_data_from_csv(input_file)

        if input_file and not self.input_data:
            raise ValueError(f"The input_file: {input_file} does not contain any input data")

    def save_to_database(self, object_type):
        """
        Clean self.input_data for the given object type and save every row.

        :param object_type: name of the object type to create; only "contact" is supported.
        :raises NotImplementedError: if object_type is not supported. This is raised
            before self.input_data is modified.
        """
        # remove_invalid_fields() rejects unsupported object types, so past this
        # point object_type is known to be supported.
        self.remove_invalid_fields(object_type)
        if object_type == "contact":
            self.save_to_database_as_contact()

    def remove_invalid_fields(self, object_type):
        """
        Replace self.input_data with rows that only contain fields of the target model.

        Keys that are not field names of the model are dropped. The remaining values
        are normalized (see _normalize_raw_value), wrapped in a list for ArrayField
        columns and coerced with bool() for BooleanField columns.

        :param object_type: name of the object type; only "contact" is supported.
        :raises NotImplementedError: if object_type is not supported.
        """
        model = self._get_model(object_type)
        # A set gives constant-time membership checks for every key of every row.
        model_fields = {f.name for f in model._meta.get_fields()}

        cleaned_input_data = []
        for item in self.input_data:
            cleaned_item = {}
            for k, v in item.items():
                if k not in model_fields:
                    continue

                v = self._normalize_raw_value(v)

                field_class = model._meta.get_field(k).__class__
                # if the field_type is an Array Field or a subclass of ArrayField e.g. ChoiceArrayField,
                # convert the value to a list
                # Note that a class can be a subclass of itself: https://www.journaldev.com/22938/python-issubclass
                if issubclass(field_class, ArrayField):
                    v = [] if v == '' else [v]
                elif issubclass(field_class, BooleanField):
                    v = bool(v)

                cleaned_item[k] = v
            cleaned_input_data.append(cleaned_item)

        self.input_data = cleaned_input_data

    def save_to_database_as_contact(self):
        """Create one Contact per row of self.input_data."""
        for contact in self.input_data:
            Contact.objects.create(**contact)

    @staticmethod
    def _get_model(object_type):
        """Return the model class for object_type or raise NotImplementedError."""
        if object_type == "contact":
            return Contact
        raise NotImplementedError("DataLoader currently only supports object type contact")

    @staticmethod
    def _normalize_raw_value(value):
        """
        Normalize a single raw cell value.

        Non-string values are returned unchanged. For strings, an Instagram profile
        URL is reduced to the text following "instagram.com/" (trailing slashes
        removed), and the strings "true"/"false" (any casing) become booleans.
        """
        # Only strings support the substring test and .lower(); other cell types
        # (e.g. numbers) are passed through untouched instead of raising TypeError.
        if not isinstance(value, str):
            return value

        if "https://www.instagram.com" in value:
            ig_profile_separated = value.split("instagram.com/")
            # A URL without anything after the host has no username to extract.
            if len(ig_profile_separated) > 1:
                value = ig_profile_separated[1].rstrip('/')

        lowered = value.lower()
        if lowered == 'true':
            return True
        if lowered == 'false':
            return False
        return value

    @staticmethod
    def get_data_from_google_sheets(sheet_key,
                                    credentials_file='./inlaid-isotope-316706-4811fc61dcbf.json'):
        """
        Return all records of the first worksheet of a Google Sheet.

        :param sheet_key: key of the spreadsheet to open.
        :param credentials_file: path to the service account credentials file.
        :return: the result of gspread's get_all_records() for the spreadsheet's sheet1.
        """
        # use credentials to create a client to interact with the Google Drive API
        scopes = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']
        credentials = Credentials.from_service_account_file(credentials_file, scopes=scopes)
        client = gspread.authorize(credentials)
        sheet = client.open_by_key(sheet_key).sheet1
        return sheet.get_all_records()

    @staticmethod
    def get_data_from_csv(file_name):
        """
        Given a CSV file. Return the data inside the file as a list of dictionaries.

        The first row of the file is used as the dictionary keys. Whitespace
        directly after a delimiter is ignored.

        :param file_name: path of the CSV file to read.
        :return: a list with one dict per data row; empty if the file has no data rows.
        """
        # newline="" lets the csv module handle line endings itself, as its documentation recommends.
        with open(file_name, newline="") as f:
            return [dict(row) for row in csv.DictReader(f, skipinitialspace=True)]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
source test.sh helpers.tests.test_data_loader.DataLoaderTests | |
source test.sh helpers.tests.test_data_loader.DataLoaderTests.test_create_data_loader_from_google_sheets | |
""" | |
from pathlib import Path | |
from unittest.mock import patch | |
from contact.models import Contact | |
from helpers.constants.general import SCHOOLS | |
from helpers.data_loader import DataLoader | |
from helpers.tests.test_helpers import AtilaTestCase | |
from server.settings import BASE_DIR | |
class DataLoaderTests(AtilaTestCase):
    """Tests for loading data with DataLoader from a CSV file and from Google Sheets."""

    def setUp(self) -> None:
        """Read the sample CSV rows and create an empty CSV file for the empty-input test."""
        self.test_data_dir = Path(BASE_DIR) / "helpers/tests/test_data"
        self.canada_uni_stem_clubs_instagram_input_file = self.test_data_dir / "canada_uni_stem_clubs_instagram.csv"
        # Question: Should we use the get_data_from_csv() function from DataLoader
        # if this unit test is also testing DataLoader?
        # Usually we would say no, because it might cause dependency errors
        # but we can make an exception for this function
        # because the get_data_from_csv() function is very simple.
        self.canada_uni_stem_clubs_instagram_input_data = DataLoader. \
            get_data_from_csv(self.canada_uni_stem_clubs_instagram_input_file)

        self.empty_file_csv = self.test_data_dir / "empty_file.csv"
        self.empty_file_csv.touch(exist_ok=True)

    def tearDown(self) -> None:
        """Remove the empty CSV file created in setUp."""
        if self.empty_file_csv.exists():
            self.empty_file_csv.unlink()

    def test_save_contact_to_database_from_csv(self):
        """Every row of the sample CSV is saved as a Contact with cleaned field values."""
        starting_contacts_count = Contact.objects.count()

        data_loader = DataLoader(str(self.canada_uni_stem_clubs_instagram_input_file))
        data_loader.save_to_database("contact")

        dal_women_in_engineering_contact = Contact.objects.get(instagram_username="dalwie")
        added_contacts = len(data_loader.input_data)
        final_contacts_count = Contact.objects.count()

        self.assertTrue(dal_women_in_engineering_contact.female_only)
        self.assertEqual(dal_women_in_engineering_contact.eligible_schools[0], "Dalhousie University")
        self.assertIn(dal_women_in_engineering_contact.eligible_schools[0], SCHOOLS)
        self.assertEqual(starting_contacts_count + added_contacts, final_contacts_count)

    # note how the order of the decorators is in the opposite order of the arguments.
    # for example: google.oauth2 is the first decorator but the last argument.
    @patch('google.oauth2.service_account.Credentials.from_service_account_file')
    @patch('gspread.authorize')
    def test_create_data_loader_from_google_sheets(self, gspread_mock, credentials_mock):
        """A GOOGLE_SHEETS_ID__ argument loads the records of the referenced sheet."""
        # Configure the mocks through return_value instead of calling them, so the
        # call counts asserted below only reflect calls made by DataLoader itself.
        client_mock = gspread_mock.return_value
        client_mock.open_by_key.return_value.sheet1.get_all_records.return_value = \
            self.canada_uni_stem_clubs_instagram_input_data

        google_sheets_id = "ABC123"
        google_sheets_id_argument = f"GOOGLE_SHEETS_ID__{google_sheets_id}"
        data_loader = DataLoader(google_sheets_id_argument)

        # assertEqual, not assertTrue: assertTrue(x, 1) treats 1 as the failure
        # message and would pass for any non-zero call count.
        self.assertEqual(credentials_mock.call_count, 1)
        self.assertEqual(gspread_mock.call_count, 1)
        client_mock.open_by_key.assert_called_once_with(google_sheets_id)

        self.assertIsInstance(data_loader.input_data, list)
        self.assertGreater(len(data_loader.input_data), 0)
        self.assertListEqual(data_loader.input_data, self.canada_uni_stem_clubs_instagram_input_data)

    def test_load_data_empty_csv_file_raises_error(self):
        """A CSV file without any rows is rejected with a ValueError."""
        with self.assertRaises(ValueError) as raised:
            DataLoader(str(self.empty_file_csv))

        # Compare as plain text: a file path is not a safe regular expression
        # (e.g. "." matches any character and backslashes start escapes).
        self.assertIn(f"{self.empty_file_csv} does not contain any input data", str(raised.exception))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment