Instantly share code, notes, and snippets.

@cloverstd /qiniu.go
Last active Jul 21, 2017

Embed
What would you like to do?
Docker registry V2 qiniu storage,镜像仓库七牛驱动,https://hui.lu/docker-registry-storage/
package qiniu
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"strings"
"time"
"strconv"
"github.com/Sirupsen/logrus"
dockerContext "github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
"qiniupkg.com/api.v7/conf"
"qiniupkg.com/api.v7/kodo"
"qiniupkg.com/api.v7/kodocli"
rpc "qiniupkg.com/x/rpc.v7"
)
const (
driverName = "qiniu"
maxChunkSize = 4 * (1 << 20)
listMax = 1000
)
type DriverParameters struct {
AccessKey string
SecretKey string
Bucket string
Domain string
RootDirectory string
}
func init() {
factory.Register(driverName, &qiniuDriverFactory{})
}
type qiniuDriverFactory struct{}
func (factory *qiniuDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return FromParameters(parameters)
}
type driver struct {
client *kodo.Client
bucket *kodo.Bucket
logger *logrus.Logger
domain string
rootDirectory string
writerPath map[string]storagedriver.FileWriter
}
type baseEmbed struct {
base.Base
}
type Driver struct {
baseEmbed
}
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
params, err := fromParametersImpl(parameters)
if err != nil || params == nil {
return nil, err
}
return New(*params), nil
}
func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) {
params := &DriverParameters{}
if parameters != nil {
if accessKey, ok := parameters["accesskey"]; ok {
params.AccessKey = fmt.Sprint(accessKey)
} else {
return nil, fmt.Errorf("accesskey is required")
}
if secretKey, ok := parameters["secretkey"]; ok {
params.SecretKey = fmt.Sprint(secretKey)
} else {
return nil, fmt.Errorf("secretkey is required")
}
if bucket, ok := parameters["bucket"]; ok {
params.Bucket = fmt.Sprint(bucket)
} else {
return nil, fmt.Errorf("bucket is required")
}
if domain, ok := parameters["domain"]; ok {
params.Domain = fmt.Sprint(domain)
} else {
return nil, fmt.Errorf("domain is required")
}
if rootDirectory, ok := parameters["rootdirectory"]; ok {
if _, ok := rootDirectory.(string); ok {
params.RootDirectory = fmt.Sprint(rootDirectory)
} else {
params.RootDirectory = ""
}
} else {
params.RootDirectory = ""
}
}
return params, nil
}
func New(params DriverParameters) *Driver {
logger := logrus.New()
loggerFile, err := os.Create("test.log")
if err != nil {
logrus.Fatalln("open logger failed", err)
}
logger.Out = loggerFile
logger.Level = logrus.DebugLevel
conf.ACCESS_KEY = params.AccessKey
conf.SECRET_KEY = params.SecretKey
c := kodo.New(0, nil)
logger.Debugln(params.Bucket)
bucket := c.Bucket(params.Bucket)
qiniuDriver := &driver{
client: c,
bucket: &bucket,
logger: logger,
rootDirectory: params.RootDirectory,
domain: params.Domain,
writerPath: make(map[string]storagedriver.FileWriter),
}
return &Driver{
baseEmbed: baseEmbed{
Base: base.Base{
StorageDriver: qiniuDriver,
},
},
}
}
func (d *driver) Name() string {
return driverName
}
func (d *driver) GetContent(ctx dockerContext.Context, path string) ([]byte, error) {
d.logger.WithFields(logrus.Fields{
"path": path,
}).Debugln("GetContent")
baseURL := kodo.MakeBaseUrl(d.domain, d.qiniuPath(path))
policy := kodo.GetPolicy{}
privateURL := d.client.MakePrivateUrl(baseURL, &policy)
resp, err := http.Get(privateURL)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return ioutil.ReadAll(resp.Body)
}
func (d *driver) PutContent(ctx dockerContext.Context, path string, contents []byte) error {
d.logger.WithFields(logrus.Fields{
"path": path,
"size": len(contents),
}).Debugln("PutContent")
putExtra := kodo.PutExtra{}
body := bytes.NewReader(contents)
err := d.bucket.Put(context.Background(), nil, d.qiniuPath(path), body, int64(len(contents)), &putExtra)
if err != nil {
d.logger.WithFields(logrus.Fields{
"error": err,
"path": path,
"contents": len(contents),
}).Errorln("PutContent failed")
}
return err
}
func (d *driver) Reader(ctx dockerContext.Context, path string, offset int64) (io.ReadCloser, error) {
d.logger.WithFields(logrus.Fields{
"path": path,
"offset": offset,
}).Debugln("Reader")
contents, err := d.GetContent(ctx, path)
if err != nil {
return nil, err
}
return ioutil.NopCloser(bytes.NewReader(contents[offset:])), nil
}
func (d *driver) Writer(ctx dockerContext.Context, path string, append bool) (storagedriver.FileWriter, error) {
d.logger.WithFields(logrus.Fields{
"path": path,
"append": append,
}).Debugln("Writer")
var w *writer
qiniuPath := d.qiniuPath(path)
if !append {
w = &writer{
key: qiniuPath,
driver: d,
buffer: make([]byte, 0, maxChunkSize),
uploadCtxList: make([]string, 0, 1),
}
d.writerPath[qiniuPath] = w
} else {
w = d.writerPath[qiniuPath].(*writer)
w.closed = false
}
return w, nil
}
func (d *driver) Stat(ctx dockerContext.Context, path string) (storagedriver.FileInfo, error) {
d.logger.WithFields(logrus.Fields{
"path": path,
}).Debugln("Stat")
entries, commonPrefixes, _, err := d.bucket.List(context.Background(), d.qiniuPath(path), "/", "", 1)
if err != nil && len(entries) == 0 {
if err == io.EOF {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return nil, err
}
fi := storagedriver.FileInfoFields{
Path: path,
}
if len(entries) == 1 {
if entries[0].Key != d.qiniuPath(path) {
fi.IsDir = true
} else {
fi.IsDir = false
fi.Size = entries[0].Fsize
fi.ModTime = time.Unix(0, entries[0].PutTime)
}
} else if len(commonPrefixes) == 1 {
fi.IsDir = true
} else {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
}
func (d *driver) List(ctx dockerContext.Context, opath string) ([]string, error) {
d.logger.WithFields(logrus.Fields{
"path": opath,
}).Debugln("List")
path := opath
if path != "/" && opath[len(path)-1] != '/' {
path = path + "/"
}
prefix := ""
if d.qiniuPath("") == "" {
prefix = "/"
}
qiniuPath := d.qiniuPath(path)
entries, commonPrefixes, marker, err := d.bucket.List(context.Background(), qiniuPath, "/", "", listMax)
if err != nil && len(entries) == 0 && len(commonPrefixes) == 0 {
if err == io.EOF {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return nil, err
}
files := []string{}
directories := []string{}
for {
for _, key := range entries {
files = append(files, strings.Replace(key.Key, d.qiniuPath(""), prefix, 1))
}
for _, commonPrefix := range commonPrefixes {
directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.qiniuPath(""), prefix, 1))
}
if err == io.EOF {
break
}
entries, commonPrefixes, _, err = d.bucket.List(context.Background(), qiniuPath, "/", marker, listMax)
if err != nil && len(entries) == 0 {
return nil, err
}
}
if len(files) > 0 && files[0] == strings.Replace(qiniuPath, d.qiniuPath(""), prefix, 1) {
files = files[1:]
}
if opath != "/" {
if len(files) == 0 && len(directories) == 0 {
return nil, storagedriver.PathNotFoundError{Path: opath}
}
}
return append(files, directories...), nil
}
func (d *driver) Move(ctx dockerContext.Context, sourcePath string, destPath string) error {
d.logger.WithFields(logrus.Fields{
"source_path": sourcePath,
"dest_path": destPath,
}).Debugln("Move")
if err := d.bucket.Move(context.Background(), d.qiniuPath(sourcePath), d.qiniuPath(destPath)); err != nil {
return err
}
return nil
}
func (d *driver) Delete(ctx dockerContext.Context, path string) error {
d.logger.WithFields(logrus.Fields{
"path": path,
}).Debugln("Delete")
keys := make([]string, 0, listMax)
if err := d.treePath(path, &keys, ""); err != nil {
return err
}
if _, err := d.bucket.BatchDelete(context.Background(), keys...); err != nil {
return err
}
return nil
}
func (d *driver) URLFor(ctx dockerContext.Context, path string, options map[string]interface{}) (string, error) {
d.logger.WithFields(logrus.Fields{
"path": path,
}).Debugln("URLFor")
methodString := "GET"
method, ok := options["method"]
if ok {
methodString, ok = method.(string)
if !ok || (methodString != "GET") {
return "", storagedriver.ErrUnsupportedMethod{}
}
}
expiresTime := time.Now().Add(20 * time.Minute)
expires, ok := options["expiry"]
if ok {
et, ok := expires.(time.Time)
if ok {
expiresTime = et
}
}
logrus.Infof("methodString: %s, expiresTime: %v", methodString, expiresTime)
baseURL := kodo.MakeBaseUrl(d.domain, d.qiniuPath(path))
policy := kodo.GetPolicy{
Expires: uint32(expiresTime.Unix()),
}
privateURL := d.client.MakePrivateUrl(baseURL, &policy)
logrus.Infof("private URL: %s", privateURL)
return privateURL, nil
}
func (d *driver) RemoveWriter(key string) {
delete(d.writerPath, key)
}
func (d *driver) qiniuPath(path string) string {
return strings.TrimLeft(strings.TrimRight(d.rootDirectory, "/")+path, "/")
}
func (d *driver) treePath(path string, keys *[]string, markerOut string) error {
entries, commonPrefixes, markerOut, err := d.bucket.List(context.Background(), d.qiniuPath(path), "/", markerOut, listMax)
if err != nil && len(entries) == 0 && len(commonPrefixes) == 0 {
if err == io.EOF {
return nil
}
return err
}
for _, key := range entries {
*keys = append(*keys, key.Key)
}
for _, commonPrefix := range commonPrefixes {
if err := d.treePath(commonPrefix, keys, markerOut); err != nil {
return err
}
}
if err != io.EOF {
if err := d.treePath(path, keys, markerOut); err != nil {
return err
}
}
return nil
}
type writer struct {
driver *driver
key string
size int64
closed bool
committed bool
cancelled bool
buffer []byte
uploadCtxList []string
}
func (w *writer) Write(p []byte) (int, error) {
// w.driver.logger.WithFields(logrus.Fields{
// "key": w.key,
// }).Debugln("Write")
if w.closed {
return 0, fmt.Errorf("already closed")
} else if w.committed {
return 0, fmt.Errorf("already committed")
} else if w.cancelled {
return 0, fmt.Errorf("already cancelled")
}
if err := w.flushBuffer(); err != nil {
return 0, err
}
w.buffer = append(w.buffer, p...)
if len(w.buffer) >= maxChunkSize {
if err := w.flushBuffer(); err != nil {
return 0, err
}
}
w.size += int64(len(p))
return len(p), nil
}
func (w *writer) Size() int64 {
return w.size
}
func (w *writer) Close() error {
w.driver.logger.WithFields(logrus.Fields{
"key": w.key,
"size": w.size,
"buffer": len(w.buffer),
}).Debugln("Close")
if w.closed {
return fmt.Errorf("already closed")
}
if err := w.flushBuffer(); err != nil {
return err
}
w.closed = true
return nil
}
func (w *writer) Cancel() error {
w.driver.logger.WithFields(logrus.Fields{
"key": w.key,
"size": w.size,
"buffer": len(w.buffer),
}).Debugln("Cancel")
if w.closed {
return fmt.Errorf("already closed")
} else if w.committed {
return fmt.Errorf("already committed")
}
w.cancelled = true
return nil
}
func (w *writer) Commit() error {
defer func() {
w.driver.RemoveWriter(w.key)
}()
w.driver.logger.WithFields(logrus.Fields{
"key": w.key,
"size": w.size,
"buffer": len(w.buffer),
}).Debugln("Commit")
if w.closed {
return fmt.Errorf("already closed")
} else if w.committed {
return fmt.Errorf("already committed")
} else if w.cancelled {
return fmt.Errorf("already cancelled")
}
if err := w.flushBuffer(); err != nil {
return err
}
if len(w.buffer) > 0 {
if err := w.mkblk(bytes.NewReader(w.buffer), len(w.buffer)); err != nil {
return err
}
}
if err := w.mkfile(); err != nil {
return err
}
w.committed = true
return nil
}
func (w *writer) flushBuffer() error {
for len(w.buffer) >= maxChunkSize {
if err := w.mkblk(bytes.NewReader(w.buffer[:maxChunkSize]), maxChunkSize); err != nil {
return err
}
w.buffer = w.buffer[maxChunkSize:]
}
return nil
}
func (w *writer) mkblk(blob io.Reader, blobSize int) error {
url := fmt.Sprintf("%s/mkblk/%d", w.driver.client.UpHosts[0], blobSize)
resp, err := w.post(url, blob, blobSize)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == 200 {
res := kodocli.BlkputRet{}
err := json.NewDecoder(resp.Body).Decode(&res)
if err != nil {
return err
}
w.uploadCtxList = append(w.uploadCtxList, res.Ctx)
return nil
}
if err := rpc.ResponseError(resp); err != nil {
w.driver.logger.WithError(err).Errorln("mkblk failed")
return err
}
return nil
}
func (w *writer) mkfile() error {
url := fmt.Sprintf("%s/mkfile/%d/key/%s", w.driver.client.UpHosts[0], w.size, encode(w.key))
buf := make([]byte, 0, 176*len(w.uploadCtxList))
for _, ctx := range w.uploadCtxList {
buf = append(buf, ctx...)
buf = append(buf, ',')
}
if len(buf) > 0 {
buf = buf[:len(buf)-1]
}
resp, err := w.post(url, bytes.NewReader(buf), len(buf))
if err != nil {
return err
}
if resp.StatusCode == 200 {
return nil
}
defer resp.Body.Close()
if err := rpc.ResponseError(resp); err != nil {
w.driver.logger.WithFields(logrus.Fields{
"error": err,
"size": w.size,
"url": url,
}).Errorln("mkfile failed")
return err
}
return nil
}
func (w *writer) post(url string, blob io.Reader, blobSize int) (*http.Response, error) {
resp, err := func() (*http.Response, error) {
req, err := http.NewRequest("POST", url, blob)
if err != nil {
return nil, err
}
policy := kodo.PutPolicy{
Scope: w.driver.bucket.Name,
Expires: 3600,
UpHosts: w.driver.bucket.UpHosts,
}
token, err := w.driver.client.MakeUptokenWithSafe(&policy)
if err != nil {
return nil, err
}
req.Header.Set(http.CanonicalHeaderKey("Host"), w.driver.client.UpHosts[0])
req.Header.Set(http.CanonicalHeaderKey("Content-Type"), "application/octet-stream")
req.Header.Set(http.CanonicalHeaderKey("Content-Length"), strconv.Itoa(blobSize))
req.Header.Set(http.CanonicalHeaderKey("Authorization"), fmt.Sprintf("UpToken %s", token))
client := http.Client{}
return client.Do(req)
}()
if err != nil {
w.driver.logger.WithFields(logrus.Fields{
"url": url,
"error": err,
}).Errorln("post failed")
}
return resp, err
}
func encode(raw string) string {
return base64.URLEncoding.EncodeToString([]byte(raw))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment