Skip to content

Instantly share code, notes, and snippets.

@z81
Created December 31, 2018 02:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save z81/2e064c99db25a192694a5b0719a78bb8 to your computer and use it in GitHub Desktop.
Save z81/2e064c99db25a192694a5b0719a78bb8 to your computer and use it in GitHub Desktop.
package main
import (
"database/sql"
"fmt"
"log"
"math"
"runtime"
"sync"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/kshvakov/clickhouse"
)
type Stats struct {
success int
recived int
startTime int64
time int64
}
func write(connect *sql.DB, batchSize int, stats *Stats) {
stats.recived += batchSize
var (
tx, _ = connect.Begin()
stmt, _ = tx.Prepare("INSERT INTO data2 (name, user, ts, v1, v2) VALUES (?, ?, ?, ?, ?)")
)
graphCount := 3
userCount := 1000
for i := 0; i < (batchSize / graphCount / userCount); i++ {
for g := 0; g < graphCount; g++ {
for u := 0; u < userCount; u++ {
val1 := float64(u % 100)
val2 := float64(u % 25)
if _, err := stmt.Exec(
u,
g,
stats.time,
val1,
val2,
// clickhouse.Array([]float64{val1, val2}),
); err != nil {
log.Fatal(err)
}
}
stats.time = (stats.time + 60)
}
}
if err := tx.Commit(); err != nil {
log.Fatal(err)
return
}
stats.success += batchSize
}
func t(stats *Stats) {
timeDiff := float64(time.Now().Unix() - stats.startTime)
recAvgMsgSec := int(math.Floor(float64(stats.recived) / timeDiff))
sucAvgMsgSec := int(math.Floor(float64(stats.success) / timeDiff))
fmt.Printf("Suc %[1]s, Rec %[2]s, Avg rec %[3]s/sec, avg suc %[4]s/sec \n",
humanize.Comma(int64(stats.success)),
humanize.Comma(int64(stats.recived)),
humanize.Comma(int64(recAvgMsgSec)),
humanize.Comma(int64(sucAvgMsgSec)),
)
}
func main() {
runtime.GOMAXPROCS(4)
batchSize := 350000
msgSec := 1800000
workers := 4
stats := Stats{
success: 0,
recived: 0,
startTime: time.Now().Unix(),
time: 118771240,
}
timer1 := time.NewTicker(time.Second)
go func() {
for range timer1.C {
t(&stats)
}
}()
connect, err := sql.Open("clickhouse", "tcp://localhost:9000?debug=false")
if err != nil {
log.Fatal(err)
}
if err := connect.Ping(); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
fmt.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
} else {
fmt.Println(err)
}
return
}
var wg sync.WaitGroup
wg.Add(1)
oneTick := float64(time.Second) / float64(msgSec) * float64(batchSize)
for w := 1; w <= workers; w++ {
timer2 := time.NewTicker(time.Duration(oneTick / float64(workers)))
go func() {
for range timer2.C {
write(connect, batchSize, &stats)
}
}()
}
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment