Skip to content

Instantly share code, notes, and snippets.

@donatj
Forked from brandon-braner/main.go
Created March 19, 2024 21:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save donatj/df08aedb12c426344a38c2493778d1eb to your computer and use it in GitHub Desktop.
Save donatj/df08aedb12c426344a38c2493778d1eb to your computer and use it in GitHub Desktop.
My attempt at processing a large CSV file.
package main
import (
"database/sql"
"encoding/csv"
"fmt"
"io"
"log"
"os"
"sync"
"time"
_ "github.com/lib/pq" // add this
)
// Runtime configuration for the importer. filePath and connStr can be
// overridden through environment variables so the program can be pointed at
// another file or database without editing the source; the literals below are
// only fallbacks used when the variable is unset or empty.
var (
	// filePath is the CSV file to import (override: WORKS_CSV_PATH).
	filePath = envOr("WORKS_CSV_PATH", "/Volumes/rog-4tb/works.csv")

	// counter is a progress counter shared by all consume workers; it starts
	// at 1. Concurrent writers must synchronize access to it.
	counter = 1

	// data is a package-level slice of row maps.
	// NOTE(review): nothing in this file ever populates it; confirm it is
	// still needed.
	data []map[string]string

	// headers names the 13 openalex.works columns, in the same order as the
	// CSV fields that are inserted into them.
	headers = []string{"id", "doi", "title", "display_name", "publication_year", "publication_date", "type", "cited_by_count",
		"is_retracted", "is_paratext", "cited_by_api_url", "abstract_inverted_index", "language"}

	// connStr is the lib/pq connection string (override: PG_CONN).
	// SECURITY: this fallback embeds a plaintext password that has been
	// published. Rotate it, and supply credentials through PG_CONN instead of
	// committing them.
	connStr = envOr("PG_CONN", "user=postgres dbname=postgres host=34.123.198.84 password=inittowinit20")
)

// envOr returns the value of the environment variable key, or fallback when
// that variable is unset or set to the empty string.
func envOr(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}
// main streams the CSV at filePath through a small buffered channel to a
// fixed pool of consume workers that insert the records into Postgres. It
// prints the total elapsed time once every worker has finished.
func main() {
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	db.SetMaxOpenConns(20) // upper bound on concurrent connections; tune as needed

	// sql.Open only validates its arguments. Ping makes a bad connection string
	// or unreachable host fail here, rather than as an error on every insert.
	if err := db.Ping(); err != nil {
		log.Fatal(err)
	}

	const workers = 10
	start := time.Now()
	ch := make(chan []string, 4)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		// Add must happen before the goroutine starts. Otherwise wg.Wait can
		// observe a zero counter and return before any worker has registered.
		wg.Add(1)
		go func() {
			defer wg.Done()
			consume(db, ch)
		}()
	}
	publish(ch) // blocks until the whole file has been sent and ch is closed
	wg.Wait()
	fmt.Println("Total time: ", time.Since(start))
}
// publish reads every record from the CSV file at filePath and sends it on
// ch. Once the file has been fully read it closes ch, which lets the consume
// workers exit. An open or parse error terminates the process via log.Fatalln.
//
// NOTE(review): the header-row read is disabled. If the file begins with a
// header line, that line is sent (and inserted) like any other record; confirm
// this is intended.
func publish(ch chan []string) {
	csvfile, err := os.Open(filePath)
	if err != nil {
		log.Fatalln("Couldn't open the csv file", err)
	}
	defer csvfile.Close()

	// With the default FieldsPerRecord (0), every record must have the same
	// number of fields as the first one; a mismatch surfaces as a Read error.
	r := csv.NewReader(csvfile)
	for {
		record, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalln("Error reading the csv", err)
		}
		// ReuseRecord is left false, so each record is freshly allocated and
		// the receiving worker may modify it.
		ch <- record
	}
	close(ch)
}
// counterMu serializes access to the shared counter, which every consume
// worker updates.
var counterMu sync.Mutex

// consume is a worker loop. It receives CSV records from ch and inserts each
// one as a row of openalex.works, returning once ch has been closed and
// drained. Multiple consume goroutines may share the same ch and db: the
// shared counter is guarded by counterMu.
//
// Record fields map positionally onto the 13 columns of the INSERT below.
// An empty abstract_inverted_index (field 11) is stored as "{}". Records with
// too few fields are skipped, and a failed insert is logged; neither stops
// the worker.
func consume(db *sql.DB, ch chan []string) {
	const numColumns = 13
	const insertWork = `INSERT INTO openalex.works(
	id,
	doi,
	title,
	display_name,
	publication_year,
	publication_date,
	type,
	cited_by_count,
	is_retracted,
	is_paratext,
	cited_by_api_url,
	abstract_inverted_index,
	language)
	VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`

	for row := range ch {
		// Claim a sequence number from the shared counter for progress output.
		// Only received records are counted.
		counterMu.Lock()
		seq := counter
		counter++
		counterMu.Unlock()
		if seq%1000 == 0 {
			fmt.Println("Counter: ", seq)
		}

		// Guard the positional indexing below, which would panic on a short record.
		if len(row) < numColumns {
			log.Printf("skipping record %d: want %d fields, got %d", seq, numColumns, len(row))
			continue
		}
		if row[11] == "" {
			row[11] = "{}"
		}

		// Use Exec, not QueryRow: the statement returns no rows, so
		// QueryRow(...).Scan would always report sql.ErrNoRows and mask real
		// insert failures.
		if _, err := db.Exec(insertWork,
			row[0], row[1], row[2], row[3], row[4], row[5], row[6],
			row[7], row[8], row[9], row[10], row[11], row[12]); err != nil {
			log.Printf("inserting work %q: %v", row[0], err)
		}
	}
}
//type Works struct {
// id string
// doi string
// title string
// display_name string
// publication_year int
// publication_date string
// typeof string
// cited_by_count int
// is_retracted bool
// is_paratext bool
// cited_by_api_url string
// abstract_inverted_index string
// language string
//}
//
//func writeToWorks(data Works) {
//
//}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment