-
-
Save donatj/df08aedb12c426344a38c2493778d1eb to your computer and use it in GitHub Desktop.
My attempt at processing a large CSV file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"database/sql" | |
"encoding/csv" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"sync" | |
"time" | |
_ "github.com/lib/pq" // add this | |
) | |
// Package-level configuration and shared state for the CSV loader.
var (
	// filePath is the CSV file that publish opens and streams.
	filePath = "/Volumes/rog-4tb/works.csv"

	// counter is the shared progress counter; consume increments it per
	// record and prints it whenever it reaches a multiple of 1000.
	counter = 1

	// data is never read by any code in this file.
	// NOTE(review): likely removable.
	data []map[string]string

	// headers names the 13 CSV fields in order; it matches the column list of
	// the INSERT statement in consume.
	headers = []string{
		"id", "doi", "title", "display_name",
		"publication_year", "publication_date", "type", "cited_by_count",
		"is_retracted", "is_paratext", "cited_by_api_url",
		"abstract_inverted_index", "language",
	}

	// connStr is the lib/pq connection string main passes to sql.Open.
	// NOTE(review): a live host and password are committed here — move them to
	// an environment variable or secrets store and rotate the password.
	connStr = "user=postgres dbname=postgres host=34.123.198.84 password=inittowinit20"
)
func main() { | |
db, err := sql.Open("postgres", connStr) | |
if err != nil { | |
log.Fatal(err) | |
} | |
db.SetMaxOpenConns(20) // 20 is just an example | |
start := time.Now() | |
ch := make(chan []string, 4) | |
wg := sync.WaitGroup{} | |
for i := 0; i < 10; i++ { | |
go func() { | |
wg.Add(1) | |
consume(db, ch) | |
wg.Done() | |
}() | |
} | |
publish(ch) | |
wg.Wait() | |
fmt.Println("Total time: ", time.Since(start)) | |
} | |
func publish(ch chan []string) { | |
csvfile, err := os.Open(filePath) | |
if err != nil { | |
log.Fatalln("Couldn't open the csv file", err) | |
} | |
r := csv.NewReader(csvfile) | |
//headers, err := r.Read() | |
if err != nil { | |
log.Fatalln("Error reading the csv", err) | |
} | |
for { | |
record, err := r.Read() | |
if err == io.EOF { | |
break | |
} | |
if err != nil { | |
log.Fatalln("Error reading the csv", err) | |
} | |
ch <- record | |
} | |
close(ch) | |
} | |
func consume(db *sql.DB, ch chan []string) { | |
for { | |
if counter%1000 == 0 { | |
fmt.Println("Counter: ", counter) | |
} | |
counter++ | |
row, ok := <-ch | |
if !ok { | |
break | |
} | |
//fmt.Println(row[0]) | |
//if len(row) != len(headers) { | |
// fmt.Println("Error: ", row) | |
// continue | |
//} | |
if row[11] == "" { | |
row[11] = "{}" | |
} | |
err := db.QueryRow(`INSERT INTO openalex.works( | |
id, | |
doi, | |
title, | |
display_name, | |
publication_year, | |
publication_date, | |
type, | |
cited_by_count, | |
is_retracted, | |
is_paratext, | |
cited_by_api_url, | |
abstract_inverted_index, | |
language) | |
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12]).Scan(&data) | |
if err != nil { | |
//fmt.Println("Error: ", err) | |
continue | |
} | |
//fmt.Println(len(row)) | |
//if len(row) != 13 { | |
// fmt.Println("Error: ", row) | |
// continue | |
//} | |
//fmt.Println("\n\n") | |
} | |
} | |
//type Works struct { | |
// id string | |
// doi string | |
// title string | |
// display_name string | |
// publication_year int | |
// publication_date string | |
// typeof string | |
// cited_by_count int | |
// is_retracted bool | |
// is_paratext bool | |
// cited_by_api_url string | |
// abstract_inverted_index string | |
// language string | |
//} | |
// | |
//func writeToWorks(data Works) { | |
// | |
//} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment