Skip to content

Instantly share code, notes, and snippets.

@jiapengjp
Created December 17, 2015 02:25
Show Gist options
  • Save jiapengjp/faaa3aa4e7f6d08b13d1 to your computer and use it in GitHub Desktop.
Save jiapengjp/faaa3aa4e7f6d08b13d1 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"bytes"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
)
type Counter struct {
LinePerBuf int
BytePerBuf int
ErrCount int64
PostStartCount int64
PostEndCount int64
prev [3]int64
}
func (c *Counter) String() string {
return fmt.Sprintf("post:%v, posted:%v, line:%v, byte:%v, err:%v",
c.PostStartCount, c.PostEndCount, c.PostEndCount*int64(c.LinePerBuf),
c.PostEndCount*int64(c.BytePerBuf), c.ErrCount)
}
func (c *Counter) CsvTitle() string {
return "time,post,posted,line,byte,err,bulksize,bulkline\n"
}
func (c *Counter) DiffCsv() string {
diff, _ := c.getDiff()
return fmt.Sprintf("%v,%v,%v,%v,%v,%v,%v,%v\n",
time.Now().Unix(), diff[0], diff[1],
diff[1]*int64(c.LinePerBuf), diff[1]*int64(c.BytePerBuf),
diff[2], c.BytePerBuf, c.LinePerBuf)
}
func (c *Counter) getDiff() (diff [4]int64, changed bool) {
var current [3]int64
current[0] = c.PostStartCount
current[1] = c.PostEndCount
current[2] = c.ErrCount
for i, _ := range c.prev {
diff[i] = current[i] - c.prev[i]
c.prev[i] = current[i]
if diff[i] != 0 {
changed = true
}
}
return
}
func printError(exeName string, err error) {
log.Println(err)
log.Printf("Usage: %v input-file bulk-size test-duration url concurrency\n", exeName)
}
func main() {
if len(os.Args) != 6 {
printError(os.Args[0], fmt.Errorf("Please input 6 arguments. %d received.", len(os.Args)))
return
}
lines, err := ioutil.ReadFile(os.Args[1])
if err != nil {
printError(os.Args[0], err)
return
}
var counter Counter
buf, err := formatBulk(lines, os.Args[2], &counter)
if err != nil {
printError(os.Args[0], err)
return
}
ioutil.WriteFile("buffer.dump", buf, 0644)
duration, err := time.ParseDuration(os.Args[3])
if err != nil {
printError(os.Args[0], err)
return
}
concurrency, err := strconv.Atoi(os.Args[5])
if err != nil {
printError(os.Args[0], err)
return
}
fname := "upload.log"
var flog *os.File
if _, err = os.Stat(fname); os.IsNotExist(err) {
flog, err = os.Create(fname)
if err != nil {
printError(os.Args[0], err)
return
}
_, err = flog.WriteString(counter.CsvTitle())
if err != nil {
printError(os.Args[0], err)
return
}
} else {
flog, err = os.OpenFile(fname, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
printError(os.Args[0], err)
return
}
}
defer flog.Close()
var wg sync.WaitGroup
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go postTimeout(buf, os.Args[4], duration, &wg, &counter)
}
ticker := time.NewTicker(10 * time.Second)
go func() {
for _ = range ticker.C {
log.Println(&counter)
flog.WriteString(counter.DiffCsv())
flog.Sync()
}
}()
wg.Wait()
flog.WriteString(counter.DiffCsv())
flog.Sync()
log.Println(&counter, ", Done.")
}
func formatBulk(lines []byte, bulkSize string, counter *Counter) (buf []byte, err error) {
number := bulkSize[:len(bulkSize)-1]
unit := bulkSize[len(bulkSize)-1]
s64, err := strconv.ParseInt(number, 10, 32)
if err != nil {
return
}
var bulksize int
switch unit {
case 'g':
bulksize = int(s64 * 1024 * 1024 * 1024)
case 'm':
bulksize = int(s64 * 1024 * 1024)
case 'k':
bulksize = int(s64 * 1024)
default:
s64, err = strconv.ParseInt(bulkSize, 10, 32)
if err != nil {
return
}
bulksize = int(s64)
}
var buffer bytes.Buffer
var lineCount, loopCount int
Outer:
for {
scanner := bufio.NewScanner(bytes.NewReader(lines))
for scanner.Scan() {
_, err = buffer.WriteString("{\"index\":{}}\n")
_, err = buffer.WriteString("{\"message\":\"")
if err != nil {
return
}
for _, c := range scanner.Bytes() {
if c > 0x1f {
if c == '"' {
_, err = buffer.WriteRune('\\')
_, err = buffer.WriteRune('"')
} else if c == '\\' {
_, err = buffer.WriteRune('\\')
_, err = buffer.WriteRune('\\')
} else {
err = buffer.WriteByte(c)
}
} else {
if c == '\t' {
_, err = buffer.WriteRune('\\')
_, err = buffer.WriteRune('t')
} else if c == '\n' || c == '\r' {
// delete \r\n
} else {
log.Println("Error line:\n", hex.Dump(scanner.Bytes()))
continue
}
}
}
_, err = buffer.WriteString("\"}\n")
if err != nil {
return
}
lineCount++
if buffer.Len() >= bulksize {
break Outer
}
}
loopCount++
}
counter.BytePerBuf = buffer.Len()
counter.LinePerBuf = lineCount
log.Printf("Created buffer of %vbytes (%v lines) in %v loop.\n", counter.BytePerBuf, counter.LinePerBuf, loopCount)
buf = buffer.Bytes()
return
}
func postTimeout(buf []byte, url string, testDuration time.Duration, wg *sync.WaitGroup, counter *Counter) {
defer wg.Done()
stop := false
go func() {
<-time.After(testDuration)
stop = true
}()
for !stop {
reader := bytes.NewReader(buf)
atomic.AddInt64(&counter.PostStartCount, 1)
smsg, err := postToEs(reader, url)
if err != nil {
atomic.AddInt64(&counter.ErrCount, 1)
log.Printf("Server responsed with error:\n%v\n%v", err, string(smsg))
return
} else {
atomic.AddInt64(&counter.PostEndCount, 1)
}
}
}
func postToEs(reader io.Reader, url string) (smsg []byte, err error) {
req, err := http.NewRequest("POST", url, reader)
if err != nil {
return
}
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
smsg, err = ioutil.ReadAll(resp.Body)
if err != nil {
return
}
searchLen := len(smsg)
if searchLen > 50 {
searchLen = 50
}
if bytes.Index(smsg[:50], []byte("\"errors\":true")) != -1 {
err = fmt.Errorf("Bulk failed because server responsed with error.")
return
}
smsg = nil
return
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment