Skip to content

Instantly share code, notes, and snippets.

@marciol
Last active September 27, 2015 17:28
Show Gist options
  • Save marciol/1306488 to your computer and use it in GitHub Desktop.
Save marciol/1306488 to your computer and use it in GitHub Desktop.
Hard-coded program that exports MySQL TSV output to MongoDB
package main
import (
"bufio"
"flag"
"os"
"strings"
"launchpad.net/gobson/bson"
"launchpad.net/mgo"
"bytes"
)
// Command-line flags. Each variable is a pointer that holds the flag's
// default until flag.Parse fills in the value given on the command line.
var (
	// nprocs is the requested number of processes (default 4).
	nprocs = flag.Int("nprocs", 4, "number of processes")
	// nlines is the number of input lines grouped into one buffer
	// (default 100000).
	nlines = flag.Int("n", 100000, "number of lines on buffer")
	// mongo_addr is the address handed to mgo when opening the session.
	mongo_addr = flag.String("a", "", "mongodb address")
	// database is the name of the MongoDB database that receives the rows.
	database = flag.String("d", "", "mongodb database")
	// collection is the name of the MongoDB collection that receives the rows.
	collection = flag.String("c", "", "mongodb collection")
)
// main parses the command-line flags, opens the MongoDB session and runs the
// import pipeline: startReader produces line batches, changeToBson turns each
// batch into BSON documents, and sendResults inserts them.
//
// It panics if the MongoDB session cannot be opened.
func main() {
	flag.Parse()
	session, m_err := mgo.Mongo(*mongo_addr)
	if m_err != nil {
		panic(m_err)
	}

	in := startReader()
	bson_out := make(chan []*bson.M)

	// Convert batches in the background so the next batch can be parsed
	// while the current one is being inserted. Closing bson_out once the
	// reader is exhausted is what ends the insert loop below.
	go func() {
		defer close(bson_out)
		for buffer := range in {
			changeToBson(buffer, bson_out)
		}
	}()

	// Insert from the main goroutine: the program must not exit before the
	// last batch has been handed to sendResults, and the session is only
	// ever used from one goroutine.
	for docs := range bson_out {
		sendResults(docs, session)
	}
}
// startReader opens the file named by the first positional command-line
// argument and streams its contents over the returned channel in batches.
//
// Every batch is a buffer holding up to *nlines lines, each terminated by
// '\n' (the original line endings are stripped by ReadLine). The last batch
// may hold fewer lines. A value of *nlines below 1 yields one line per
// batch. The channel is closed once the input is exhausted.
//
// The reading goroutine panics if the file cannot be opened. Any read error
// is treated as end of input.
func startReader() chan *bytes.Buffer {
	str_buffer := make(chan *bytes.Buffer)
	go func() {
		// Closing the channel is what terminates the consumer's range loop.
		defer close(str_buffer)

		filename := flag.Arg(0)
		file, f_err := os.Open(filename)
		if f_err != nil {
			panic(f_err)
		}
		defer file.Close()

		file_reader := bufio.NewReader(file)
		buffer := bytes.NewBufferString("")
		count := 0
		mid_line := false
		for {
			line, is_prefix, rd_err := file_reader.ReadLine()
			if rd_err != nil {
				break
			}
			buffer.Write(line)
			// ReadLine hands back long lines in pieces; only the final
			// piece completes the line, so only then is it terminated
			// and counted.
			mid_line = is_prefix
			if is_prefix {
				continue
			}
			buffer.WriteByte('\n')
			count++
			if count >= *nlines {
				str_buffer <- buffer
				buffer = bytes.NewBufferString("")
				count = 0
			}
		}

		// Terminate a line that was cut short by a read error, then flush
		// whatever is left so the tail of the file is not lost.
		if mid_line {
			buffer.WriteByte('\n')
		}
		if buffer.Len() > 0 {
			str_buffer <- buffer
		}
	}()
	return str_buffer
}
// changeToBson converts every line held in buffer into a BSON document and
// sends the resulting slice on bson_out.
//
// Lines are handed to processLine one at a time, so the documents keep the
// order of the input. Lines that processLine rejects are left out of the
// slice. Exactly one slice is sent per call, even when it is empty.
func changeToBson(buffer *bytes.Buffer, bson_out chan<- []*bson.M) {
	bsn_ch := make(chan *bson.M)
	skipped := make(chan bool)
	done := make(chan bool)

	// Size the result from the lines actually present in this buffer.
	bsn_slc := make([]*bson.M, 0, bytes.Count(buffer.Bytes(), []byte("\n")))

	// processLine reports through unbuffered channels, so it runs in its own
	// goroutine while this one collects. done is closed only after the last
	// processLine call has returned, i.e. after its result was received.
	go func() {
		defer close(done)
		for {
			line, rd_err := buffer.ReadString('\n')
			if line != "" {
				processLine(line, bsn_ch, skipped)
			}
			if rd_err != nil {
				return
			}
		}
	}()

	for {
		select {
		case v := <-bsn_ch:
			bsn_slc = append(bsn_slc, v)
		case <-skipped:
			// Rejected line: nothing to store.
		case <-done:
			bson_out <- bsn_slc
			return
		}
	}
}
// processLine parses one tab-separated input line and reports the outcome on
// exactly one of the two channels.
//
// A line with at least five columns is sent on b as a document with the keys
// site_url, feed_id, feed_item_id, created_at and updated_at, taken from the
// first five columns in that order; further columns are ignored. A shorter
// line is reported by sending true on returning.
func processLine(line string, b chan<- *bson.M, returning chan<- bool) {
	// Split on tabs only: splitting on arbitrary whitespace would break a
	// column that contains spaces and would shift columns after an empty one.
	f := strings.Split(strings.TrimRight(line, "\r\n"), "\t")
	if len(f) < 5 {
		returning <- true
		return
	}
	b <- &bson.M{"site_url": f[0], "feed_id": f[1], "feed_item_id": f[2], "created_at": f[3], "updated_at": f[4]}
}
// sendResults inserts the documents in bsn_slc into the collection selected
// by the -d and -c flags, using the given session.
//
// Nil entries are skipped, and nothing is sent when no document remains.
// It panics if the insert fails.
func sendResults(bsn_slc []*bson.M, session *mgo.Session) {
	// Insert is variadic, so each document has to be passed as its own
	// argument; handing over the slice itself would make it one document.
	docs := make([]interface{}, 0, len(bsn_slc))
	for _, doc := range bsn_slc {
		if doc != nil {
			docs = append(docs, doc)
		}
	}
	if len(docs) == 0 {
		return
	}

	c := session.DB(*database).C(*collection)
	if err := c.Insert(docs...); err != nil {
		panic(err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment