Skip to content

Instantly share code, notes, and snippets.

@brandon-braner
Created March 19, 2024 21:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save brandon-braner/579f48b5992f545a6c8799e8061f170d to your computer and use it in GitHub Desktop.
Save brandon-braner/579f48b5992f545a6c8799e8061f170d to your computer and use it in GitHub Desktop.
My attempt at processing a large CSV file
package main
import (
	"database/sql"
	"encoding/csv"
	"fmt"
	"io"
	"log"
	"os"
	"sync"
	"time"

	_ "github.com/lib/pq" // add this
)
// Package-level configuration and shared state for the CSV import.
var (
	// filePath is the location of the CSV file to import.
	filePath = "/Volumes/rog-4tb/works.csv"

	// counter starts at 1 and is used for progress reporting.
	counter = 1

	// data is a slice of string-keyed string maps; it starts out nil.
	data []map[string]string

	// headers lists the thirteen column names of a works record, in the
	// same order as the columns of the INSERT statement in this file.
	headers = []string{
		"id",
		"doi",
		"title",
		"display_name",
		"publication_year",
		"publication_date",
		"type",
		"cited_by_count",
		"is_retracted",
		"is_paratext",
		"cited_by_api_url",
		"abstract_inverted_index",
		"language",
	}

	// connStr is the connection string handed to sql.Open.
	// NOTE(review): this embeds a host and a plaintext password in source;
	// consider loading it from the environment instead.
	connStr = "user=postgres dbname=postgres host=34.123.198.84 password=inittowinit20"
)
// numWorkers is the number of consumer goroutines main starts.
const numWorkers = 10

// abstractCol is the index of the abstract_inverted_index column in a record.
const abstractCol = 11

// insertWorkSQL inserts a single record into openalex.works. It has one
// placeholder per entry in headers, in the same order.
const insertWorkSQL = `INSERT INTO openalex.works(
id,
doi,
title,
display_name,
publication_year,
publication_date,
type,
cited_by_count,
is_retracted,
is_paratext,
cited_by_api_url,
abstract_inverted_index,
language)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`

// counterMu guards the package-level counter, which is shared by every
// consumer goroutine.
var counterMu sync.Mutex

// main streams the CSV file at filePath to numWorkers consumers that insert
// each record into Postgres, then prints the total elapsed time.
//
// The channel is closed once publish has sent the last record, and main waits
// for every consumer to drain it before returning. Without the wait the
// process would exit while inserts were still in flight.
func main() {
	start := time.Now()
	ch := make(chan []string)

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			consume(&ch)
		}()
	}

	publish(&ch)
	// Only the sender closes: this tells the consumers no more records are coming.
	close(*(&ch))
	wg.Wait()

	fmt.Println("Total time: ", time.Since(start))
}

// publish reads filePath as CSV and sends every record on *ch, starting with
// the very first line (no header row is skipped). It returns after the last
// record has been sent; it does not close the channel, the caller does.
//
// Any error opening or parsing the file terminates the process via
// log.Fatalln.
func publish(ch *chan []string) {
	csvfile, err := os.Open(filePath)
	if err != nil {
		log.Fatalln("Couldn't open the csv file", err)
	}
	defer csvfile.Close()

	r := csv.NewReader(csvfile)
	for {
		record, err := r.Read()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Fatalln("Error reading the csv", err)
		}
		*ch <- record
	}
}

// consume receives records from *ch and inserts each one into openalex.works
// until the channel is closed and drained.
//
// Records with fewer fields than headers are logged and skipped instead of
// panicking on an out-of-range index. An empty abstract_inverted_index is
// stored as "{}". Insert failures are logged and the loop moves on to the next
// record. A failure of sql.Open terminates the process via log.Fatal.
func consume(ch *chan []string) {
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	for row := range *ch {
		if n := nextCount(); n%1000 == 0 {
			fmt.Println("Counter: ", n)
		}

		if len(row) < len(headers) {
			log.Printf("skipping record with %d fields, want at least %d", len(row), len(headers))
			continue
		}
		if row[abstractCol] == "" {
			row[abstractCol] = "{}"
		}

		args := make([]interface{}, len(headers))
		for i := range args {
			args[i] = row[i]
		}
		// Exec, not QueryRow+Scan: the statement returns no rows, so Scan
		// would report sql.ErrNoRows for every successful insert.
		if _, err := db.Exec(insertWorkSQL, args...); err != nil {
			log.Printf("inserting work %q: %v", row[0], err)
		}
	}
}

// nextCount returns the current value of the shared counter and then
// increments it, holding counterMu so concurrent consumers do not race.
func nextCount() int {
	counterMu.Lock()
	defer counterMu.Unlock()
	n := counter
	counter++
	return n
}
//type Works struct {
// id string
// doi string
// title string
// display_name string
// publication_year int
// publication_date string
// typeof string
// cited_by_count int
// is_retracted bool
// is_paratext bool
// cited_by_api_url string
// abstract_inverted_index string
// language string
//}
//
//func writeToWorks(data Works) {
//
//}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment