Skip to content

Instantly share code, notes, and snippets.

@ewhitebloom
Last active September 13, 2017 16:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ewhitebloom/7c90b74d76aae5434466 to your computer and use it in GitHub Desktop.
Save ewhitebloom/7c90b74d76aae5434466 to your computer and use it in GitHub Desktop.
ETL NPI data from NPI CSV file to MySQL.
package main
import (
"bufio"
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"os"
"reflect"
"strconv"
"strings"
"sync"
"time"
)
// rowCount is the denominator ShowProgress divides rowCounter by when it
// prints the percentage of rows processed.
// NOTE(review): presumably the number of lines in the NPI CSV this program was
// written against; confirm before running it on a different file.
const rowCount float64 = 4636902
var (
// taxonomyCodes maps the taxonomy codes this program imports to the provider
// type name stored for them; FindAndMapTaxonomyCodes drops every other code.
taxonomyCodes = map[string]string{
"282E00000X": "LongTermCareHospital",
"283X00000X": "AcuteRehabFacility",
"251G00000X": "Hospice",
"314000000X": "SkilledNursingFacility",
"251E00000X": "HomeHealthAgency",
}
// medicareColumnIndices holds the CSV column indices FindMedicareNumbers
// inspects; it is computed once, at package initialization.
medicareColumnIndices = GenerateMedicareColumnIndices()
// NPIFilePath is the path of the CSV file opened by main, read from the
// NPI_FILE environment variable.
NPIFilePath = os.Getenv("NPI_FILE")
// MYSQLAUTH is the data source name main passes to sql.Open, read from the
// GO_MYSQLAUTH environment variable.
MYSQLAUTH = os.Getenv("GO_MYSQLAUTH")
// wg tracks the StoreCSVRowToDB goroutines: main calls Add before starting
// each one and Wait before exiting, and each goroutine calls Done.
wg sync.WaitGroup
// rowCounter counts the CSV lines read so far. It is incremented by main for
// the header line and by ShowProgress for every following line.
rowCounter float64 = 0
)
// Provider holds the values of one CSV row that are written to the
// care_providers and addresses tables. Every field is a pointer; BuildProvider
// leaves a field nil when the corresponding CSV column has no usable value.
type Provider struct {
// Npi is CSV column 0 parsed as an integer; nil when the parsed value is 0.
Npi *int64
// ProviderType is not read from the CSV by BuildProvider;
// StoreProviderCSVRowToDB points it at the provider type it is currently
// inserting.
ProviderType *string
// Name is CSV column 4.
Name *string
// Street is CSV column 20.
Street *string
// Street2 is CSV column 21.
Street2 *string
// City is CSV column 22.
City *string
// State is CSV column 23.
State *string
// Zip is CSV column 24.
Zip *string
// Phone is CSV column 26 with dashes inserted after the third and sixth
// characters.
Phone *string
// MedicareProviderNumber is not read from the CSV by BuildProvider;
// StoreProviderCSVRowToDB points it at the first medicare number of the row,
// if the row has any.
MedicareProviderNumber *string
}
// CareProviderRowCache holds the id, npi and type columns of one
// care_providers row scanned by UpdateOrCreateProvider, so the rows can be
// examined again after the result set has been read.
type CareProviderRowCache struct {
// databaseId is the scanned id column.
databaseId *int64
// databaseNPI is the scanned npi column; UpdateOrCreateProvider treats nil as
// "this row has no NPI yet".
databaseNPI *int64
// databaseType is the scanned type column, compared against the provider
// type names of the CSV row.
databaseType *string
}
// StoreProviderCSVRowToDB inserts the provider into care_providers once for
// each entry of rowTaxonomyProviderTypes, together with a matching addresses
// row. Each provider type is handled in its own transaction.
//
// A provider type is skipped when AddressAndTypeDuplicates reports that a
// provider of that type already exists at the same address. When
// rowMedicareNumbers is non-empty its first element is stored as the
// provider's medicare number.
//
// It returns true when every provider type was either inserted or skipped,
// and false as soon as any database step fails (the failing transaction is
// rolled back; provider types committed earlier stay committed). A provider
// whose Npi, Name, Street, City, State or Zip is nil cannot be written by the
// statements below, so it is reported and skipped with a true result rather
// than dereferencing a nil pointer.
func (p *Provider) StoreProviderCSVRowToDB(db *sql.DB, rowMedicareNumbers []string, rowTaxonomyProviderTypes []string) bool {
	if p.Npi == nil || p.Name == nil || p.Street == nil || p.City == nil || p.State == nil || p.Zip == nil {
		fmt.Println("skipping provider with missing NPI, name or address fields")
		return true
	}
	for _, rowTaxonomyN := range rowTaxonomyProviderTypes {
		// Copy the loop variable so the pointer stored on p refers to this
		// iteration's value regardless of the Go version's loop semantics.
		providerType := rowTaxonomyN

		transaction, err := db.Begin()
		if err != nil {
			fmt.Println(err.Error())
			return false
		}
		if len(rowMedicareNumbers) > 0 {
			p.MedicareProviderNumber = &rowMedicareNumbers[0]
		}
		p.ProviderType = &providerType

		duplicate, err := p.AddressAndTypeDuplicates(transaction)
		if err != nil {
			// Rollback on an already finished transaction only returns
			// sql.ErrTxDone, so calling it here is always safe.
			transaction.Rollback()
			fmt.Println(err.Error())
			return false
		}
		if duplicate {
			transaction.Rollback()
			continue
		}

		result, err := transaction.Exec("INSERT INTO care_providers (npi, type, name, phone_number, medicare_provider_number, onboarding_stage, updated_at, created_at) VALUES (?,?,?,?,?,'red',NOW(),NOW())", *p.Npi, *p.ProviderType, *p.Name, NilorValueString(p.Phone), NilorValueString(p.MedicareProviderNumber))
		if err != nil {
			transaction.Rollback()
			fmt.Println(err.Error())
			return false
		}
		if result == nil {
			transaction.Rollback()
			return false
		}

		providerId, err := result.LastInsertId()
		if err != nil {
			transaction.Rollback()
			fmt.Println(err.Error())
			return false
		}
		if providerId == 0 {
			// There is no error value to print in this case.
			transaction.Rollback()
			fmt.Println("care_providers insert did not report an inserted id")
			return false
		}

		if !p.StoreProviderAddress(providerId, transaction) {
			transaction.Rollback()
			return false
		}
		if err := transaction.Commit(); err != nil {
			fmt.Println(err.Error())
			return false
		}
	}
	return true
}
// AddressAndTypeDuplicates reports whether a care provider with the same
// address and the same provider type as p already exists.
//
// It first collects the addressed_id of every addresses row matching p's
// street, street2, city, state and zip code (a nil Street2 matches rows whose
// street2 IS NULL), then looks for a care_providers row with one of those ids
// and p.ProviderType. The ids are buffered before the second query is issued
// so that only one result set is open on the transaction at a time.
//
// The transaction is rolled back both when a duplicate is found and when any
// query fails, so the caller must not use it after a true result or a non-nil
// error. p.Street, p.City, p.State, p.Zip and p.ProviderType must be non-nil.
//
// NOTE(review): the address lookup does not filter on addressed_type, while
// StoreProviderAddress writes 'CareProvider' there; confirm whether addresses
// owned by other record types can share ids with care providers.
func (p *Provider) AddressAndTypeDuplicates(transaction *sql.Tx) (bool, error) {
	var rows *sql.Rows
	var err error
	if p.Street2 != nil {
		rows, err = transaction.Query("SELECT addressed_id FROM addresses WHERE street=? AND street2=? AND city=? AND state=? AND zip_code=?", *p.Street, *p.Street2, *p.City, *p.State, *p.Zip)
	} else {
		rows, err = transaction.Query("SELECT addressed_id FROM addresses WHERE street=? AND street2 IS NULL AND city=? AND state=? AND zip_code=?", *p.Street, *p.City, *p.State, *p.Zip)
	}
	if err != nil {
		transaction.Rollback()
		fmt.Println(err.Error())
		return false, err
	}

	careProviderIds := []sql.NullInt64{}
	for rows.Next() {
		var addressedID sql.NullInt64
		if err := rows.Scan(&addressedID); err != nil {
			rows.Close()
			transaction.Rollback()
			fmt.Println(err.Error())
			return false, err
		}
		careProviderIds = append(careProviderIds, addressedID)
	}
	// Close is idempotent, and Err may be consulted after it.
	rows.Close()
	if err := rows.Err(); err != nil {
		transaction.Rollback()
		fmt.Println(err.Error())
		return false, err
	}

	for _, careProviderId := range careProviderIds {
		if !careProviderId.Valid {
			// A NULL addressed_id cannot equal any care_providers.id.
			continue
		}
		rows2, err2 := transaction.Query("SELECT * FROM care_providers WHERE id=? AND type=?", careProviderId, *p.ProviderType)
		if err2 != nil {
			// rows2 is nil when Query fails, so it must not be closed here.
			transaction.Rollback()
			fmt.Println(err2.Error())
			return false, err2
		}
		found := rows2.Next()
		rows2.Close()
		if err2 = rows2.Err(); err2 != nil {
			transaction.Rollback()
			fmt.Println(err2.Error())
			return false, err2
		}
		if found {
			transaction.Rollback()
			return true, nil
		}
	}
	return false, nil
}
// UpdateOrCreateProvider handles a CSV row of a skilled nursing facility or
// home health agency that carries medicare provider numbers.
//
// It looks up the care_providers rows whose medicare_provider_number is one
// of rowMedicareNumbers or whose npi equals the NPI in column 0 of csvRow:
//
//   - if any of them already has the row's NPI, nothing is changed;
//   - otherwise the first row that has no NPI and whose type equals one of
//     rowTaxonomyProviderTypes (checked in the given order) receives the NPI;
//   - otherwise a new provider is created with StoreProviderCSVRowToDB.
//
// It returns true when the row has been dealt with and false when a database
// step failed; in the latter case nothing from the lookup/update transaction
// is persisted and the call can be repeated.
func (p *Provider) UpdateOrCreateProvider(db *sql.DB, csvRow *[]string, rowMedicareNumbers []string, rowTaxonomyProviderTypes []string) bool {
	transaction, err := db.Begin()
	if err != nil {
		fmt.Println(err.Error())
		return false
	}

	// A parse failure is deliberately tolerated and leaves rowNPI at 0: the
	// caller visible in this file only gets here after the same column has
	// been parsed into p.Npi.
	rowNPI, _ := strconv.Atoi((*csvRow)[0])

	// Bind every medicare number to its own placeholder. Binding the
	// comma-joined list to a single "?" would compare the column against one
	// literal string and never match rows with several numbers.
	query := "SELECT id, npi, type FROM care_providers WHERE npi=?"
	args := []interface{}{rowNPI}
	if count := len(rowMedicareNumbers); count > 0 {
		placeholders := strings.TrimSuffix(strings.Repeat("?,", count), ",")
		query = "SELECT id, npi, type FROM care_providers WHERE medicare_provider_number IN (" + placeholders + ") OR npi=?"
		args = make([]interface{}, 0, count+1)
		for _, number := range rowMedicareNumbers {
			args = append(args, number)
		}
		args = append(args, rowNPI)
	}

	databaseRows, err := transaction.Query(query, args...)
	if err != nil {
		transaction.Rollback()
		fmt.Println(err.Error())
		return false
	}
	defer databaseRows.Close()

	// Look for an existing provider that already has the CSV row's NPI and
	// remember the other matches. The result set is always closed before the
	// transaction is finished or reused.
	rowsCache := []CareProviderRowCache{}
	for databaseRows.Next() {
		var databaseId, databaseNPI *int64
		var databaseType *string
		if err := databaseRows.Scan(&databaseId, &databaseNPI, &databaseType); err != nil {
			databaseRows.Close()
			transaction.Rollback()
			fmt.Println(err.Error())
			return false
		}
		if databaseId == nil {
			databaseRows.Close()
			transaction.Rollback()
			return false
		}
		// npi is NULL (nil) for exactly the rows this function wants to fill
		// in, so it has to be checked before it is dereferenced.
		if databaseNPI != nil && *databaseNPI == int64(rowNPI) {
			databaseRows.Close()
			transaction.Rollback()
			return true
		}
		rowsCache = append(rowsCache, CareProviderRowCache{
			databaseId:   databaseId,
			databaseNPI:  databaseNPI,
			databaseType: databaseType,
		})
	}
	databaseRows.Close()
	if err := databaseRows.Err(); err != nil {
		transaction.Rollback()
		fmt.Println(err.Error())
		return false
	}

	// No provider has the row's NPI yet: assign it to a provider without an
	// NPI whose type matches one of the row's provider types.
	for _, rowTaxonomyType := range rowTaxonomyProviderTypes {
		for _, rowCached := range rowsCache {
			if rowCached.databaseNPI != nil || rowCached.databaseType == nil || *rowCached.databaseType != rowTaxonomyType {
				continue
			}
			if _, err := transaction.Exec("UPDATE care_providers SET npi=?, updated_at=NOW() WHERE id=?", rowNPI, *rowCached.databaseId); err != nil {
				transaction.Rollback()
				fmt.Println(err.Error())
				return false
			}
			if err := transaction.Commit(); err != nil {
				fmt.Println(err.Error())
				return false
			}
			return true
		}
	}
	transaction.Rollback()

	// Otherwise, if no other criteria worked, make a new provider. A failure
	// is reported to the caller instead of being retried here in an unbounded
	// loop, so the caller stays in control of the retry policy.
	return p.StoreProviderCSVRowToDB(db, rowMedicareNumbers, rowTaxonomyProviderTypes)
}
// StoreProviderAddress inserts the provider's address into the addresses
// table, attached to the care provider with id providerId, using the given
// transaction. A nil Street2 is stored as NULL; Street, City, State and Zip
// are dereferenced and therefore must be non-nil.
//
// It returns true on success. On failure it prints the error, rolls the
// transaction back and returns false.
func (p *Provider) StoreProviderAddress(providerId int64, transaction *sql.Tx) bool {
	const insertAddress = "INSERT INTO addresses (addressed_id, addressed_type, street, street2, city, state, zip_code, created_at, updated_at) VALUES (?,'CareProvider',?,?,?,?,?,NOW(),NOW())"

	if _, insertErr := transaction.Exec(insertAddress, providerId, *p.Street, NilorValueString(p.Street2), *p.City, *p.State, *p.Zip); insertErr != nil {
		fmt.Println(insertErr.Error())
		transaction.Rollback()
		return false
	}
	return true
}
// QualityDataProvider reports whether the provider has the minimum data
// needed to be imported: a name, an NPI and a zip code.
func (p *Provider) QualityDataProvider() bool {
	return p.Name != nil && p.Npi != nil && p.Zip != nil
}
// IsSkilledNursingOrHomeHealth reports whether rowTaxonomyProviderTypes
// contains the provider type name "SkilledNursingFacility" or
// "HomeHealthAgency". It does not read any field of the receiver.
func (p *Provider) IsSkilledNursingOrHomeHealth(rowTaxonomyProviderTypes []string) bool {
	for _, providerType := range rowTaxonomyProviderTypes {
		switch providerType {
		case "SkilledNursingFacility", "HomeHealthAgency":
			return true
		}
	}
	return false
}
// NilorValueString converts an optional string into a query argument: the
// pointed-to string when fieldPointer is set, or an untyped nil (stored as
// SQL NULL) when it is not.
func NilorValueString(fieldPointer *string) interface{} {
	if fieldPointer != nil {
		return *fieldPointer
	}
	return nil
}
// FindAndMapTaxonomyCodes filters a CSV row for the taxonomy codes (facility
// types) listed in taxonomyCodes and maps each one to its provider type name.
//
// The fifteen taxonomy code columns start at index 47 and are four columns
// apart; the column three positions after a code holds "Y" when that code is
// the primary one, in which case its name is placed first in the result.
//
// It returns the mapped names and whether any were found. A nil row, or a row
// too short to hold every column read here (fewer than 107 fields), is
// treated as malformed and yields no codes instead of an index-out-of-range
// panic.
func FindAndMapTaxonomyCodes(row *[]string) ([]string, bool) {
	codes := []string{}
	if row == nil {
		return codes, false
	}
	fields := *row
	// 103 is the last code column and its primary flag sits at 103+3.
	if len(fields) < 103+3+1 {
		return codes, false
	}
	for _, v := range [15]int{47, 51, 55, 59, 63, 67, 71, 75, 79, 83, 87, 91, 95, 99, 103} {
		taxCode, present := taxonomyCodes[fields[v]]
		if !present {
			continue
		}
		if fields[v+3] == "Y" {
			codes = append([]string{taxCode}, codes...)
		} else {
			codes = append(codes, taxCode)
		}
	}
	return codes, len(codes) > 0
}
// BuildProvider builds a Provider from a CSV row, setting the struct fields
// by name through Go's standard 'reflect' package.
//
// Column 0 is parsed as the NPI; the field stays nil when the column is not a
// valid integer or is 0. Every other mapped column is copied only when it is
// at least two bytes long, otherwise the field stays nil. The phone number
// (column 26) gets a dash after its third and sixth byte; a value shorter
// than six bytes cannot be split that way and is stored unchanged.
//
// A nil row yields an empty Provider, and columns beyond the end of a short
// row are treated as absent rather than causing an index-out-of-range panic.
func BuildProvider(row *[]string) Provider {
	provider := Provider{}
	if row == nil {
		return provider
	}
	fields := *row
	providerValue := reflect.ValueOf(&provider).Elem()
	providerFields := map[string]int{
		"Npi":     0,
		"Name":    4,
		"Street":  20,
		"Street2": 21,
		"City":    22,
		"State":   23,
		"Zip":     24,
		"Phone":   26,
	}
	for fieldName, fieldRowIndex := range providerFields {
		if fieldRowIndex >= len(fields) {
			continue
		}
		raw := fields[fieldRowIndex]
		field := providerValue.FieldByName(fieldName)
		switch fieldName {
		case "Npi":
			npi, err := strconv.ParseInt(raw, 10, 0)
			if err != nil || npi == 0 {
				continue
			}
			field.Set(reflect.ValueOf(&npi))
		case "Phone":
			if len(raw) <= 1 {
				continue
			}
			phoneNumber := raw
			if len(raw) >= 6 {
				phoneNumber = raw[:3] + "-" + raw[3:6] + "-" + raw[6:]
			}
			field.Set(reflect.ValueOf(&phoneNumber))
		default:
			if len(raw) <= 1 {
				continue
			}
			fieldValue := raw
			field.Set(reflect.ValueOf(&fieldValue))
		}
	}
	return provider
}
// FindMedicareNumbers collects all potential medicare numbers of a CSV row:
// the values that are exactly six bytes long in the columns listed in
// medicareColumnIndices, in the order of that list.
//
// It returns nil when there are none. A nil row yields nil, and indices that
// lie outside a short row are ignored instead of causing an
// index-out-of-range panic.
func FindMedicareNumbers(row *[]string) []string {
	var medicareNumbers []string
	if row == nil {
		return medicareNumbers
	}
	fields := *row
	for _, v := range medicareColumnIndices {
		if v < 0 || v >= len(fields) {
			continue
		}
		if len(fields[v]) == 6 {
			medicareNumbers = append(medicareNumbers, fields[v])
		}
	}
	return medicareNumbers
}
// GenerateMedicareColumnIndices returns the fifty CSV column indices where a
// medicare provider number could potentially appear: every fourth column
// starting at index 107 (107, 111, ..., 303).
func GenerateMedicareColumnIndices() [50]int {
	var indices [50]int
	for i := range indices {
		indices[i] = 107 + 4*i
	}
	return indices
}
// BuildCSVFields splits one CSV line into its fields. The line is cut at
// every `","` separator, and the unnecessary double quotes that remain in the
// first and the last field are removed (all of them, not just the outer
// ones). Quotes inside the fields in between are left untouched.
func BuildCSVFields(csvRow string) []string {
	fields := strings.Split(csvRow, `","`)
	last := len(fields) - 1
	fields[0] = strings.Replace(fields[0], `"`, "", -1)
	if last != 0 {
		fields[last] = strings.Replace(fields[last], `"`, "", -1)
	}
	return fields
}
// ActiveFacility reports whether a CSV row describes an active facility.
// Column 39 is read as the deactivation value and column 40 as the
// reactivation value (spaces trimmed): the facility is active when it was
// never deactivated, or when it was deactivated and has a reactivation value.
func ActiveFacility(csvRow []string) bool {
	deactivated := strings.Trim(csvRow[39], " ") != ""
	reactivated := strings.Trim(csvRow[40], " ") != ""
	return !deactivated || reactivated
}
// StoreCSVRowToDB is the container for all filtering of relevant data and
// database queries for one CSV row. It is meant to run as a goroutine and
// calls wg.Done when it returns.
//
// Rows that do not pass QualityDataProvider are dropped. Skilled nursing
// facilities and home health agencies that carry medicare numbers go through
// UpdateOrCreateProvider; every other row goes through
// StoreProviderCSVRowToDB.
//
// A failed store is retried, because failures such as deadlocks between the
// concurrent transactions are transient. The retries are bounded and spaced
// out: retrying forever without a pause would spin on a permanent error
// (for example a constraint violation) and keep wg.Wait from ever returning.
// After the last attempt the row is reported and given up on.
func StoreCSVRowToDB(row *[]string, db *sql.DB, rowTaxonomyProviderTypes []string, rowMedicareNumbers []string) {
	defer wg.Done()

	provider := BuildProvider(row)
	if !provider.QualityDataProvider() {
		return
	}

	store := func() bool {
		return provider.StoreProviderCSVRowToDB(db, rowMedicareNumbers, rowTaxonomyProviderTypes)
	}
	if len(rowMedicareNumbers) > 0 && provider.IsSkilledNursingOrHomeHealth(rowTaxonomyProviderTypes) {
		store = func() bool {
			return provider.UpdateOrCreateProvider(db, row, rowMedicareNumbers, rowTaxonomyProviderTypes)
		}
	}

	const maxAttempts = 10
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if store() {
			return
		}
		if attempt < maxAttempts {
			// Linear backoff: 100ms, 200ms, ... between attempts.
			time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
		}
	}
	// QualityDataProvider has verified that Npi is set.
	fmt.Printf("giving up on NPI %d after %d failed attempts\n", *provider.Npi, maxAttempts)
}
// ShowProgress counts one more processed CSV line in rowCounter and, on every
// 200000th line, prints the share of rowCount processed so far as a
// percentage with two decimals.
func ShowProgress() {
	rowCounter += 1
	if int(rowCounter)%200000 != 0 {
		return
	}
	percentDone := (rowCounter / rowCount) * float64(100)
	fmt.Printf("%.2f%%\n", percentDone)
}
// main loads the NPI CSV file named by NPIFilePath into the MySQL database
// described by MYSQLAUTH.
//
// After skipping the header line it reads the file line by line; every line
// that has at least one wanted taxonomy code and belongs to an active
// facility is handed to its own StoreCSVRowToDB goroutine. The connection
// pool is capped at 30 open connections. Once the file is read, main waits
// for all goroutines and prints the elapsed time.
//
// Failing to connect, to open the file or to read it completely is fatal and
// reported through panic, like the other start-up errors.
func main() {
	db, err := sql.Open("mysql", MYSQLAUTH)
	if err != nil {
		panic(err.Error())
	}
	defer db.Close()
	db.SetMaxOpenConns(30)
	if err := db.Ping(); err != nil {
		panic(err.Error())
	}

	file, err := os.Open(NPIFilePath)
	if err != nil {
		// Without this check a wrong path made the program exit silently.
		panic(err.Error())
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	//Skip CSV header line.
	if !scanner.Scan() {
		if err := scanner.Err(); err != nil {
			panic(err.Error())
		}
		return
	}
	rowCounter++

	start := time.Now()
	for scanner.Scan() {
		tokenizedLine := BuildCSVFields(scanner.Text())
		rowTaxonomyProviderTypes, found := FindAndMapTaxonomyCodes(&tokenizedLine)
		if found && ActiveFacility(tokenizedLine) {
			wg.Add(1)
			go StoreCSVRowToDB(&tokenizedLine, db, rowTaxonomyProviderTypes, FindMedicareNumbers(&tokenizedLine))
		}
		ShowProgress()
	}
	// Scan also returns false on a read error or an over-long line; remember
	// the error, let the goroutines already started finish, then report it.
	scanErr := scanner.Err()

	wg.Wait()
	fmt.Println(time.Since(start))
	if scanErr != nil {
		panic(scanErr.Error())
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment