Last active
July 21, 2017 08:07
-
-
Save cloverstd/813a0df6b9eacc459dd84509e902e744 to your computer and use it in GitHub Desktop.
Docker registry V2 qiniu storage,镜像仓库七牛驱动,https://hui.lu/docker-registry-storage/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package qiniu | |
import ( | |
"bytes" | |
"context" | |
"encoding/base64" | |
"encoding/json" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"net/http" | |
"os" | |
"strings" | |
"time" | |
"strconv" | |
"github.com/Sirupsen/logrus" | |
dockerContext "github.com/docker/distribution/context" | |
storagedriver "github.com/docker/distribution/registry/storage/driver" | |
"github.com/docker/distribution/registry/storage/driver/base" | |
"github.com/docker/distribution/registry/storage/driver/factory" | |
"qiniupkg.com/api.v7/conf" | |
"qiniupkg.com/api.v7/kodo" | |
"qiniupkg.com/api.v7/kodocli" | |
rpc "qiniupkg.com/x/rpc.v7" | |
) | |
// Driver-wide constants.
const (
	// driverName is the name this driver is registered under with the
	// storage driver factory and the value returned by Name.
	driverName = "qiniu"

	// maxChunkSize is the size in bytes (4 MiB) of each block the writer
	// uploads while flushing its buffer.
	maxChunkSize = 4 << 20

	// listMax is the page size requested from each bucket List call.
	listMax = 1000
)
// DriverParameters holds the configuration values needed to construct the
// qiniu storage driver.
type DriverParameters struct {
	// AccessKey and SecretKey are the Qiniu account credentials.
	AccessKey, SecretKey string
	// Bucket is the name of the bucket objects are stored in; Domain is the
	// host used when building download URLs.
	Bucket, Domain string
	// RootDirectory is an optional prefix applied to every storage path.
	RootDirectory string
}
func init() { | |
factory.Register(driverName, &qiniuDriverFactory{}) | |
} | |
type qiniuDriverFactory struct{} | |
func (factory *qiniuDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { | |
return FromParameters(parameters) | |
} | |
type driver struct { | |
client *kodo.Client | |
bucket *kodo.Bucket | |
logger *logrus.Logger | |
domain string | |
rootDirectory string | |
writerPath map[string]storagedriver.FileWriter | |
} | |
type baseEmbed struct { | |
base.Base | |
} | |
type Driver struct { | |
baseEmbed | |
} | |
func FromParameters(parameters map[string]interface{}) (*Driver, error) { | |
params, err := fromParametersImpl(parameters) | |
if err != nil || params == nil { | |
return nil, err | |
} | |
return New(*params), nil | |
} | |
func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) { | |
params := &DriverParameters{} | |
if parameters != nil { | |
if accessKey, ok := parameters["accesskey"]; ok { | |
params.AccessKey = fmt.Sprint(accessKey) | |
} else { | |
return nil, fmt.Errorf("accesskey is required") | |
} | |
if secretKey, ok := parameters["secretkey"]; ok { | |
params.SecretKey = fmt.Sprint(secretKey) | |
} else { | |
return nil, fmt.Errorf("secretkey is required") | |
} | |
if bucket, ok := parameters["bucket"]; ok { | |
params.Bucket = fmt.Sprint(bucket) | |
} else { | |
return nil, fmt.Errorf("bucket is required") | |
} | |
if domain, ok := parameters["domain"]; ok { | |
params.Domain = fmt.Sprint(domain) | |
} else { | |
return nil, fmt.Errorf("domain is required") | |
} | |
if rootDirectory, ok := parameters["rootdirectory"]; ok { | |
if _, ok := rootDirectory.(string); ok { | |
params.RootDirectory = fmt.Sprint(rootDirectory) | |
} else { | |
params.RootDirectory = "" | |
} | |
} else { | |
params.RootDirectory = "" | |
} | |
} | |
return params, nil | |
} | |
func New(params DriverParameters) *Driver { | |
logger := logrus.New() | |
loggerFile, err := os.Create("test.log") | |
if err != nil { | |
logrus.Fatalln("open logger failed", err) | |
} | |
logger.Out = loggerFile | |
logger.Level = logrus.DebugLevel | |
conf.ACCESS_KEY = params.AccessKey | |
conf.SECRET_KEY = params.SecretKey | |
c := kodo.New(0, nil) | |
logger.Debugln(params.Bucket) | |
bucket := c.Bucket(params.Bucket) | |
qiniuDriver := &driver{ | |
client: c, | |
bucket: &bucket, | |
logger: logger, | |
rootDirectory: params.RootDirectory, | |
domain: params.Domain, | |
writerPath: make(map[string]storagedriver.FileWriter), | |
} | |
return &Driver{ | |
baseEmbed: baseEmbed{ | |
Base: base.Base{ | |
StorageDriver: qiniuDriver, | |
}, | |
}, | |
} | |
} | |
func (d *driver) Name() string { | |
return driverName | |
} | |
func (d *driver) GetContent(ctx dockerContext.Context, path string) ([]byte, error) { | |
d.logger.WithFields(logrus.Fields{ | |
"path": path, | |
}).Debugln("GetContent") | |
baseURL := kodo.MakeBaseUrl(d.domain, d.qiniuPath(path)) | |
policy := kodo.GetPolicy{} | |
privateURL := d.client.MakePrivateUrl(baseURL, &policy) | |
resp, err := http.Get(privateURL) | |
if err != nil { | |
return nil, err | |
} | |
defer resp.Body.Close() | |
if resp.StatusCode != 200 { | |
return nil, storagedriver.PathNotFoundError{Path: path} | |
} | |
return ioutil.ReadAll(resp.Body) | |
} | |
func (d *driver) PutContent(ctx dockerContext.Context, path string, contents []byte) error { | |
d.logger.WithFields(logrus.Fields{ | |
"path": path, | |
"size": len(contents), | |
}).Debugln("PutContent") | |
putExtra := kodo.PutExtra{} | |
body := bytes.NewReader(contents) | |
err := d.bucket.Put(context.Background(), nil, d.qiniuPath(path), body, int64(len(contents)), &putExtra) | |
if err != nil { | |
d.logger.WithFields(logrus.Fields{ | |
"error": err, | |
"path": path, | |
"contents": len(contents), | |
}).Errorln("PutContent failed") | |
} | |
return err | |
} | |
func (d *driver) Reader(ctx dockerContext.Context, path string, offset int64) (io.ReadCloser, error) { | |
d.logger.WithFields(logrus.Fields{ | |
"path": path, | |
"offset": offset, | |
}).Debugln("Reader") | |
contents, err := d.GetContent(ctx, path) | |
if err != nil { | |
return nil, err | |
} | |
return ioutil.NopCloser(bytes.NewReader(contents[offset:])), nil | |
} | |
func (d *driver) Writer(ctx dockerContext.Context, path string, append bool) (storagedriver.FileWriter, error) { | |
d.logger.WithFields(logrus.Fields{ | |
"path": path, | |
"append": append, | |
}).Debugln("Writer") | |
var w *writer | |
qiniuPath := d.qiniuPath(path) | |
if !append { | |
w = &writer{ | |
key: qiniuPath, | |
driver: d, | |
buffer: make([]byte, 0, maxChunkSize), | |
uploadCtxList: make([]string, 0, 1), | |
} | |
d.writerPath[qiniuPath] = w | |
} else { | |
w = d.writerPath[qiniuPath].(*writer) | |
w.closed = false | |
} | |
return w, nil | |
} | |
func (d *driver) Stat(ctx dockerContext.Context, path string) (storagedriver.FileInfo, error) { | |
d.logger.WithFields(logrus.Fields{ | |
"path": path, | |
}).Debugln("Stat") | |
entries, commonPrefixes, _, err := d.bucket.List(context.Background(), d.qiniuPath(path), "/", "", 1) | |
if err != nil && len(entries) == 0 { | |
if err == io.EOF { | |
return nil, storagedriver.PathNotFoundError{Path: path} | |
} | |
return nil, err | |
} | |
fi := storagedriver.FileInfoFields{ | |
Path: path, | |
} | |
if len(entries) == 1 { | |
if entries[0].Key != d.qiniuPath(path) { | |
fi.IsDir = true | |
} else { | |
fi.IsDir = false | |
fi.Size = entries[0].Fsize | |
fi.ModTime = time.Unix(0, entries[0].PutTime) | |
} | |
} else if len(commonPrefixes) == 1 { | |
fi.IsDir = true | |
} else { | |
return nil, storagedriver.PathNotFoundError{Path: path} | |
} | |
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil | |
} | |
func (d *driver) List(ctx dockerContext.Context, opath string) ([]string, error) { | |
d.logger.WithFields(logrus.Fields{ | |
"path": opath, | |
}).Debugln("List") | |
path := opath | |
if path != "/" && opath[len(path)-1] != '/' { | |
path = path + "/" | |
} | |
prefix := "" | |
if d.qiniuPath("") == "" { | |
prefix = "/" | |
} | |
qiniuPath := d.qiniuPath(path) | |
entries, commonPrefixes, marker, err := d.bucket.List(context.Background(), qiniuPath, "/", "", listMax) | |
if err != nil && len(entries) == 0 && len(commonPrefixes) == 0 { | |
if err == io.EOF { | |
return nil, storagedriver.PathNotFoundError{Path: path} | |
} | |
return nil, err | |
} | |
files := []string{} | |
directories := []string{} | |
for { | |
for _, key := range entries { | |
files = append(files, strings.Replace(key.Key, d.qiniuPath(""), prefix, 1)) | |
} | |
for _, commonPrefix := range commonPrefixes { | |
directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.qiniuPath(""), prefix, 1)) | |
} | |
if err == io.EOF { | |
break | |
} | |
entries, commonPrefixes, _, err = d.bucket.List(context.Background(), qiniuPath, "/", marker, listMax) | |
if err != nil && len(entries) == 0 { | |
return nil, err | |
} | |
} | |
if len(files) > 0 && files[0] == strings.Replace(qiniuPath, d.qiniuPath(""), prefix, 1) { | |
files = files[1:] | |
} | |
if opath != "/" { | |
if len(files) == 0 && len(directories) == 0 { | |
return nil, storagedriver.PathNotFoundError{Path: opath} | |
} | |
} | |
return append(files, directories...), nil | |
} | |
func (d *driver) Move(ctx dockerContext.Context, sourcePath string, destPath string) error { | |
d.logger.WithFields(logrus.Fields{ | |
"source_path": sourcePath, | |
"dest_path": destPath, | |
}).Debugln("Move") | |
if err := d.bucket.Move(context.Background(), d.qiniuPath(sourcePath), d.qiniuPath(destPath)); err != nil { | |
return err | |
} | |
return nil | |
} | |
func (d *driver) Delete(ctx dockerContext.Context, path string) error { | |
d.logger.WithFields(logrus.Fields{ | |
"path": path, | |
}).Debugln("Delete") | |
keys := make([]string, 0, listMax) | |
if err := d.treePath(path, &keys, ""); err != nil { | |
return err | |
} | |
if _, err := d.bucket.BatchDelete(context.Background(), keys...); err != nil { | |
return err | |
} | |
return nil | |
} | |
func (d *driver) URLFor(ctx dockerContext.Context, path string, options map[string]interface{}) (string, error) { | |
d.logger.WithFields(logrus.Fields{ | |
"path": path, | |
}).Debugln("URLFor") | |
methodString := "GET" | |
method, ok := options["method"] | |
if ok { | |
methodString, ok = method.(string) | |
if !ok || (methodString != "GET") { | |
return "", storagedriver.ErrUnsupportedMethod{} | |
} | |
} | |
expiresTime := time.Now().Add(20 * time.Minute) | |
expires, ok := options["expiry"] | |
if ok { | |
et, ok := expires.(time.Time) | |
if ok { | |
expiresTime = et | |
} | |
} | |
logrus.Infof("methodString: %s, expiresTime: %v", methodString, expiresTime) | |
baseURL := kodo.MakeBaseUrl(d.domain, d.qiniuPath(path)) | |
policy := kodo.GetPolicy{ | |
Expires: uint32(expiresTime.Unix()), | |
} | |
privateURL := d.client.MakePrivateUrl(baseURL, &policy) | |
logrus.Infof("private URL: %s", privateURL) | |
return privateURL, nil | |
} | |
func (d *driver) RemoveWriter(key string) { | |
delete(d.writerPath, key) | |
} | |
func (d *driver) qiniuPath(path string) string { | |
return strings.TrimLeft(strings.TrimRight(d.rootDirectory, "/")+path, "/") | |
} | |
func (d *driver) treePath(path string, keys *[]string, markerOut string) error { | |
entries, commonPrefixes, markerOut, err := d.bucket.List(context.Background(), d.qiniuPath(path), "/", markerOut, listMax) | |
if err != nil && len(entries) == 0 && len(commonPrefixes) == 0 { | |
if err == io.EOF { | |
return nil | |
} | |
return err | |
} | |
for _, key := range entries { | |
*keys = append(*keys, key.Key) | |
} | |
for _, commonPrefix := range commonPrefixes { | |
if err := d.treePath(commonPrefix, keys, markerOut); err != nil { | |
return err | |
} | |
} | |
if err != io.EOF { | |
if err := d.treePath(path, keys, markerOut); err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
type writer struct { | |
driver *driver | |
key string | |
size int64 | |
closed bool | |
committed bool | |
cancelled bool | |
buffer []byte | |
uploadCtxList []string | |
} | |
func (w *writer) Write(p []byte) (int, error) { | |
// w.driver.logger.WithFields(logrus.Fields{ | |
// "key": w.key, | |
// }).Debugln("Write") | |
if w.closed { | |
return 0, fmt.Errorf("already closed") | |
} else if w.committed { | |
return 0, fmt.Errorf("already committed") | |
} else if w.cancelled { | |
return 0, fmt.Errorf("already cancelled") | |
} | |
if err := w.flushBuffer(); err != nil { | |
return 0, err | |
} | |
w.buffer = append(w.buffer, p...) | |
if len(w.buffer) >= maxChunkSize { | |
if err := w.flushBuffer(); err != nil { | |
return 0, err | |
} | |
} | |
w.size += int64(len(p)) | |
return len(p), nil | |
} | |
func (w *writer) Size() int64 { | |
return w.size | |
} | |
func (w *writer) Close() error { | |
w.driver.logger.WithFields(logrus.Fields{ | |
"key": w.key, | |
"size": w.size, | |
"buffer": len(w.buffer), | |
}).Debugln("Close") | |
if w.closed { | |
return fmt.Errorf("already closed") | |
} | |
if err := w.flushBuffer(); err != nil { | |
return err | |
} | |
w.closed = true | |
return nil | |
} | |
func (w *writer) Cancel() error { | |
w.driver.logger.WithFields(logrus.Fields{ | |
"key": w.key, | |
"size": w.size, | |
"buffer": len(w.buffer), | |
}).Debugln("Cancel") | |
if w.closed { | |
return fmt.Errorf("already closed") | |
} else if w.committed { | |
return fmt.Errorf("already committed") | |
} | |
w.cancelled = true | |
return nil | |
} | |
func (w *writer) Commit() error { | |
defer func() { | |
w.driver.RemoveWriter(w.key) | |
}() | |
w.driver.logger.WithFields(logrus.Fields{ | |
"key": w.key, | |
"size": w.size, | |
"buffer": len(w.buffer), | |
}).Debugln("Commit") | |
if w.closed { | |
return fmt.Errorf("already closed") | |
} else if w.committed { | |
return fmt.Errorf("already committed") | |
} else if w.cancelled { | |
return fmt.Errorf("already cancelled") | |
} | |
if err := w.flushBuffer(); err != nil { | |
return err | |
} | |
if len(w.buffer) > 0 { | |
if err := w.mkblk(bytes.NewReader(w.buffer), len(w.buffer)); err != nil { | |
return err | |
} | |
} | |
if err := w.mkfile(); err != nil { | |
return err | |
} | |
w.committed = true | |
return nil | |
} | |
func (w *writer) flushBuffer() error { | |
for len(w.buffer) >= maxChunkSize { | |
if err := w.mkblk(bytes.NewReader(w.buffer[:maxChunkSize]), maxChunkSize); err != nil { | |
return err | |
} | |
w.buffer = w.buffer[maxChunkSize:] | |
} | |
return nil | |
} | |
func (w *writer) mkblk(blob io.Reader, blobSize int) error { | |
url := fmt.Sprintf("%s/mkblk/%d", w.driver.client.UpHosts[0], blobSize) | |
resp, err := w.post(url, blob, blobSize) | |
if err != nil { | |
return err | |
} | |
defer resp.Body.Close() | |
if resp.StatusCode == 200 { | |
res := kodocli.BlkputRet{} | |
err := json.NewDecoder(resp.Body).Decode(&res) | |
if err != nil { | |
return err | |
} | |
w.uploadCtxList = append(w.uploadCtxList, res.Ctx) | |
return nil | |
} | |
if err := rpc.ResponseError(resp); err != nil { | |
w.driver.logger.WithError(err).Errorln("mkblk failed") | |
return err | |
} | |
return nil | |
} | |
func (w *writer) mkfile() error { | |
url := fmt.Sprintf("%s/mkfile/%d/key/%s", w.driver.client.UpHosts[0], w.size, encode(w.key)) | |
buf := make([]byte, 0, 176*len(w.uploadCtxList)) | |
for _, ctx := range w.uploadCtxList { | |
buf = append(buf, ctx...) | |
buf = append(buf, ',') | |
} | |
if len(buf) > 0 { | |
buf = buf[:len(buf)-1] | |
} | |
resp, err := w.post(url, bytes.NewReader(buf), len(buf)) | |
if err != nil { | |
return err | |
} | |
if resp.StatusCode == 200 { | |
return nil | |
} | |
defer resp.Body.Close() | |
if err := rpc.ResponseError(resp); err != nil { | |
w.driver.logger.WithFields(logrus.Fields{ | |
"error": err, | |
"size": w.size, | |
"url": url, | |
}).Errorln("mkfile failed") | |
return err | |
} | |
return nil | |
} | |
func (w *writer) post(url string, blob io.Reader, blobSize int) (*http.Response, error) { | |
resp, err := func() (*http.Response, error) { | |
req, err := http.NewRequest("POST", url, blob) | |
if err != nil { | |
return nil, err | |
} | |
policy := kodo.PutPolicy{ | |
Scope: w.driver.bucket.Name, | |
Expires: 3600, | |
UpHosts: w.driver.bucket.UpHosts, | |
} | |
token, err := w.driver.client.MakeUptokenWithSafe(&policy) | |
if err != nil { | |
return nil, err | |
} | |
req.Header.Set(http.CanonicalHeaderKey("Host"), w.driver.client.UpHosts[0]) | |
req.Header.Set(http.CanonicalHeaderKey("Content-Type"), "application/octet-stream") | |
req.Header.Set(http.CanonicalHeaderKey("Content-Length"), strconv.Itoa(blobSize)) | |
req.Header.Set(http.CanonicalHeaderKey("Authorization"), fmt.Sprintf("UpToken %s", token)) | |
client := http.Client{} | |
return client.Do(req) | |
}() | |
if err != nil { | |
w.driver.logger.WithFields(logrus.Fields{ | |
"url": url, | |
"error": err, | |
}).Errorln("post failed") | |
} | |
return resp, err | |
} | |
// encode returns the padded URL-safe base64 encoding of raw.
func encode(raw string) string {
	src := []byte(raw)
	dst := make([]byte, base64.URLEncoding.EncodedLen(len(src)))
	base64.URLEncoding.Encode(dst, src)
	return string(dst)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment