Skip to content

Instantly share code, notes, and snippets.

@slav123
Created March 16, 2020 11:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save slav123/ee11788753a80012f74f0be6fbd58fa5 to your computer and use it in GitHub Desktop.
Save slav123/ee11788753a80012f74f0be6fbd58fa5 to your computer and use it in GitHub Desktop.
import gnaf
package main
import (
	"database/sql"
	"encoding/csv"
	"flag"
	"io"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
)
// Program-wide configuration and shared state. The values below are the
// defaults; parseSysArgs and main overwrite some of them at startup.
var (
	// db is the MySQL connection pool used for all inserts.
	db *sql.DB

	// Dir is the directory holding the input files (assigned in main).
	Dir string

	// Filename is the path of the file currently being imported.
	Filename string
	// TableName is the MySQL table that rows are inserted into.
	TableName string

	// Delimiter separates fields in the input file (default: pipe).
	Delimiter = '|'
	// Max_sql_connections caps how many inserts run concurrently. It must
	// stay below the MySQL server's max_connections setting.
	Max_sql_connections = 25
)
// parseSysArgs parses the command-line flags and the single required
// positional filename argument into TableName, Filename, Delimiter and
// Max_sql_connections.
//
// The -table flag selects the target table; when it is empty the table name
// is the filename with any ".csv" suffix removed. The -d flag must be exactly
// one character, and -maxConns must be at least 1 (a zero-sized connection
// pool would make every insert wait forever). Invalid arguments terminate
// the process via log.Fatal.
func parseSysArgs() {
	table := flag.String("table", TableName, "Name of MySQL database table.")
	delimiter := flag.String("d", string(Delimiter), "Delimiter used in .csv file.")
	maxConns := flag.Int("maxConns", Max_sql_connections, "Maximum number of concurrent connections to database. Value depends on your MySQL configuration.")
	flag.Parse()

	args := flag.Args()
	if len(args) != 1 {
		log.Fatal("Filename not specified. Only one file permitted. Use -h for help")
	}
	Filename = args[0]

	if *table != "" {
		TableName = *table
	} else {
		// No table given: guess it from the filename.
		TableName = strings.TrimSuffix(Filename, ".csv")
	}

	// Indexing [0] on an empty string would panic, and extra characters
	// would otherwise be silently ignored.
	runes := []rune(*delimiter)
	if len(runes) != 1 {
		log.Fatalf("delimiter must be exactly one character, got %q", *delimiter)
	}
	Delimiter = runes[0]

	if *maxConns < 1 {
		log.Fatalf("maxConns must be at least 1, got %d", *maxConns)
	}
	Max_sql_connections = *maxConns
}
func init() {
// --------------------------------------------------------------------------
// database connection setup
// --------------------------------------------------------------------------
db, err := sql.Open("mysql", "geocoding:geocoding@ncore.cbrqkgo3xm3v.ap-southeast-2.rds.amazonaws.com/geocoding")
if err != nil {
log.Fatal(err.Error())
return
}
// check database connection
err = db.Ping()
if err != nil {
log.Fatal(err.Error())
return
}
// set max idle connections
db.SetMaxIdleConns(Max_sql_connections)
defer db.Close()
}
// main imports the G-NAF pipe-separated files for every listed state and
// table, one file at a time.
//
// parseSysArgs runs first so the -d and -maxConns flags take effect. The
// positional filename and -table it requires are overridden here: each file
// is read from Dir and inserted into the table named after its G-NAF table
// (for example NSW_STATE_psv.psv goes into STATE).
func main() {
	parseSysArgs()
	tables := []string{"ADDRESS_DETAIL", "STREET_LOCALITY", "ADDRESS_DEFAULT_GEOCODE", "LOCALITY", "STATE"}
	states := []string{"NSW", "VIC", "QLD"}
	Dir = "/Users/slav/Downloads/feb20_gnaf_pipeseparatedvalue/G-NAF/G-NAF FEBRUARY 2020/Standard/"
	for _, state := range states {
		for _, table := range tables {
			// Without Dir the file would be looked up in the working directory.
			Filename = filepath.Join(Dir, state+"_"+table+"_psv.psv")
			// Each table has its own columns, so each file needs its own table.
			TableName = table
			log.Printf("Importing %s into %s\n", Filename, TableName)
			single()
		}
	}
}
// statsMu guards the insertion and connection counters that single shares
// with insert, the connection controller and the status logger. Those run
// in different goroutines, so unguarded access would be a data race.
var statsMu sync.Mutex

// single imports the delimited file named by Filename into TableName.
//
// The first record is the header row, which parseColumns turns into the
// INSERT statement. Every later record is inserted in its own goroutine, with
// at most Max_sql_connections inserts in flight at a time. single returns
// once every insert has finished and the helper goroutines it started have
// stopped. An open or read error terminates the process.
func single() {
	// --------------------------------------------------------------------------
	// prepare buffered file reader
	// --------------------------------------------------------------------------
	file, err := os.Open(Filename)
	if err != nil {
		log.Fatal(err.Error())
	}
	// single may be called many times, so release the descriptor each time.
	defer file.Close()
	reader := csv.NewReader(file) // csv.Reader buffers its input internally
	reader.Comma = Delimiter      // set custom comma for reader (default: ',')

	// A zero-capacity slot channel would block the first insert forever.
	if Max_sql_connections < 1 {
		log.Fatalf("Max_sql_connections must be at least 1, got %d", Max_sql_connections)
	}

	// --------------------------------------------------------------------------
	// read rows and insert into database
	// --------------------------------------------------------------------------
	start := time.Now()
	query := ""                                        // INSERT statement built from the header row
	callback := make(chan int)                         // insert goroutines report completion here
	connections := 0                                   // inserts currently in flight (guarded by statsMu)
	insertions := 0                                    // finished inserts (guarded by statsMu)
	available := make(chan bool, Max_sql_connections) // one token per free connection slot
	for i := 0; i < Max_sql_connections; i++ {
		available <- true
	}

	stopLogger := startLogger(&insertions, &connections)
	controllerDone := startConnectionController(&insertions, &connections, callback, available)

	var wg sync.WaitGroup
	id := 1 // 1-based record number; the header is record 1
	isFirstRow := true
	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err.Error())
		}
		if isFirstRow {
			parseColumns(record, &query)
			isFirstRow = false
			continue
		}

		<-available // wait for a free database connection slot
		statsMu.Lock()
		connections++
		statsMu.Unlock()
		id++
		wg.Add(1)
		go insert(id, query, callback, &connections, &wg, string2interface(record))
	}

	// Every insert sends on callback before calling wg.Done, so nothing sends
	// after Wait returns and callback can be closed. Closing it ends the
	// controller; waiting for it guarantees every insert has been counted.
	wg.Wait()
	close(callback)
	<-controllerDone
	stopLogger()

	statsMu.Lock()
	total := insertions
	statsMu.Unlock()
	log.Printf("Status: %d insertions\n", total)
	log.Printf("Execution time: %s\n", time.Since(start))
}

// insert runs query with args against db and reports completion.
//
// id identifies the row in error messages and is sent on callback when the
// insert has finished, whether it succeeded or not. wg.Done is called after
// that send. conns is only read, under statsMu, for the error message.
// A failure to prepare query is fatal, since the statement built from the
// header row is the same for every row. A failure to execute it is logged
// and the import continues.
func insert(id int, query string, callback chan<- int, conns *int, wg *sync.WaitGroup, args []interface{}) {
	defer wg.Done()

	// NOTE(review): *sql.Stmt is safe for concurrent use, so preparing once in
	// single and sharing the statement would save a round trip per row.
	// Preparing per insert is kept to preserve this function's signature.
	stmt, err := db.Prepare(query)
	if err != nil {
		log.Fatal(err.Error())
	}
	_, err = stmt.Exec(args...)
	// Release the statement before signalling completion, so it is gone before
	// the controller hands this connection slot to the next insert. A close
	// error is ignored, as it was before: the row was already attempted.
	_ = stmt.Close()
	if err != nil {
		statsMu.Lock()
		n := *conns
		statsMu.Unlock()
		log.Printf("ID: %d (%d conns), %s\n", id, n, err.Error())
	}

	// finished inserting, send id over channel to signal termination of routine
	callback <- id
}

// startConnectionController starts a goroutine that handles each id received
// on callback. For each one it counts a finished insertion, unregisters that
// insert's connection and returns a slot token to available.
//
// The goroutine runs until callback is closed. The returned channel is closed
// when it has stopped, after which *insertions includes every insert that
// reported on callback.
func startConnectionController(insertions, connections *int, callback <-chan int, available chan<- bool) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range callback { // each value is the id of a finished insert
			statsMu.Lock()
			*insertions++
			*connections--
			statsMu.Unlock()
			available <- true // make the connection slot available again
		}
	}()
	return done
}

// startLogger prints the insertion and connection counters once a second
// until the returned stop function is called. stop must be called exactly
// once; it stops the ticker and waits for the logging goroutine to exit.
func startLogger(insertions, connections *int) (stop func()) {
	ticker := time.NewTicker(time.Second)
	quit := make(chan struct{})
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		for {
			select {
			case <-quit:
				return
			case <-ticker.C:
				statsMu.Lock()
				ins, conns := *insertions, *connections
				statsMu.Unlock()
				log.Printf("Status: %d insertions, %d database connections\n", ins, conns)
			}
		}
	}()
	return func() {
		ticker.Stop()
		close(quit)
		<-exited
	}
}
// parseColumns builds the parameterised INSERT statement for TableName from
// the header row and stores it in *query. For columns a and b the result is:
//
//	INSERT INTO <TableName> (`a`, `b`) VALUES (?, ?)
//
// Column names come from the input file, so each one is quoted as a MySQL
// identifier, with embedded backticks doubled. This keeps a crafted header
// from injecting SQL and lets reserved words be used as column names.
// Surrounding whitespace and a leading UTF-8 byte-order mark on the first
// column are removed. An empty column name cannot be inserted into, so it
// terminates the process.
func parseColumns(columns []string, query *string) {
	quoted := make([]string, len(columns))
	for i, c := range columns {
		if i == 0 {
			c = strings.TrimPrefix(c, "\ufeff")
		}
		c = strings.TrimSpace(c)
		if c == "" {
			log.Fatalf("column %d of the header row is empty", i+1)
		}
		quoted[i] = "`" + strings.ReplaceAll(c, "`", "``") + "`"
	}
	placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(columns)), ", ")
	*query = "INSERT INTO " + TableName + " (" + strings.Join(quoted, ", ") + ") VALUES (" + placeholders + ")"
}
// string2interface returns a new, non-nil slice containing every element of
// s boxed as an interface{}, in the original order. This lets a record be
// passed as variadic query arguments.
func string2interface(s []string) []interface{} {
	out := make([]interface{}, 0, len(s))
	for _, str := range s {
		out = append(out, str)
	}
	return out
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment