Skip to content

Instantly share code, notes, and snippets.

@ademidun
Created June 18, 2021 23:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ademidun/e2d77ef9033256305a91ca15c5712808 to your computer and use it in GitHub Desktop.
Save ademidun/e2d77ef9033256305a91ca15c5712808 to your computer and use it in GitHub Desktop.
Load data from an external file and save to a database in Django and add some tests to the data loader functionality.
import csv
import gspread
from django.db.models import BooleanField
from google.oauth2.service_account import Credentials
from django.contrib.postgres.fields import ArrayField
from contact.models import Contact
class DataLoader:
def __init__(self, input_file: str):
"""
To see the expected input type for input_file see: helpers/tests/test_data/canada_uni_stem_clubs_instagram.csv
:param input_file:
"""
self.input_data = []
if input_file.startswith("GOOGLE_SHEETS_ID__"):
input_file_separated = input_file.split("__")
sheets_id = input_file_separated[1]
self.input_data = self.get_data_from_google_sheets(sheets_id)
elif input_file.endswith(".csv"):
# See https://stackoverflow.com/a/21572244/14874841 for reading in a csv file as a list of dicts.
self.input_data = self.get_data_from_csv(input_file)
if input_file and len(self.input_data) == 0:
raise ValueError(f"The input_file: {input_file} does not contain any input data")
def save_to_database(self, object_type):
self.remove_invalid_fields(object_type)
if object_type == "contact":
self.save_to_database_as_contact()
else:
raise NotImplementedError("DataLoader currently only supports object type contact")
def remove_invalid_fields(self, object_type):
model = None
if object_type == "contact":
model = Contact
model_fields = [f.name for f in model._meta.get_fields()]
cleaned_input_data = []
for item in self.input_data:
cleaned_item = {}
for k, v in item.items():
if k not in model_fields:
continue
if "https://www.instagram.com" in v:
ig_profile_separated = v.split("instagram.com/")
v = ig_profile_separated[1].rstrip('/')
if isinstance(v, str):
if v.lower() == 'true':
v = True
elif v.lower() == 'false':
v = False
field_class = model._meta.get_field(k).__class__
# if the field_type is an Array Field or a subclass of ArrayField e.g. ChoiceArrayField,
# convert the value to a list
# Note that a class can be a subclass of itself: https://www.journaldev.com/22938/python-issubclass
if issubclass(field_class, ArrayField):
if v == '':
v = []
else:
v = [v]
elif issubclass(field_class, BooleanField):
v = bool(v)
cleaned_item[k] = v
cleaned_input_data.append(cleaned_item)
self.input_data = cleaned_input_data
def save_to_database_as_contact(self):
for contact in self.input_data:
Contact.objects.create(**contact)
@staticmethod
def get_data_from_google_sheets(sheet_key):
# use credentials to create a client to interact with the Google Drive API
scopes = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']
credentials = Credentials.from_service_account_file('./inlaid-isotope-316706-4811fc61dcbf.json', scopes=scopes)
client = gspread.authorize(credentials)
sheet = client.open_by_key(sheet_key).sheet1
return sheet.get_all_records()
@staticmethod
def get_data_from_csv(file_name):
"""
Given a CSV file. Return the data inside the file as a list of dictionaries.
:param file_name:
:return:
"""
with open(file_name) as f:
csv_data = [{k: v for k, v in row.items()}
for row in csv.DictReader(f, skipinitialspace=True)]
return csv_data
"""
source test.sh helpers.tests.test_data_loader.DataLoaderTests
source test.sh helpers.tests.test_data_loader.DataLoaderTests.test_create_data_loader_from_google_sheets
"""
from pathlib import Path
from unittest.mock import patch
from contact.models import Contact
from helpers.constants.general import SCHOOLS
from helpers.data_loader import DataLoader
from helpers.tests.test_helpers import AtilaTestCase
from server.settings import BASE_DIR
class DataLoaderTests(AtilaTestCase):
def setUp(self) -> None:
self.test_data_dir = Path(BASE_DIR) / "helpers/tests/test_data"
self.canada_uni_stem_clubs_instagram_input_file = self.test_data_dir / "canada_uni_stem_clubs_instagram.csv"
# Question: Should we use the get_data_from_csv() function from DataLoader
# if this unit test is also testing DataLoader?
# Usually we would say no, because it might cause dependency errors
# but we can make an exception for this function
# because the get_data_from_csv() function is very simple.
self.canada_uni_stem_clubs_instagram_input_data = DataLoader. \
get_data_from_csv(self.canada_uni_stem_clubs_instagram_input_file)
self.empty_file_csv = self.test_data_dir / "empty_file.csv"
self.empty_file_csv.touch(exist_ok=True)
def tearDown(self) -> None:
if self.empty_file_csv.exists():
self.empty_file_csv.unlink()
def test_save_contact_to_database_from_csv(self):
starting_contacts_count = Contact.objects.count()
data_loader = DataLoader(str(self.canada_uni_stem_clubs_instagram_input_file))
data_loader.save_to_database("contact")
dal_women_in_engineering_contact = Contact.objects.get(instagram_username="dalwie")
added_contacts = len(data_loader.input_data)
final_contacts_count = Contact.objects.count()
self.assertTrue(dal_women_in_engineering_contact.female_only)
self.assertEqual(dal_women_in_engineering_contact.eligible_schools[0], "Dalhousie University")
self.assertIn(dal_women_in_engineering_contact.eligible_schools[0], SCHOOLS)
self.assertEqual(starting_contacts_count + added_contacts, final_contacts_count)
# note how the order of the decorators is in the opposite order of the arguments.
# for example: google.oauth2 is the first decorator but the last argument.
@patch('google.oauth2.service_account.Credentials.from_service_account_file')
@patch('gspread.authorize')
def test_create_data_loader_from_google_sheets(self, gspread_mock, credentials_mock):
gspread_mock().open_by_key().sheet1.get_all_records.return_value =\
self.canada_uni_stem_clubs_instagram_input_data
google_sheets_id = "ABC123"
google_sheets_id_argument = f"GOOGLE_SHEETS_ID__{google_sheets_id}"
data_loader = DataLoader(google_sheets_id_argument)
self.assertTrue(credentials_mock.call_count, 1)
self.assertTrue(gspread_mock.call_count, 1)
self.assertGreater(len(data_loader.input_data), 0)
self.assertIsInstance(data_loader.input_data, list)
gspread_mock().open_by_key.assert_called_with(google_sheets_id)
self.assertListEqual(data_loader.input_data, self.canada_uni_stem_clubs_instagram_input_data)
def test_load_data_empty_csv_file_raises_error(self):
with self.assertRaisesRegexp(ValueError, f"{str(self.empty_file_csv)} does not contain any input data"):
DataLoader(str(self.empty_file_csv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment