Skip to content

Instantly share code, notes, and snippets.

@ewhitebloom
Last active November 9, 2015 00:50
Show Gist options
  • Save ewhitebloom/a3b812e0847d9e9b310e to your computer and use it in GitHub Desktop.
Save ewhitebloom/a3b812e0847d9e9b310e to your computer and use it in GitHub Desktop.
Imports data from CMS CSV flat files. Uses the beauty of the SmartyStreets API!
package main
import (
"bytes"
"database/sql"
"encoding/csv"
"encoding/json"
"fmt"
_ "github.com/go-sql-driver/mysql"
"io/ioutil"
"net/http"
"os"
"strings"
"sync"
"time"
)
var (
MYSQLAUTH = os.Getenv("GO_MYSQLAUTH")
LTACHFilePath = os.Getenv("LTACH_FILE")
IRFFilePath = os.Getenv("IRF_FILE")
HospiceFilePath = os.Getenv("HOSPICE_FILE")
badDataFilePath = os.Getenv("BAD_PROVIDER_FILE")
SMARTYSTREETS_TOKEN = os.Getenv("SMARTYSTREETS_TOKEN")
SMARTYSTREETS_ID = os.Getenv("SMARTYSTREETS_ID")
fileToProviderType = map[string]string{
LTACHFilePath: "LongTermCareHospital",
IRFFilePath: "AcuteRehabFacility",
HospiceFilePath: "Hospice",
}
wg sync.WaitGroup
)
type Provider struct {
providerType sql.NullString
name sql.NullString
street sql.NullString
city sql.NullString
state sql.NullString
zip sql.NullString
plusFour sql.NullString
latitude sql.NullFloat64
longitude sql.NullFloat64
phoneNumber sql.NullString
medicareProviderNumber sql.NullString
}
type SmartyStreetsRequest struct {
Street string `json:"street"`
City string `json:"city"`
State string `json:"state"`
Zipcode string `json:"zipcode"`
Candidates int `json:"candidates"`
}
type SmartyStreetsResponse struct {
Street string `json:"delivery_line_1"`
Components SmartyStreetsComponents
Metadata SmartyStreetsMetaData
}
type SmartyStreetsComponents struct {
CityName string `json:"city_name"`
State string `json:"state_abbreviation"`
Zipcode string `json:"zipcode"`
Plus4 string `json:"plus4_code"`
}
type SmartyStreetsMetaData struct {
Latitude float64
Longitude float64
}
func (p Provider) String() string {
return fmt.Sprintf("{ type: %v, name: %v, street: %v, city: %v, state: %v, zip: %v, latitude: %v, longitude: %v, phone: %v, medicareNumber: %v }", p.providerType.String, p.name.String, p.street.String, p.city.String, p.state.String, p.zip.String, p.latitude.Float64, p.longitude.Float64, p.phoneNumber.String, p.medicareProviderNumber.String)
}
func (p *Provider) ToCSVRow() string {
return fmt.Sprintf("%v, %v, %v, %v, %v, %v, %v, %v, %v, %v", p.providerType.String, p.name.String, p.street.String, p.city.String, p.state.String, p.zip.String, p.latitude.Float64, p.longitude.Float64, p.phoneNumber.String, p.medicareProviderNumber.String)
}
func (p *Provider) SanitizeAndGeocodeProviderAddress() bool {
queryString := "https://api.smartystreets.com/street-address?auth-id=" + SMARTYSTREETS_ID + "&auth-token=" + SMARTYSTREETS_TOKEN
smartyRequest := SmartyStreetsRequest{p.street.String, p.city.String, p.state.String, p.zip.String, 1}
requestJSON, err := json.Marshal(smartyRequest)
if err != nil {
panic(err.Error())
}
requestJSON = []byte("[" + string(requestJSON) + "]")
request, err := http.NewRequest("POST", queryString, bytes.NewReader(requestJSON))
if err != nil {
panic(err.Error())
}
request.Header.Set("Content-Type", "application/json")
request.Header.Set("X-Include-Invalid", "true")
client := http.Client{}
resp, err := client.Do(request)
if err != nil {
panic(err.Error())
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
panic(err.Error())
}
data := SmartyStreetsResponse{}
stringBody := string(body)
stringBody = strings.Replace(stringBody, "[", "", -1)
stringBody = strings.Replace(stringBody, "]", "", -1)
err2 := json.NewDecoder(strings.NewReader(stringBody)).Decode(&data)
if err2 != nil {
panic(err.Error())
}
if data.Metadata.Latitude == 0 || data.Metadata.Longitude == 0 || data.Street == "" {
return false
}
if len(data.Components.Plus4) == 4 {
p.plusFour.Scan(data.Components.Plus4)
}
p.street.Scan(data.Street)
p.city.Scan(data.Components.CityName)
p.state.Scan(data.Components.State)
p.zip.Scan(data.Components.Zipcode)
p.latitude.Scan(data.Metadata.Latitude)
p.longitude.Scan(data.Metadata.Longitude)
return true
}
func (p *Provider) QualityDataProvider() bool {
if !p.name.Valid || !p.street.Valid || !p.city.Valid || !p.state.Valid || !p.medicareProviderNumber.Valid {
return false
}
return true
}
func PersistRowCareProvider(provider Provider, transaction *sql.Tx) (bool, int64) {
result, err := transaction.Exec("INSERT INTO care_providers (type, name, phone_number, medicare_provider_number, onboarding_stage, updated_at, created_at) VALUES (?,?,?,?,'red',NOW(),NOW())", provider.providerType, provider.name, provider.phoneNumber, provider.medicareProviderNumber)
if err != nil {
fmt.Println(err.Error())
return false, 0
} else if result != nil {
providerId, err := result.LastInsertId()
if err != nil || providerId == 0 {
fmt.Println(err.Error())
return false, 0
} else {
return true, providerId
}
}
return false, 0
}
func PersistRowAddress(provider Provider, transaction *sql.Tx, providerId int64) bool {
_, err := transaction.Exec("INSERT INTO addresses (addressed_id, addressed_type, street, city, state, zip_code, plus_four, latitude, longitude, created_at, updated_at) VALUES (?,'CareProvider',?,?,?,?,?,?,?,NOW(),NOW())", providerId, provider.street, provider.city, provider.state, provider.zip, provider.plusFour, provider.latitude, provider.longitude)
if err != nil {
fmt.Println(err.Error())
return false
}
return true
}
func QueryStringBuilder(s string) string {
return strings.Join(strings.Split(s, " "), "%20")
}
// Builds a Provider struct, assigning a field only if it's present in the CSV row, otherwise defaults to nil.
// Avoids abstracting iteration through struct fields for performance benefit.
// These attributes are used to feed the SmartyStreetsAPI,
// which then changes them where appropriate, to better formatting.
func BuildProvider(row *[]string, providerType string) Provider {
provider := Provider{}
provider.providerType.Scan(providerType)
if len((*row)[17]) == 5 {
provider.medicareProviderNumber.Scan("0" + (*row)[17])
} else {
provider.medicareProviderNumber.Scan((*row)[17])
}
if len((*row)[11]) > 0 {
provider.name.Scan((*row)[11])
}
if len((*row)[23]) > 0 {
provider.street.Scan((*row)[23])
}
if len((*row)[4]) > 0 {
provider.city.Scan((*row)[4])
}
if len((*row)[20]) > 0 {
provider.state.Scan((*row)[20])
}
if len((*row)[29]) >= 5 {
provider.zip.Scan((*row)[29])
}
if len((*row)[24]) == 10 {
provider.phoneNumber.Scan((*row)[24][:3] + "-" + (*row)[24][3:6] + "-" + (*row)[24][6:])
}
return provider
}
func AnyDuplicates(provider Provider, transaction *sql.Tx) (bool, error) {
rows, err := transaction.Query("SELECT * FROM care_providers WHERE medicare_provider_number=?", provider.medicareProviderNumber)
defer rows.Close()
if err == nil && !rows.Next() {
return false, err
} else if err != nil {
return false, err
} else {
return true, err
}
}
func ProcessFile(levelOfCareFilePath string, db *sql.DB, badDataFile *os.File) {
file, err := os.Open(levelOfCareFilePath)
if err != nil {
panic(err.Error())
}
defer file.Close()
if err != nil {
panic(err.Error())
}
reader := csv.NewReader(file)
records, err := reader.ReadAll()
for _, record := range records[1:] {
wg.Add(1)
go func(record []string, levelOfCareFilePath string) {
defer wg.Done()
provider := BuildProvider(&record, fileToProviderType[levelOfCareFilePath])
if !provider.QualityDataProvider() {
badDataFile.WriteString(provider.ToCSVRow() + "\n")
return
}
for {
transaction, err := db.Begin()
if err != nil {
panic(err.Error())
}
any, err := AnyDuplicates(provider, transaction)
if err != nil {
panic(err.Error())
} else if any {
transaction.Rollback()
break
}
successfulGeocode := provider.SanitizeAndGeocodeProviderAddress()
if !successfulGeocode {
badDataFile.WriteString(provider.ToCSVRow() + "\n")
transaction.Rollback()
return
}
success, id := PersistRowCareProvider(provider, transaction)
if success {
success2 := PersistRowAddress(provider, transaction, id)
if success2 {
err := transaction.Commit()
if err == nil {
break
} else {
panic(err.Error())
}
} else {
transaction.Rollback()
}
} else {
transaction.Rollback()
}
}
}(record, levelOfCareFilePath)
}
}
func main() {
db, err := sql.Open("mysql", MYSQLAUTH)
if err != nil {
panic(err.Error())
}
defer db.Close()
db.SetMaxOpenConns(5)
err2 := db.Ping()
if err2 != nil {
panic(err2.Error())
}
// Dumps bad quality data rows into a file.
badDataFile, err3 := os.Create(badDataFilePath)
if err3 != nil {
panic(err.Error())
}
badDataFile.WriteString("providerType, name, street, city, state, zip_code, latitude, longitude, phone, medicare_provider_number\n")
start := time.Now()
for _, levelOfCareFilePath := range [3]string{LTACHFilePath, IRFFilePath, HospiceFilePath} {
ProcessFile(levelOfCareFilePath, db, badDataFile)
}
wg.Wait()
fmt.Println(time.Since(start))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment