Skip to content

Instantly share code, notes, and snippets.

@cannium
Last active July 31, 2019 15:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cannium/fa0e2d772e388cc48d9d17191075f752 to your computer and use it in GitHub Desktop.
Save cannium/fa0e2d772e388cc48d9d17191075f752 to your computer and use it in GitHub Desktop.
Files for running performance tests against seaweedfs-cannyls.
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"math/rand"
"mime/multipart"
"net/http"
"os"
"time"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
)
const ObjectSizeLimit = 30 << 20 // 30M, limit introduced by cannyls
// Static alphaNumeric table used for generating unique request ids
var alphaNumericTable = []byte("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ")
func GenerateRandomId() []byte {
alpha := make([]byte, 16, 16)
for i := 0; i < 16; i++ {
n := rand.Intn(len(alphaNumericTable))
alpha[i] = alphaNumericTable[n]
}
return alpha
}
// read from ReadCloser and unmarshal to out;
// `out` should be of POINTER type
func ReadJsonBody(body io.ReadCloser, out interface{}) (err error) {
defer func() {
_ = body.Close()
}()
jsonBytes, err := ioutil.ReadAll(body)
if err != nil {
return err
}
//fmt.Println(string(jsonBytes))
err = json.Unmarshal(jsonBytes, out)
if err != nil {
return err
}
return nil
}
// uploadResult mirrors the JSON body a seaweedfs volume server
// returns after a file upload.
type uploadResult struct {
Name string `json:"name,omitempty"`
Size uint32 `json:"size,omitempty"`
Error string `json:"error,omitempty"` // non-empty when the server reports a failure
ETag string `json:"eTag,omitempty"`
}
// Storage implements yig.storage.backend
type Storage struct {
masters []string // NOTE(review): never populated in this file — confirm intended use
seaweedClient *wdclient.MasterClient // tracks the current seaweedfs master
httpClient *http.Client // shared client: 5-minute timeout, large idle pool (see NewSeaweedStorage)
}
// NewSeaweedStorage builds a Storage talking to the hard-coded
// seaweedfs master at 10.254.128.51:2233. It starts a background
// goroutine that keeps the master connection alive and blocks until
// the first connection succeeds.
func NewSeaweedStorage() Storage {
	id := fmt.Sprintf("YIG-%s", string(GenerateRandomId()))
	master := wdclient.NewMasterClient(context.Background(),
		grpc.WithInsecure(), id, []string{"10.254.128.51:2233"})
	go master.KeepConnectedToMaster()
	master.WaitUntilConnected() // FIXME some kind of timeout?
	fmt.Println("Seaweedfs client initialized")

	// Large idle pool: the benchmark hammers a handful of volume
	// servers with many concurrent uploads.
	transport := &http.Transport{MaxIdleConnsPerHost: 65535}
	return Storage{
		seaweedClient: master,
		httpClient: &http.Client{
			Timeout:   5 * time.Minute,
			Transport: transport,
		},
	}
}
// assignObject asks the current seaweedfs master to allocate one new
// file id in the given collection and returns the assignment (fid,
// volume server url, etc.).
func (s Storage) assignObject(poolName string) (result operation.AssignResult, err error) {
	request := &operation.VolumeAssignRequest{
		// TODO read from config
		Count:       1,
		Replication: "000",
		Collection:  poolName,
		Ttl:         "",
		DataCenter:  "",
	}
	assigned, err := operation.Assign(s.seaweedClient.GetMaster(), nil, request)
	if err != nil {
		return operation.AssignResult{}, err
	}
	// The master can also report failure inside a 200 response.
	if assigned.Error != "" {
		return operation.AssignResult{}, errors.New(assigned.Error)
	}
	return *assigned, nil
}
// Put uploads everything readable from object to seaweedfs under a
// freshly assigned fid in collection poolName. It returns the object
// url ("http://<volume server>/<fid>") and the number of bytes
// written. Objects larger than ObjectSizeLimit are rejected, since
// cannyls cannot store them.
func (s Storage) Put(poolName string, object io.Reader) (objectUrl string,
	bytesWritten uint64, err error) {
	assigned, err := s.assignObject(poolName)
	if err != nil {
		fmt.Println("assignObject error:", err)
		return "", 0, err
	}
	url := fmt.Sprintf("http://%s/%s", assigned.Url, assigned.Fid)
	// Read one byte past the limit so an oversized object is detected
	// and rejected instead of being silently truncated (previous bug:
	// LimitReader at exactly ObjectSizeLimit cut large inputs short).
	object = io.LimitReader(object, ObjectSizeLimit+1)
	body := &bytes.Buffer{}
	writer := multipart.NewWriter(body)
	part, err := writer.CreateFormFile("file", assigned.Fid)
	if err != nil {
		fmt.Println("CreateFormFile error:", err)
		return "", 0, err
	}
	n, err := io.Copy(part, object)
	if err != nil {
		fmt.Println("io.Copy error:", err)
		return "", 0, err
	}
	if n > ObjectSizeLimit {
		return "", 0, errors.New("object larger than ObjectSizeLimit")
	}
	err = writer.Close()
	if err != nil {
		fmt.Println("writer.Close error:", err)
		return "", 0, err
	}
	req, err := http.NewRequest("POST", url, body)
	if err != nil {
		fmt.Println("http.NewRequest error:", err)
		return "", 0, err
	}
	req.Header.Set("Content-Type", writer.FormDataContentType())
	resp, err := s.httpClient.Do(req)
	if err != nil {
		fmt.Println("s.httpClient.Do error:", err)
		return "", 0, err
	}
	// ReadJsonBody closes resp.Body.
	var result uploadResult
	err = ReadJsonBody(resp.Body, &result)
	if err != nil {
		fmt.Println("ReadJsonBody error:", err)
		return "", 0, err
	}
	if result.Error != "" {
		return "", 0, errors.New(result.Error)
	}
	// Previous bug: any HTTP status was treated as success.
	if resp.StatusCode >= http.StatusBadRequest {
		return "", 0, fmt.Errorf("upload %s: status %d", url, resp.StatusCode)
	}
	return url, uint64(n), nil
}
// GetReader looks up objectName (a seaweedfs fid) and returns a
// reader over the object's content. The caller must Close the
// returned reader.
func (s Storage) GetReader(poolName, objectName string,
	offset int64, length uint64) (reader io.ReadCloser, err error) {
	// TODO offset and length are currently ignored; the whole object
	// is returned.
	url, err := s.seaweedClient.LookupFileId(objectName)
	if err != nil {
		fmt.Println("seaweedClient.LookupFileId error:", err)
		return nil, err
	}
	resp, err := s.httpClient.Get(url)
	if err != nil {
		fmt.Println("httpClient.Get error:", err)
		return nil, err
	}
	// Previous bug: a non-200 response (e.g. 404 error page) was
	// handed back as if it were object data. Surface it as an error
	// and close the body so the connection can be reused.
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("GET %s: status %d", url, resp.StatusCode)
	}
	return resp.Body, nil
}
// Remove deletes the object at url via HTTP DELETE. Success is a
// 202 Accepted; any other status is returned as an error built from
// the server's JSON "error" field.
func (s Storage) Remove(url string) (err error) {
	req, err := http.NewRequest("DELETE", url, nil)
	if err != nil {
		fmt.Println("http.NewRequest error:", err)
		return err
	}
	resp, err := s.httpClient.Do(req)
	if err != nil {
		// Previous bug: this logged "httpClient.Get error" for a Do call.
		fmt.Println("httpClient.Do error:", err)
		return err
	}
	// ReadJsonBody closes resp.Body.
	var result map[string]interface{}
	err = ReadJsonBody(resp.Body, &result)
	if err != nil {
		fmt.Println("ReadJsonBody error:", err)
		return err
	}
	if resp.StatusCode == http.StatusAccepted {
		return nil
	}
	// fmt.Sprintln added a trailing newline to the error string; use
	// a plain %v formatting instead.
	return fmt.Errorf("%v", result["error"])
}
// batchRemove deletes ratio * len(urls) objects chosen at random (by
// shuffling and taking a prefix). Remove errors are deliberately
// ignored: deletion is best-effort load for the benchmark.
func batchRemove(client Storage, urls []string, ratio float64) {
	rand.Shuffle(len(urls), func(i, j int) {
		urls[j], urls[i] = urls[i], urls[j]
	})
	toDelete := int(float64(len(urls)) * ratio)
	for _, u := range urls[:toDelete] {
		client.Remove(u)
	}
}
func main() {
file := flag.String("file", "", "file to upload")
concurrency := flag.Int("concurrency", 10, "go routine number")
deleteRatio := flag.Float64("delete_ratio", 0,
"delete objects randomly, ratio between 0 and 1")
flag.Parse()
f, err := os.Open(*file)
if err != nil {
fmt.Println("cannot open", *file)
return
}
content, err := ioutil.ReadAll(f)
if err != nil {
fmt.Println("ReadAll error:", err)
return
}
var size, count, lastSize, lastCount int64
countChannel := make(chan int64, *concurrency)
sizeChannel := make(chan int64, *concurrency)
urls := make([]string, 0, 10000)
var deleteChannel chan string
if *deleteRatio > 0 {
deleteChannel = make(chan string, *concurrency)
}
client := NewSeaweedStorage()
for i := 0; i < *concurrency; i++ {
go upload(content, client,
countChannel, sizeChannel, deleteChannel)
}
ticker := time.NewTicker(10 * time.Second)
lastTime := time.Now()
for {
select {
case s := <-sizeChannel:
size += s
case n := <-countChannel:
count += n
case t := <-ticker.C:
dt := t.Sub(lastTime)
ds := size - lastSize
dn := count - lastCount
fmt.Println("Objects per second:", float64(dn)/dt.Seconds(),
"Bytes per second:", float64(ds)/dt.Seconds())
lastSize, lastCount, lastTime = size, count, t
case url := <-deleteChannel:
urls = append(urls, url)
if len(urls) >= 10000 {
go batchRemove(client, urls, *deleteRatio)
urls = make([]string, 0, 10000)
}
}
}
}
// upload loops forever: rewind the shared payload, PUT it, and report
// the outcome. On success it sends 1 to countChannel, the byte count
// to sizeChannel, and (when deletion is enabled) the object url to
// deleteChannel; failures are only logged.
func upload(content []byte, client Storage,
	countChannel, sizeChannel chan int64,
	deleteChannel chan string) {
	reader := bytes.NewReader(content)
	for {
		reader.Seek(0, io.SeekStart)
		url, written, err := client.Put("", reader)
		if err != nil {
			fmt.Println("PUT", url, written, err)
			continue
		}
		countChannel <- 1
		sizeChannel <- int64(written)
		if deleteChannel != nil {
			deleteChannel <- url
		}
	}
}
#!/bin/bash
# Benchmark driver: for every (object size x delete ratio x seaweedfs
# build) combination, restart the remote cluster on 10.254.128.51,
# run 10 parallel `pressure` clients for 30 minutes, then tear down
# and wipe the data before the next combination.
set -x
# Clean slate: stop local clients and remote seaweedfs processes.
bash stop.sh
ssh -t root@10.254.128.51 "cd /root/test; bash stop_all.sh"
ssh -t root@10.254.128.51 "cd /root/test; bash clean.sh"
for sizeFile in "1K" "100K" "30M"
#for sizeFile in "1K" "4K" "10K" "100K" "1M" "10M" "30M"
do
# Scale per-client concurrency inversely with object size so the
# total in-flight bytes stay comparable.
concurrency=100
if [ $sizeFile = "1K" ]; then
concurrency=1000
fi
if [ $sizeFile = "100K" ]; then
concurrency=10
fi
if [ $sizeFile = "30M" ]; then
concurrency=2
fi
for deleteRatio in "0" "0.05" "0.5"
do
#for deleteRatio in "0" "0.05" "0.1" "0.15" "0.2" "0.3" "0.5"
# Compare the vanilla build against the cannyls-backed build.
for version in "vanilla" "cannyls"
do
# Swap in the build under test, then bring up master and volume
# servers (sleeps give each stage time to come up).
ssh -t root@10.254.128.51 "cd /root/test; cp weed.$version weed"
ssh 10.254.128.51 -l root "cd /root/test; bash start_master.bash"
sleep 5
ssh 10.254.128.51 -l root "cd /root/test; bash start_volume.bash"
sleep 5
# Pre-grow volumes so assignment never stalls during the run.
curl 'http://10.254.128.51:2233/vol/grow?count=3500&replication=000'
sleep 60
# 10 client processes in parallel; each writes its own log under
# the per-version directory.
for i in {1..10}
do
./pressure -file $sizeFile -concurrency $concurrency -delete_ratio $deleteRatio > $version/$sizeFile-$concurrency-$deleteRatio-$i.log &
done
sleep 1800 # 30min
# Tear down and wipe before the next combination.
bash stop.sh
ssh -t root@10.254.128.51 "cd /root/test; bash stop_all.sh"
sleep 60
ssh -t root@10.254.128.51 "cd /root/test; bash clean.sh"
done
done
done
# start_master.bash: launch the seaweedfs master on port 2233 with no
# replication (single copy per object).
nohup ./weed master -mdir=. -ip=10.254.128.51 -port=2233 -defaultReplication=000 &>> master.log &
# every dir is a mounted disk
# start_volume.bash: two volume servers splitting the mounted disks
# between them, 100 volumes max per disk.
./weed volume -dir=b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r -max=100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100 -mserver=127.0.0.1:2233 -ip=10.254.128.51 -port=2999 &>> volume1.log &
./weed volume -dir=s,t,u,v,w,x,y,z,aa,ab,ac,ad,ae,af,ag,ah,ai,aj -max=100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100 -mserver=10.254.128.51:2233 -ip=10.254.128.51 -port=3000 &>> volume2.log &
#!/usr/bin/python
# -*- coding: utf-8 -*-
# summary test results
import sys
import os
from parse import parse
# dir path -> [log file name]
def walk(dirName):
    # Collect the base names of every *.log file anywhere under
    # dirName. Directory structure is discarded: only the file name
    # itself is recorded.
    logs = []
    for _, _, names in os.walk(dirName):
        logs.extend(name for name in names if name.endswith('.log'))
    return logs
# dir name, file name -> [object size, concurrency, delete ratio, i,
# objects per sec, bytes per sec, errorCount]
def singleFileAverage(dirName, fileName):
    # File names look like "<size>-<concurrency>-<ratio>-<run>.log",
    # as produced by the benchmark driver script.
    result = parse('{}-{}-{}-{}.log', fileName)
    objectSize, concurrency, deleteRatio, i = result
    objects = 0
    nbytes = 0  # renamed from `bytes` to stop shadowing the builtin
    n = 0
    errors = {}
    errorCount = 0
    with open(dirName + '/' + fileName) as f:
        for line in f:
            result = parse('Objects per second: {} Bytes per second: {}', line)
            if result is None:
                # Startup banner is expected; anything else counts as
                # an error line (deduplicated in `errors`).
                if 'Seaweedfs client initialized' in line:
                    continue
                errors[line] = None
                errorCount += 1
                continue
            n += 1
            objects += int(float(result[0]))
            nbytes += int(float(result[1]))
    #for err in errors:
    # print err,
    # Guard against logs with no throughput samples at all; the old
    # code raised ZeroDivisionError here.
    if n == 0:
        return [
            objectSize,
            int(concurrency),
            float(deleteRatio),
            int(i),
            0.0,
            0.0,
            errorCount,
        ]
    return [
        objectSize,
        int(concurrency),
        float(deleteRatio),
        int(i),
        1.0 * objects / n,
        1.0 * nbytes / n,
        errorCount,
    ]
if __name__ == "__main__":
dirName = sys.argv[1]
files = walk(dirName)
# (object size, concurrency, delete ratio) ->
# [objects per sec, bytes per sec, error count]
memo = {}
for f in files:
entry = singleFileAverage(dirName, f)
key = (entry[0], entry[1], entry[2])
if key in memo:
memo[key] = [
memo[key][0] + entry[4],
memo[key][1] + entry[5],
memo[key][2] + entry[6],
]
else:
memo[key] = [entry[4], entry[5], entry[6]]
keys = memo.keys()
for k in sorted(keys):
print k, memo[k]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment