Skip to content

@keyurdg /mysql_query_multiplexer.go
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Read output from pt-query-digest and multiplex queries to MySQL over multiple threads.
package main
import (
"bufio"
"bytes"
_ "github.com/Go-SQL-Driver/MySQL"
"database/sql"
"flag"
"fmt"
"io"
"log"
"os"
"strings"
)
var (
// Actual messages
msgs = make(chan string, 1000000)
// Indicate when consumer has finishes
done = make(chan bool)
db_host string
db_user string
db_password string
db_name string
db_charset string
)
func produce() {
r := bufio.NewReaderSize(io.Reader(os.Stdin), 32*1024*1024)
var query bytes.Buffer
for {
line, err := r.ReadString('\n')
if err == nil {
// Based on pt-query-digest's output:
// Start of a new '#' block indicates the previous query has
// ended. This is slightly more robust than looking for a trailing
// semi-colon, in case the query doesn't end in one.
if len(line) > 0 && line[0] == '#' {
s := strings.TrimSpace(query.String())
if len(s) > 1 {
select {
case msgs <- s:
/* nothing */
default:
log.Println("Channel full; dropping message")
}
}
query.Reset()
} else {
query.WriteString(" " + line)
}
continue
}
if err != nil && err != io.EOF {
fmt.Println("Error reading from stdin: " + err.Error())
break
}
if err != nil && err == io.EOF {
break
}
}
fmt.Println("Done with produce")
close(msgs)
}
func consume() {
defer func() {
done <- true
}()
for {
select {
case msg, ok := <-msgs:
if !ok {
return
}
// Creating a new connection for every query. Trying to exercise MySQL's connection
// handling a bit
db, e := sql.Open("mysql", db_user+":"+db_password+"@tcp("+db_host+":3306)/"+db_name+"?charset="+db_charset)
if e != nil {
panic(e)
}
_, err := db.Exec(msg)
if err != nil {
log.Println(msg)
log.Println(err.Error())
}
db.Close()
}
}
}
func main() {
var threads = flag.Int("threads", 8, "Execution threads")
flag.StringVar(&db_host, "db-host", "", "DB Host")
flag.StringVar(&db_user, "db-user", "", "DB Username")
flag.StringVar(&db_password, "db-password", "", "DB Password")
flag.StringVar(&db_name, "db-name", "", "DB Name")
flag.StringVar(&db_charset, "db-charset", "", "DB Character set")
var error_log = flag.String("log", "", "Error log file location and name")
var help = flag.Bool("h", false, "Help")
flag.Parse()
if *help == true {
flag.Usage()
return
}
if db_host == "" || db_user == "" || db_password == "" || db_name == "" || db_charset == "" || *error_log == "" {
flag.Usage()
return
}
logFile, err := os.OpenFile(*error_log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
panic(err.Error() + " opening log file: " + *error_log)
}
log.SetOutput(logFile)
log.SetFlags(log.LstdFlags)
fmt.Println(fmt.Sprintf("Starting go-execution with %d threads", *threads))
go produce()
for i := 0; i < *threads; i++ {
go consume()
}
for i := 0; i < *threads; i++ {
<-done
}
fmt.Println("Done with consume")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.