Skip to content

Instantly share code, notes, and snippets.

@gm3dmo
Last active November 1, 2019 18:40
Show Gist options
  • Save gm3dmo/cf078698d006bc9a995fd10dfe95051d to your computer and use it in GitHub Desktop.
Save gm3dmo/cf078698d006bc9a995fd10dfe95051d to your computer and use it in GitHub Desktop.
Anonymizing data in CSV Files
#!/usr/bin/env python
"""
"""
__author__ = "David Morris"
__version__ = "0.1.0"
__license__ = "MIT"
import os
import sys
import csv
import logzero
import logging
import pprint
import datetime
import uuid
import dmninolib as nm
import glob
from logzero import logger
from faker import Faker
import sdp
def replaceNamesCached(variant, row, cache, ln='Trout', type='surname'):
    """Replace the name in column *variant* with a cached fake surname.

    The same original value always maps to the same replacement for as long
    as the same *cache* dict is passed in, so structure is preserved across
    CSVs handled in one run but not across different runs.

    Args:
        variant: Name of the column to anonymise.
        row: Mapping of column name to value for one CSV row; updated in
            place.
        cache: Dict mapping original values to their replacements; a new
            entry is added on every cache miss.
        ln: Ignored; kept so existing callers keep working.
        type: Ignored; kept so existing callers keep working.

    Returns:
        The tuple ``(row, cache)`` holding the same objects that were
        passed in.
    """
    if variant not in row:
        return (row, cache)

    oldname = row[variant]
    if oldname == "":
        # Blank cells are reported and left untouched.
        logger.warning(f"""{variant} is blank that could be tasty""")
        return (row, cache)

    if oldname in cache:
        logger.debug(f"""cache hit on {variant} (<redacted>) for (<redacted>)""")
        row[variant] = cache[oldname]
    else:
        # The replacement is always an upper-cased Faker last name,
        # whichever column is being processed.
        newname = Faker().last_name().upper()
        row[variant] = newname
        cache[oldname] = newname
        logger.debug(f"""swapping {variant} (<redacted>) for {newname}""")
    return (row, cache)
def replaceNINOCached(variant, row, cache, type='9digit'):
    """Replace the NINO in column *variant* with a generated one.

    A value that has been seen before is replaced with the NINO stored for
    it in *cache*; otherwise a new one is obtained from ``nm.doMeANino()``
    and remembered.  If the row also has a ``NINO_SUFFIX_TEXT`` column (or,
    failing that, ``NINOSUFFIX_TEXT``), it is set to the last character of
    the replacement so the two columns stay consistent.

    Args:
        variant: Name of the column to anonymise.
        row: Mapping of column name to value for one CSV row; updated in
            place.
        cache: Dict mapping original values to generated NINOs; a new entry
            is added on every cache miss.
        type: Ignored; kept so existing callers keep working.

    Returns:
        The tuple ``(row, cache)`` holding the same objects that were
        passed in.
    """
    if variant not in row:
        return (row, cache)

    original = row[variant]
    if original in cache:
        nino = cache[original]
        logger.debug(f'seen before and swapping: (<redacted>) switching to: {nino[:8]}')
    else:
        nino = nm.doMeANino()
        cache[original] = nino
        logger.debug(f'swapping: (<redacted>) to {nino}')

    row[variant] = nino

    # One suffix variable for both paths: the cache-miss path used to read a
    # name that was only bound on the cache-hit path.
    suffix = nino[-1:]
    if 'NINO_SUFFIX_TEXT' in row:
        row['NINO_SUFFIX_TEXT'] = suffix
    elif 'NINOSUFFIX_TEXT' in row:
        row['NINOSUFFIX_TEXT'] = suffix
    return (row, cache)
def replaceNames(variant, row, ln='Trout', type='surname'):
    """Replace the name in column *variant* with a fresh fake surname.

    Nothing is cached, so the same original value gets a different
    replacement on every call.

    Args:
        variant: Name of the column to anonymise.
        row: Mapping of column name to value for one CSV row; updated in
            place.
        ln: Ignored; kept so existing callers keep working.
        type: Ignored; kept so existing callers keep working.

    Returns:
        The same *row* object that was passed in.
    """
    if variant not in row:
        return row

    if row[variant] == "":
        # Blank cells are reported and left untouched.
        logger.warning(f"""{variant} is blank that could be tasty""")
        return row

    # Only build a Faker and generate a name once we know it will be used.
    replacement = Faker().last_name().upper()
    logger.debug(f"""swapping {variant} (<redacted>) for {replacement}""")
    row[variant] = replacement
    return row
def replaceKnownAs(variant, row, newvalue="NULL"):
    """Overwrite a non-blank "known as" column with a fixed value.

    Args:
        variant: Name of the column to anonymise.
        row: Mapping of column name to value for one CSV row; updated in
            place.
        newvalue: Replacement text; it is upper-cased before being stored.

    Returns:
        The same *row* object that was passed in.
    """
    if variant not in row:
        return row

    if row[variant] == "":
        # Blank cells are reported and left untouched.
        logger.warning(f"""{variant} is blank that could be tasty""")
        return row

    logger.debug(f"""swapping {variant} (<redacted>) for {newvalue.upper()}""")
    row[variant] = newvalue.upper()
    return row
def replacePostcode(variant, row, postcode="LS1 1AA"):
    """Overwrite a non-blank postcode column with a fixed postcode.

    Args:
        variant: Name of the column to anonymise.
        row: Mapping of column name to value for one CSV row; updated in
            place.
        postcode: Replacement postcode; it is upper-cased before being
            stored.

    Returns:
        The same *row* object that was passed in.
    """
    if variant not in row:
        return row

    if row[variant] == "":
        # Blank cells are reported and left untouched.
        logger.warning(f"""{variant} is blank that could be tasty""")
        return row

    logger.debug(f"""swapping {variant} (<redacted>) for {postcode.upper()}""")
    row[variant] = postcode.upper()
    return row
def replaceGUID(variant, row):
    """Overwrite a non-blank identifier column with a random UUID4 string.

    Args:
        variant: Name of the column to anonymise.
        row: Mapping of column name to value for one CSV row; updated in
            place.

    Returns:
        The same *row* object that was passed in.
    """
    if variant not in row:
        return row

    if row[variant] == "":
        # Blank cells are reported and left untouched.
        logger.warning(f"""{variant} is blank that could be tasty""")
        return row

    # Generate the UUID only when it is actually going to be stored.
    g = str(uuid.uuid4())
    logger.debug(f"""swapping {variant} (<redacted>) for {g}""")
    row[variant] = g
    return row
def replaceDob(row, variant):
    """Blur a date of birth down to its year.

    The last four characters of the existing value are taken as the year
    and the value is replaced with ``01Jan<year>``; for example
    ``23Mar1974`` becomes ``01Jan1974``.

    Note that, unlike the other ``replace*`` helpers, *row* is the first
    parameter here.

    Args:
        row: Mapping of column name to value for one CSV row; updated in
            place.
        variant: Name of the date-of-birth column to anonymise.

    Returns:
        The same *row* object that was passed in.
    """
    if variant not in row:
        return row

    if row[variant] == "":
        # Blank cells are reported and left untouched.
        logger.warning(f"""{variant} is blank that could be tasty""")
        return row

    year_of_birth = row[variant][-4:]
    newDob = f"01Jan{year_of_birth}"
    logger.debug(f"""swapping {variant} (<redacted>) for {newDob} {year_of_birth}""")
    row[variant] = newDob
    return row
def main():
    """Anonymise the CSV named in ``sys.argv[1]`` into ``/tmp/test.csv``.

    Real NINOs are replaced with made-up ones.  To preserve the structure
    of the data, a NINO that has already been seen is replaced with its
    cached value from *nino_cache*; surname and forename columns are cached
    the same way in *surname_cache* and *forename_cache*.  The caches live
    only for a single run of this program, so each subsequent run generates
    different replacements.

    Known-as names, GUIDs, postcodes and dates of birth are replaced
    without any caching.

    The lists of column names such as::

        nino_columns = ['NINO', 'NINO1']

    are the mechanism by which the columns to be cleansed are chosen.

    Raises:
        SystemExit: If no input filename was given on the command line.
    """
    logzero.loglevel(logging.INFO)
    # NOTE(review): config is not used below; confirm whether it can be removed.
    config = sdp.ConfigProduction
    nino_cache = {}
    surname_cache = {}
    forename_cache = {}

    if len(sys.argv) < 2:
        sys.exit(f'usage: {os.path.basename(sys.argv[0])} <input.csv>')
    filename = sys.argv[1]
    of = '/tmp/test.csv'
    logger.info(f'filename: {filename} output_file: {of}')

    # Names of the columns that are to be anonymised.
    guid_columns = ['CID']
    nino_columns = ['NINO', 'NINO1', 'NC_NINO', 'CCNINO', 'ClaimantNINO']
    forename_columns = ['FORENAME_1', 'NC_FORENAME_1', 'FORENAME_11', 'CCFORENAME', 'FIRSTNAME', 'FirstName']
    knownAsName_columns = ['KnownAsName']
    surname_columns = ['SURNAME', 'SURNAME1', 'NC_SURNAME', 'CCSURNAME', 'LASTNAME', 'LastName', 'MiddleName']
    dob_columns = ['DATE_OF_BIRTH', 'NC_DATE_OF_BIRTH', 'DATE_OF_BIRTH1']
    postcode_columns = ['POSTCODE', 'RESIDENTIAL_POSTCODE', 'ClaimantPostCode']

    # newline='' is what the csv module asks for on both reader and writer;
    # the context managers make sure both files are flushed and closed.
    with open(filename, mode='r', encoding='cp1252', newline='') as infile:
        csvin = csv.DictReader(infile, dialect=csv.excel)
        fieldnames = csvin.fieldnames
        if fieldnames is None:
            # An empty input has no header row, so there is nothing to write.
            logger.warning(f'{filename} has no header row; nothing to do')
            return

        with open(of, 'w', newline='') as outfile:
            csvout = csv.DictWriter(outfile, fieldnames=fieldnames)
            csvout.writeheader()
            logger.debug(f'csvin: {type(csvin)}')
            logger.debug(f'headers: {fieldnames}')

            for row in csvin:
                logger.debug(row)
                for variant in nino_columns:
                    row, nino_cache = replaceNINOCached(variant, row, nino_cache)
                for variant in surname_columns:
                    row, surname_cache = replaceNamesCached(variant, row, surname_cache)
                for variant in forename_columns:
                    row, forename_cache = replaceNamesCached(variant, row, forename_cache)
                for variant in knownAsName_columns:
                    row = replaceKnownAs(variant, row)
                for variant in guid_columns:
                    row = replaceGUID(variant, row)
                for variant in postcode_columns:
                    row = replacePostcode(variant, row)
                for variant in dob_columns:
                    row = replaceDob(row, variant)
                csvout.writerow(row)
    logger.info('')
# Run the anonymiser only when this file is executed as a script.
if __name__ == "__main__":
    main()
#!/usr/bin/env python
"""
"""
import exrex
def doMeANino(realnino=False):
    """Return one string generated by ``exrex.getone`` from a NINO pattern.

    Both patterns describe two letters, six digits and a suffix letter in
    ``A``-``D``.  By default the first letter is fixed to ``Q``, which is
    not in the first-letter class of the *realnino* pattern, so a default
    result never matches that pattern.

    Args:
        realnino: When equal to ``True``, generate from the pattern whose
            first letter is drawn from ``A-CEGHJ-PR-TW-Z`` instead of ``Q``.

    Returns:
        Whatever ``exrex.getone`` produces for the chosen pattern.
    """
    # The explicit comparison with True is kept on purpose: truthy values
    # that are not equal to True still select the default pattern.
    pattern = (
        '^[A-CEGHJ-PR-TW-Z]{1}[A-CEGHJ-NPR-TW-Z]{1}[0-9]{6}[A-D]{1}$'
        if realnino == True
        else '[Q]{1}[A-CEGHJ-NPR-TW-Z]{1}[0-9]{6}[A-D]{1}$'
    )
    return exrex.getone(pattern)
if __name__ == "__main__":
    # The helper module that provides doMeANino has no main() of its own, so
    # calling main() here raised NameError when that module was run
    # directly; print one generated value as a quick smoke test instead.
    print(doMeANino())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment