Skip to content

Instantly share code, notes, and snippets.

@nullne nullne/minio.go
Created Jan 16, 2019

Embed
What would you like to do?
package main
import (
"bytes"
"context"
"crypto/md5"
"errors"
"flag"
"fmt"
"io"
"math/rand"
"net/http"
"os"
"os/signal"
"runtime/debug"
"strings"
"sync"
"syscall"
"time"
"github.com/golang/glog"
minio "github.com/minio/minio-go"
"github.com/minio/minio/pkg/sys"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
_ "net/http/pprof"
)
// Command-line flags. They are parsed in init().
var (
	// fWrite is the number of writer goroutines started per bucket.
	fWrite = flag.Int("w", 10, "routine number for writing to one bucket")
	// fRead is the number of reader goroutines (shared across buckets).
	fRead = flag.Int("r", 10, "routine number for reading")
	// fOnlyRead stops the writers once the object pool is full.
	fOnlyRead = flag.Bool("only-read", false, "only read once the object pool is full; writers still run at the beginning to fill the pool")
	// fSize is the size in bytes of every uploaded object.
	fSize = flag.Int("size", 1024*64, "object size, unit is byte")
	// fMemory is multiplied by 1024^3 and used to size the object pool.
	fMemory = flag.Int("mem", 64, "node memory used to size the object pool, unit is G")
	fDrives = flag.Int("drive", 12, "drive numbers in each set")
	// freshRatio is the fraction of pool slots that keep being overwritten
	// by new uploads after the pool is full.
	freshRatio    = flag.Float64("fresh-ratio", 0.3, "fraction of the object pool that keeps being replaced by new uploads once the pool is full, in (0, 1]")
	fWriteTimeout = flag.Duration("write-timeout", time.Second*5, "timeout for a single object upload")
	fReadTimeout  = flag.Duration("read-timeout", time.Second*5, "timeout for a single object download")
	fEndpoint     = flag.String("endpoint", "minio-rr:9000", "MinIO server address (host:port)")
	fLocation     = flag.String("location", "Beijing", "location passed when creating buckets")
	fBuckets      = flag.String("buckets", "snoopy", "multiple buckets should be separated by comma")
)
// ctrOperationTimes counts every read and write attempt. The "type" label is
// "read" or "write"; the "success" label is "success" or "failed".
var ctrOperationTimes = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "minio",
		Name:      "operation_times",
		Help:      "operation times",
	},
	[]string{"type", "success"},
)

// hgReadDuration records, in seconds, how long successful object downloads
// take.
// NOTE(review): the buckets jump from 5ms straight to 100ms; confirm that
// gap is intended (perhaps .01/.05 were meant).
var hgReadDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "minio",
	Name:      "read_duration",
	Help:      "read duration",
	Buckets:   []float64{0.001, 0.003, 0.005, 0.1, 0.5, 1, 2, 3, 4},
})

// hgWriteDuration records, in seconds, how long successful object uploads
// take. Its buckets match hgReadDuration's.
var hgWriteDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "minio",
	Name:      "write_duration",
	Help:      "write duration",
	Buckets:   []float64{0.001, 0.003, 0.005, 0.1, 0.5, 1, 2, 3, 4},
})
// validFlags logs the effective configuration (at glog verbosity 1) and
// rejects flag values that cannot produce a working benchmark. Most of these
// would otherwise surface later as panics: division by zero or out-of-range
// indexing when the object pool is sized and filled. Others would surface as
// a hang or as every request failing.
func validFlags() error {
	glog.V(1).Infof("routine number to write is %d, to read is %d (only read is %v)", *fWrite, *fRead, *fOnlyRead)
	glog.V(1).Infof("write timeout is %v, read is %v", *fWriteTimeout, *fReadTimeout)
	glog.V(1).Infof("node memory is %vG, drive numbers is %v, file size is %v, fresh ratios is %0.2f", *fMemory, *fDrives, *fSize, *freshRatio)
	glog.V(1).Infof("endpoint is %v, location is %v, buckets are %v", *fEndpoint, *fLocation, *fBuckets)

	switch {
	case *fWrite < 0 || *fRead < 0:
		return fmt.Errorf("-w and -r must not be negative, got %d and %d", *fWrite, *fRead)
	case *fOnlyRead && *fWrite == 0:
		// No writer would ever fill the pool, so waiting for it never ends.
		return errors.New("-only-read requires -w > 0")
	case *fSize <= 0:
		return fmt.Errorf("-size must be positive, got %d", *fSize)
	case *fMemory <= 0:
		return fmt.Errorf("-mem must be positive, got %d", *fMemory)
	case *fDrives <= 0:
		return fmt.Errorf("-drive must be positive, got %d", *fDrives)
	case *freshRatio <= 0 || *freshRatio > 1:
		// Zero leaves no slot to refresh; above 1 exceeds the pool size.
		return fmt.Errorf("-fresh-ratio must be in (0, 1], got %v", *freshRatio)
	case *fWriteTimeout <= 0 || *fReadTimeout <= 0:
		return fmt.Errorf("-write-timeout and -read-timeout must be positive, got %v and %v", *fWriteTimeout, *fReadTimeout)
	case strings.TrimSpace(*fBuckets) == "":
		return errors.New("-buckets must name at least one bucket")
	}
	return nil
}
func main() {
initPrometheus()
buckets := strings.Split(*fBuckets, ",")
if err := initBuckets(*fEndpoint, buckets, *fLocation); err != nil {
glog.Exit(err)
}
op := newObjectPool(*fMemory*1024*1024*1024, *fSize, *fDrives, *freshRatio)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wctx, wcancel := context.WithCancel(ctx)
for j, bucket := range buckets {
for i := 0; i < *fWrite; i++ {
wg.Add(1)
go writeWorker(wctx, &wg, op, i+j, bucket)
}
}
if *fOnlyRead {
op.waitUntilFull(ctx)
wcancel()
}
for i := 0; i < *fRead; i++ {
wg.Add(1)
go readWorker(ctx, &wg, op, i)
}
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
{
for {
select {
case <-signalCh:
cancel()
glog.Info("will exist by signal, wait all routines finished")
wg.Wait()
return
}
}
}
}
// object identifies one successfully uploaded object. It also holds the MD5
// of the content that was written, so a later download can be verified.
type object struct {
	bucket, key string
	md5sum      string // lowercase hex digest, as formatted with %x
}
// objectPool is a bounded collection of objects known to have been written.
// Readers sample it at random. Access is guarded by lock.
type objectPool struct {
	// objects grows by appending until it holds maxSize entries. After that,
	// entries in its first freshSize slots are overwritten in place.
	objects []*object
	// lock guards objects.
	lock sync.RWMutex
	// freshSize is the length of the refreshable prefix of objects, and
	// maxSize is the pool's capacity.
	freshSize, maxSize int
}
// newObjectPool sizes a pool for a node with memory bytes of RAM.
//
// The per-object footprint is estimated roughly as 2*size/drives bytes,
// where size is the object size and drives the number of drives per set.
// maxSize is memory divided by that estimate. freshSize is maxSize scaled
// by freshRatio.
//
// The results are clamped so the pool is always usable:
//   - the footprint estimate is at least 1 byte, which avoids a division by
//     zero when 2*size < drives;
//   - drives < 1 is treated as 1;
//   - maxSize is at least 1;
//   - freshSize lies in [1, maxSize], so a full pool always has a slot that
//     new uploads can refresh.
func newObjectPool(memory, size, drives int, freshRatio float64) *objectPool {
	if drives < 1 {
		drives = 1
	}
	// Roughly calculated per-object memory cost.
	perObject := 2 * size / drives
	if perObject < 1 {
		perObject = 1
	}
	maxSize := memory / perObject
	if maxSize < 1 {
		maxSize = 1
	}
	freshSize := int(float64(maxSize) * freshRatio)
	if freshSize < 1 {
		freshSize = 1
	}
	if freshSize > maxSize {
		freshSize = maxSize
	}
	return &objectPool{maxSize: maxSize, freshSize: freshSize}
}
// waitUntilFull blocks until the pool holds at least maxSize objects or ctx
// is cancelled. It checks the pool size every 5 seconds; the first check
// happens 5 seconds after the call.
func (o *objectPool) waitUntilFull(ctx context.Context) {
	// One ticker for the whole wait, stopped on return. Calling time.Tick on
	// every loop iteration would create a new, never-stopped ticker each time.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			glog.V(1).Info("stopped waiting for the object pool: context cancelled")
			return
		case <-ticker.C:
		}
		// Only the length is read, so the shared lock is enough.
		o.lock.RLock()
		length := len(o.objects)
		o.lock.RUnlock()
		glog.V(3).Infof("object pool size cur: %d, max: %d", length, o.maxSize)
		if length >= o.maxSize {
			glog.V(1).Info("the object pool is full")
			return
		}
	}
}
// put records an uploaded object so readers can later fetch and verify it.
//
// While the pool holds fewer than maxSize objects, the entry is appended.
// Once the pool is full, the entry overwrites a random slot among the first
// freshSize entries. That prefix keeps being refreshed with recent uploads,
// while the slots after it keep the objects written during the fill phase.
// If no slot is refreshable (freshSize <= 0, or the pool is empty because
// maxSize <= 0), the object is not recorded.
func (o *objectPool) put(bucket, key, md5sum string) {
	entry := &object{bucket: bucket, key: key, md5sum: md5sum}
	o.lock.Lock()
	defer o.lock.Unlock()
	if len(o.objects) < o.maxSize {
		o.objects = append(o.objects, entry)
		return
	}
	// Clamp to the slots that actually exist. A freshSize above the pool
	// length would index past the end, and rand.Intn panics on n <= 0.
	fresh := o.freshSize
	if fresh > len(o.objects) {
		fresh = len(o.objects)
	}
	if fresh <= 0 {
		return
	}
	o.objects[rand.Intn(fresh)] = entry
}
// get returns an object chosen uniformly at random from the pool, or nil if
// nothing has been put yet. It holds the read lock while sampling.
func (o *objectPool) get() *object {
	o.lock.RLock()
	defer o.lock.RUnlock()
	if n := len(o.objects); n > 0 {
		return o.objects[rand.Intn(n)]
	}
	return nil
}
// sampleFile returns an in-memory reader over size bytes of synthetic
// content. The buffer is zero-filled except every step-th byte
// (step = size/1024, at least 1), which is set to a random value in [0, 10).
// Successive samples therefore almost certainly differ, and so do their
// MD5s, without paying to randomize every byte. size must not be negative.
func sampleFile(size int) *bytes.Reader {
	content := make([]byte, size)
	// For 0 < size < 1024 the integer division gives 0, which would make the
	// loop below never advance. Fall back to randomizing every byte.
	step := size / 1024
	if step < 1 {
		step = 1
	}
	for i := 0; i < size; i += step {
		content[i] = byte(rand.Intn(10))
	}
	return bytes.NewReader(content)
}
// readWorker repeatedly picks a random object from op, downloads it and
// verifies its MD5 against the checksum recorded at upload time, until ctx
// is cancelled. number is only used in log messages. wg.Done is called when
// the worker exits, including when the client cannot be created.
func readWorker(ctx context.Context, wg *sync.WaitGroup, op *objectPool, number int) {
	// Registered first so every exit path releases the WaitGroup.
	defer wg.Done()
	glog.V(4).Infof("read worker %d starts", number)
	defer glog.V(4).Infof("read worker %d exits", number)

	client, err := newClient(*fEndpoint)
	if err != nil {
		// Continuing with a nil client would panic on the first request.
		glog.Errorf("failed to create client: %v", err)
		return
	}
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		target := op.get()
		if target == nil {
			// Nothing uploaded yet: back off, but still react to shutdown.
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Second):
			}
			continue
		}
		readOnce(client, target)
	}
}

// readOnce downloads target and compares the MD5 of its content with the
// recorded checksum. Every attempt is counted in ctrOperationTimes.
// Completed downloads are timed in hgReadDuration; the timing is recorded
// before the checksum comparison.
func readOnce(client *minio.Client, target *object) {
	start := time.Now()
	// Detached from the worker's ctx: on shutdown an in-flight read may
	// finish (bounded by -read-timeout) instead of being aborted as a failure.
	readCtx, cancel := context.WithTimeout(context.Background(), *fReadTimeout)
	defer cancel()
	r, err := client.GetObjectWithContext(readCtx, target.bucket, target.key, minio.GetObjectOptions{})
	if err != nil {
		ctrOperationTimes.With(prometheus.Labels{"type": "read", "success": "failed"}).Inc()
		glog.Errorf("get object %s in bucket %s failed: %v", target.key, target.bucket, err)
		return
	}
	defer r.Close()
	h := md5.New()
	if _, err := io.Copy(h, r); err != nil {
		ctrOperationTimes.With(prometheus.Labels{"type": "read", "success": "failed"}).Inc()
		glog.Errorf("copy object %s in bucket %s failed: %v", target.key, target.bucket, err)
		return
	}
	hgReadDuration.Observe(time.Since(start).Seconds())
	if md5sum := fmt.Sprintf("%x", h.Sum(nil)); md5sum != target.md5sum {
		ctrOperationTimes.With(prometheus.Labels{"type": "read", "success": "failed"}).Inc()
		glog.Errorf("md5 of object %s in bucket %s mismatch, wanna: %s, got %s", target.key, target.bucket, target.md5sum, md5sum)
		return
	}
	ctrOperationTimes.With(prometheus.Labels{"type": "read", "success": "success"}).Inc()
}
// writeWorker uploads freshly generated objects to bucket until ctx is
// cancelled. Each successful upload (key and MD5) is recorded in op so
// readers can verify it later. number is only used in log messages. wg.Done
// is called when the worker exits, including when the client cannot be
// created.
func writeWorker(ctx context.Context, wg *sync.WaitGroup, op *objectPool, number int, bucket string) {
	defer wg.Done()
	glog.V(4).Infof("write worker %d starts", number)
	defer glog.V(4).Infof("write worker %d exits", number)

	client, err := newClient(*fEndpoint)
	if err != nil {
		// Continuing with a nil client would panic on the first upload.
		glog.Errorf("failed to create client: %v", err)
		return
	}
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		writeOnce(client, op, bucket)
	}
}

// writeOnce generates one sample object of -size bytes and uploads it to
// bucket. On success the object is added to op. Every upload attempt is
// counted in ctrOperationTimes; successful ones are timed in
// hgWriteDuration.
func writeOnce(client *minio.Client, op *objectPool, bucket string) {
	id, err := getMediaIdentifier()
	if err != nil {
		glog.Errorf("failed to get identifier: %v", err)
		return
	}
	// Keys are nested under two prefixes taken from the identifier.
	objectName := fmt.Sprintf("%s/%s/%s", id[0:2], id[2:4], id)

	reader := sampleFile(*fSize)
	h := md5.New()
	if _, err := io.Copy(h, reader); err != nil {
		glog.Errorf("failed to calculate md5: %v", err)
		return
	}
	md5sum := fmt.Sprintf("%x", h.Sum(nil))
	// Hashing consumed the reader; rewind it for the upload.
	if _, err := reader.Seek(0, io.SeekStart); err != nil {
		glog.Errorf("failed to rewind sample for %s: %v", objectName, err)
		return
	}

	start := time.Now()
	// Detached from the worker's ctx: on shutdown an in-flight upload may
	// finish (bounded by -write-timeout) instead of being aborted as a failure.
	uploadCtx, cancel := context.WithTimeout(context.Background(), *fWriteTimeout)
	defer cancel()
	size, err := client.PutObjectWithContext(uploadCtx, bucket, objectName, reader, reader.Size(), minio.PutObjectOptions{})
	if err != nil {
		ctrOperationTimes.With(prometheus.Labels{"type": "write", "success": "failed"}).Inc()
		glog.Errorf("put object %s in bucket %s failed: %v", objectName, bucket, err)
		return
	}
	hgWriteDuration.Observe(time.Since(start).Seconds())
	op.put(bucket, objectName, md5sum)
	ctrOperationTimes.With(prometheus.Labels{"type": "write", "success": "success"}).Inc()
	glog.V(5).Infof("successfully upload file %s with size %d to %s", objectName, size, bucket)
}
// getMediaIdentifier returns a random 30-character identifier drawn from the
// RFC 4648 Base32 alphabet (A-Z, 2-7). 256 is a multiple of 32, so mapping
// each random byte modulo 32 keeps every character equally likely. The
// error comes from rand.Read and is returned unchanged.
func getMediaIdentifier() (string, error) {
	const alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" // Based on RFC 4648 Base32 alphabet
	id := make([]byte, 30)
	if _, err := rand.Read(id); err != nil {
		return "", err
	}
	// Map each random byte to a character in place. The string is built with
	// one conversion instead of a reallocation per appended character.
	for i, b := range id {
		id[i] = alphabet[int(b)%len(alphabet)]
	}
	return string(id), nil
}
func initBuckets(endpoint string, buckets []string, location string) error {
accessKeyID := os.Getenv("MINIO_ACCESS_KEY")
secretAccessKey := os.Getenv("MINIO_SECRET_KEY")
if accessKeyID == "" || secretAccessKey == "" {
return errors.New("should export MINIO_ACCESS_KEY and MINIO_SECRET_KEY first")
}
minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, false)
if err != nil {
return err
}
for _, bucketName := range buckets {
err = minioClient.MakeBucket(bucketName, location)
if err != nil {
// Check to see if we already own this bucket (which happens if you run this twice)
exists, err := minioClient.BucketExists(bucketName)
if err == nil && exists {
glog.Infof("We already own %s", bucketName)
} else {
return err
}
} else {
glog.Infof("Successfully created %s", bucketName)
}
}
return nil
}
// newClient creates a MinIO client for endpoint. It authenticates with the
// MINIO_ACCESS_KEY and MINIO_SECRET_KEY environment variables and passes
// false as minio.New's final "secure" argument (no TLS). It returns an error
// if either variable is unset or empty.
func newClient(endpoint string) (*minio.Client, error) {
	accessKey, secretKey := os.Getenv("MINIO_ACCESS_KEY"), os.Getenv("MINIO_SECRET_KEY")
	if len(accessKey) == 0 || len(secretKey) == 0 {
		return nil, errors.New("should export MINIO_ACCESS_KEY and MINIO_SECRET_KEY first")
	}
	const useTLS = false
	return minio.New(endpoint, accessKey, secretKey, useTLS)
}
// initClientNBuckets connects to endpoint and makes sure every bucket in
// buckets exists, then returns the client it used. Missing buckets are
// created with location. The credentials come from the MINIO_ACCESS_KEY and
// MINIO_SECRET_KEY environment variables. A bucket that already exists is
// accepted; any other creation failure is returned with the bucket name.
// NOTE(review): not called anywhere in this file.
func initClientNBuckets(endpoint string, buckets []string, location string) (*minio.Client, error) {
	minioClient, err := newClient(endpoint)
	if err != nil {
		return nil, err
	}
	for _, bucketName := range buckets {
		makeErr := minioClient.MakeBucket(bucketName, location)
		if makeErr == nil {
			glog.Infof("Successfully created %s", bucketName)
			continue
		}
		// MakeBucket also fails when we already own the bucket (e.g. on a
		// second run), so check for that before giving up.
		exists, existsErr := minioClient.BucketExists(bucketName)
		if existsErr != nil {
			return nil, fmt.Errorf("create bucket %q: %v; check existence: %v", bucketName, makeErr, existsErr)
		}
		if !exists {
			// Report the real creation error rather than the nil
			// existence-check error.
			return nil, fmt.Errorf("create bucket %q: %v", bucketName, makeErr)
		}
		glog.Infof("We already own %s", bucketName)
	}
	return minioClient, nil
}
// setMaxResources raises this process's resource limits before the load
// test starts:
//   - Go's OS-thread cap (debug.SetMaxThreads) becomes 90% of the system's
//     max-threads value, but only when that exceeds 10000, Go's default cap.
//     This step is best-effort: failing to read the system value is ignored.
//   - The open-files limit is set to its reported maximum.
//   - The memory limit is set to its reported maximum.
//
// It returns the first error from the open-files or memory steps. The memory
// step is skipped if the open-files step fails.
func setMaxResources() (err error) {
	if threads, threadsErr := sys.GetMaxThreads(); threadsErr == nil {
		if limit := threads * 90 / 100; limit > 10000 {
			debug.SetMaxThreads(limit)
		}
	}

	_, fileMax, err := sys.GetMaxOpenFileLimit()
	if err != nil {
		return err
	}
	if err = sys.SetMaxOpenFileLimit(fileMax, fileMax); err != nil {
		return err
	}

	_, memMax, err := sys.GetMaxMemoryLimit()
	if err != nil {
		return err
	}
	return sys.SetMaxMemoryLimit(memMax, memMax)
}
// initPrometheus registers the benchmark's metrics with the default
// Prometheus registry and serves them at /metrics on :2112 from a background
// goroutine. The server uses http.DefaultServeMux, so any handlers registered
// there (such as those of the net/http/pprof import) are served as well.
// MustRegister panics if a metric is already registered.
func initPrometheus() {
	prometheus.MustRegister(ctrOperationTimes)
	prometheus.MustRegister(hgReadDuration)
	prometheus.MustRegister(hgWriteDuration)
	http.Handle("/metrics", promhttp.Handler())
	go func() {
		// ListenAndServe only returns on failure (e.g. the port is taken).
		// Report it instead of silently running without metrics.
		if err := http.ListenAndServe(":2112", nil); err != nil {
			glog.Errorf("metrics server on :2112 exited: %v", err)
		}
	}()
}
// init parses and validates the command-line flags, exiting via glog.Fatal
// on invalid values. It then seeds math/rand and tries to raise process
// resource limits; a failure there is only logged as a warning.
// NOTE(review): parsing flags in init() runs before the testing package
// registers its flags, so `go test` on this package would reject -test.*
// flags; consider moving flag.Parse into main.
func init() {
	flag.Parse()
	if err := validFlags(); err != nil {
		glog.Fatal(err)
	}
	// Seed from the full Unix clock. Nanosecond() alone is only the offset
	// within the current second.
	rand.Seed(time.Now().UnixNano())
	if err := setMaxResources(); err != nil {
		glog.Warningf("failed to set max resources: %v", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.