Last active
September 1, 2019 15:29
-
-
Save serverhorror/d7d29f928152032118576ebc49a16e66 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func handleRead(l *zap.Logger, s storage.Interface, pbRead *pb.Read, rdr io.Reader) (io.Writer, error) { | |
l.Info("(pb.ReadRequest_Read)", | |
zap.String("type", fmt.Sprintf("%T", pbRead)), | |
zap.String("value", | |
fmt.Sprintf("%v", pbRead)), | |
) | |
rd, err := read.TranscribeFromPbRead(pbRead) | |
if err != nil { | |
return nil, err | |
} | |
wr, err := s.Upload(context.Background(), rd) | |
if err != nil { | |
return nil, err | |
} | |
return wr, nil | |
} | |
func handleChunk(l *zap.Logger, chunk *pb.Chunk, wr io.Writer) (int, error) { | |
n, err := wr.Write(chunk.Bytes) | |
if err != nil { | |
if err == io.EOF { | |
return 0, nil | |
} | |
return 0, err | |
} | |
l.Info("(pb.readRequest_Chunk)", | |
zap.String("type", fmt.Sprintf("%T", chunk)), | |
zap.Binary("chunk.Bytes[:5]", chunk.Bytes[:5]), | |
zap.Int("chunk.size", len(chunk.Bytes)), | |
zap.Int("bytes.written", n), | |
) | |
return n, nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func (c *StorageService) Upload(stream pb.StorageService_UploadServer) error { | |
receivedDescription := false | |
var rdr io.Reader | |
var wr io.Writer | |
var bytesReceived int64 | |
for { | |
var readReq *pb.ReadRequest | |
var err error | |
readReq, err = stream.Recv() | |
if err != nil { | |
if err == io.EOF { | |
resp := &pb.ChunkResponse{ | |
Len: bytesReceived, | |
} | |
c.l.Sugar().Infof("sending response", "pb.ChunkResponse", resp) | |
// FIXME: for now we simply return the total number of bytes we received | |
return stream.SendAndClose(resp) | |
} | |
return errors.Wrap(err, "(*StorageService).Upload") | |
} | |
err = handleReadReq(*readReq) | |
if err != nil { | |
return errors.Wrap(err, "(*StorageService).Upload") | |
} | |
// Check what we have received | |
// it should be: | |
// 1 x *pb.ReadRequest_Read (exactly) | |
// n x *pb.ReadRequest_Chunk | |
switch v := readReq.Value.(type) { | |
case *pb.ReadRequest_Read: | |
if receivedDescription { | |
panic("Read already received!") | |
} | |
// creates an s3manager.Uploader and runs the stuff in a goroutine | |
// wr is from an io.Pipe() | |
// rdr of that is the Body of the s3manager uploader | |
wr, err = handleRead(c.l, c.storage, v.Read, rdr) | |
if err != nil { | |
return err | |
} | |
receivedDescription = true | |
case *pb.ReadRequest_Chunk: | |
if !receivedDescription { | |
panic("Read not yet received!") | |
} | |
// essentially wr(v.Chunk.Bytes) | |
n, err := handleChunk(c.l, v.Chunk, wr) | |
if err != nil { | |
return err | |
} | |
bytesReceived += int64(n) | |
default: | |
c.l.Warn("DEFAULT", | |
zap.String("received an unexpected type", fmt.Sprintf("%T", v)), | |
) | |
} | |
} | |
panic("unreachable code") | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Process exiting with code: 0 | |
API server listening at: 127.0.0.1:6981 | |
2019-09-01T17:25:31.805+0200 INFO server/server.go:84 Listening {"port": "127.0.0.1:8080"} | |
2019-09-01T17:25:45.262+0200 INFO elastic/elastic.go:136 elastic.Backend.Create(s)%!(EXTRA string=sample, *sample.Sample=&{{ } 0xc000995810 <nil> <nil> <nil> <nil> <nil> [0xc0000d78b0 0xc0000d7900 0xc0000d7950] [0xc0000d7450=b 0xc0000d7630=c] [0xc000068e40 0xc000068ea0] [] false}) | |
2019-09-01T17:25:45.288+0200 INFO elastic/elastic.go:164 response%!(EXTRA string=response, *elastic.IndexResponse=&{stool _doc 0dfcbee5-2619-4d56-8e00-f05d5c0d812d 1 created 0xc0009783c0 44 2 0 false}) | |
2019-09-01T17:25:45.383+0200 INFO zap/server_interceptors.go:40 finished unary call with code OK {"grpc.start_time": "2019-09-01T17:25:45+02:00", "system": "grpc", "span.kind": "server", "grpc.service": "assetkit.stool.SampleService", "grpc.method": "Create", "grpc.code": "OK", "grpc.time_ms": 122.99800109863281} | |
2019-09-01T17:25:45.386+0200 INFO storageservice/storageservice.go:122 (pb.ReadRequest_Read) {"type": "*pb.Read", "value": "base_name:\"SP1.fq\" cloud_uri:\"s3://marcherm-test-stool-bucket/data/input/0dfcbee5-2619-4d56-8e00-f05d5c0d812d/0dfcbee5-2619-4d56-8e00-f05d5c0d812d_R1.fastq.gz\" path:\"C:\\\\Users\\\\martin\\\\code\\\\serverhorror\\\\stool\\\\testdata\\\\SP1.fq\" direction:DIRECTION_Forward "} | |
2019-09-01T17:25:45.387+0200 INFO s3/uploader.go:29 creating new uploader {"storageURL": "s3://marcherm-test-stool-bucket/data/input/0dfcbee5-2619-4d56-8e00-f05d5c0d812d/0dfcbee5-2619-4d56-8e00-f05d5c0d812d_R1.fastq.gz"} | |
2019-09-01T17:25:45.389+0200 INFO storageservice/storageservice.go:147 (pb.readRequest_Chunk) {"type": "*pb.Chunk", "chunk.Bytes[:5]": "QGNsdXM=", "chunk.size": 16384, "bytes.written": 16384} | |
2019-09-01T17:25:45.389+0200 INFO storageservice/storageservice.go:147 (pb.readRequest_Chunk) {"type": "*pb.Chunk", "chunk.Bytes[:5]": "QUFBQ0E=", "chunk.size": 6087, "bytes.written": 6087} | |
2019-09-01T17:25:45.390+0200 INFO storageservice/storageservice.go:62 sending response%!(EXTRA string=pb.ChunkResponse, *pb.ChunkResponse=len:22471 ) | |
2019-09-01T17:25:45.390+0200 INFO zap/server_interceptors.go:67 finished streaming call with code OK {"grpc.start_time": "2019-09-01T17:25:45+02:00", "system": "grpc", "span.kind": "server", "grpc.service": "assetkit.stool.StorageService", "grpc.method": "Upload", "grpc.code": "OK", "grpc.time_ms": 4.000999927520752} | |
2019-09-01T17:25:45.393+0200 INFO storageservice/storageservice.go:122 (pb.ReadRequest_Read) {"type": "*pb.Read", "value": "base_name:\"SP1.fq\" cloud_uri:\"s3://marcherm-test-stool-bucket/data/input/0dfcbee5-2619-4d56-8e00-f05d5c0d812d/0dfcbee5-2619-4d56-8e00-f05d5c0d812d_R2.fastq.gz\" path:\"C:\\\\Users\\\\martin\\\\code\\\\serverhorror\\\\stool\\\\testdata\\\\SP1.fq\" direction:DIRECTION_Reverse "} | |
2019-09-01T17:25:45.394+0200 INFO s3/uploader.go:29 creating new uploader {"storageURL": "s3://marcherm-test-stool-bucket/data/input/0dfcbee5-2619-4d56-8e00-f05d5c0d812d/0dfcbee5-2619-4d56-8e00-f05d5c0d812d_R2.fastq.gz"} | |
2019-09-01T17:25:45.396+0200 INFO storageservice/storageservice.go:147 (pb.readRequest_Chunk) {"type": "*pb.Chunk", "chunk.Bytes[:5]": "QGNsdXM=", "chunk.size": 16384, "bytes.written": 16384} | |
2019-09-01T17:25:45.400+0200 INFO storageservice/storageservice.go:147 (pb.readRequest_Chunk) {"type": "*pb.Chunk", "chunk.Bytes[:5]": "QUFBQ0E=", "chunk.size": 6087, "bytes.written": 6087} | |
2019-09-01T17:25:45.401+0200 INFO storageservice/storageservice.go:62 sending response%!(EXTRA string=pb.ChunkResponse, *pb.ChunkResponse=len:22471 ) | |
2019-09-01T17:25:45.401+0200 INFO zap/server_interceptors.go:67 finished streaming call with code OK {"grpc.start_time": "2019-09-01T17:25:45+02:00", "system": "grpc", "span.kind": "server", "grpc.service": "assetkit.stool.StorageService", "grpc.method": "Upload", "grpc.code": "OK", "grpc.time_ms": 9} | |
2019-09-01T17:25:45.403+0200 INFO zap/grpclogger.go:41 transport: loopyWriter.run returning. connection error: desc = "transport is closing" {"system": "grpc", "grpc_log": true} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package s3 | |
import ( | |
"context" | |
"io" | |
"net/url" | |
"strings" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/s3/s3manager" | |
"github.com/pkg/errors" | |
"go.uber.org/zap" | |
) | |
type uploader struct { | |
ctx context.Context | |
l *zap.Logger | |
sess *session.Session | |
mgr *s3manager.Uploader | |
task *s3manager.UploadInput | |
wr io.Writer | |
rdr io.Reader | |
} | |
func newUploader(ctx context.Context, l *zap.Logger, storageURL *url.URL) (*uploader, error) { | |
l.Info("creating new uploader", zap.Stringer("storageURL", storageURL)) | |
sess, err := session.NewSession() | |
if err != nil { | |
return nil, errors.Wrap(err, "StoolServer.newUploader") | |
} | |
src, wr := io.Pipe() | |
mgr := s3manager.NewUploader(sess) | |
mgr.PartSize = s3manager.DefaultUploadPartSize | |
mgr.Concurrency = 5 | |
mgr.LeavePartsOnError = false | |
key := strings.TrimPrefix(storageURL.Path, "/") | |
task := &s3manager.UploadInput{ | |
Body: src, | |
Bucket: aws.String(storageURL.Host), | |
Key: aws.String(key), | |
// Metadata: map[string]*string{ | |
// "SourceURI": aws.String( | |
// fmt.Sprint(sf.SourceURI), | |
// ), | |
// }, | |
} | |
u := &uploader{ | |
ctx: ctx, | |
l: l, | |
sess: sess, | |
mgr: mgr, | |
task: task, | |
rdr: src, | |
wr: wr, | |
} | |
return u, u.Validate() | |
} | |
func (u *uploader) Validate() error { | |
return nil | |
} | |
func (u *uploader) Write(b []byte) (int, error) { | |
n, err := u.wr.Write(b) | |
return n, err | |
} | |
func (u *uploader) Do() (*s3manager.UploadOutput, error) { | |
var result *s3manager.UploadOutput | |
var err error | |
// result, err = u.mgr.UploadWithContext(u.ctx, u.task) | |
result, err = u.mgr.Upload(u.task) | |
if err != nil { | |
u.l.Error("could not upload to S3", zap.Error(err)) | |
panic(err) | |
return nil, err | |
} | |
u.l.Sugar().Infof("s3.(*uploader).Do", "result", result) | |
if err != nil { | |
panic(err) | |
} | |
return result, errors.Wrap(err, "Failed to upload") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment