Skip to content

Instantly share code, notes, and snippets.

@serverhorror
Last active September 1, 2019 15:29
Show Gist options
  • Save serverhorror/d7d29f928152032118576ebc49a16e66 to your computer and use it in GitHub Desktop.
Save serverhorror/d7d29f928152032118576ebc49a16e66 to your computer and use it in GitHub Desktop.
// handleRead processes the descriptive first message of an upload stream.
//
// It logs the dynamic type and the default formatting of pbRead, converts it
// with read.TranscribeFromPbRead and hands the result to s.Upload using a
// background context. The value returned by s.Upload is handed back to the
// caller as the io.Writer that subsequent chunk payloads are written to.
//
// The rdr parameter is accepted but not used by this function.
// Any error from the conversion or from s.Upload is returned unchanged,
// together with a nil writer.
func handleRead(l *zap.Logger, s storage.Interface, pbRead *pb.Read, rdr io.Reader) (io.Writer, error) {
	typeName := fmt.Sprintf("%T", pbRead)
	rendered := fmt.Sprintf("%v", pbRead)
	l.Info("(pb.ReadRequest_Read)",
		zap.String("type", typeName),
		zap.String("value", rendered),
	)

	desc, err := read.TranscribeFromPbRead(pbRead)
	if err != nil {
		return nil, err
	}

	sink, err := s.Upload(context.Background(), desc)
	if err != nil {
		return nil, err
	}
	return sink, nil
}
// handleChunk writes the payload of a single chunk message to wr and logs a
// short summary of what was written.
//
// It returns the number of bytes wr reported as written. A write error is
// returned with a byte count of 0; io.EOF from the writer is treated as
// "nothing written" and is not reported as an error.
func handleChunk(l *zap.Logger, chunk *pb.Chunk, wr io.Writer) (int, error) {
	n, err := wr.Write(chunk.Bytes)
	if err != nil {
		if err == io.EOF {
			return 0, nil
		}
		return 0, err
	}

	// Only the first few bytes are logged. Clamp the preview so that chunks
	// shorter than five bytes (including empty ones) do not cause a
	// slice-bounds panic.
	preview := chunk.Bytes
	if len(preview) > 5 {
		preview = preview[:5]
	}

	l.Info("(pb.readRequest_Chunk)",
		zap.String("type", fmt.Sprintf("%T", chunk)),
		zap.Binary("chunk.Bytes[:5]", preview),
		zap.Int("chunk.size", len(chunk.Bytes)),
		zap.Int("bytes.written", n),
	)
	return n, nil
}
// Upload handles the client-streaming upload call.
//
// The client is expected to send exactly one *pb.ReadRequest_Read message
// describing the upload, followed by any number of *pb.ReadRequest_Chunk
// messages carrying the payload. The Read message is passed to handleRead,
// which yields the writer that every following chunk is written to via
// handleChunk.
//
// When the client closes its side of the stream (Recv returns io.EOF) the
// writer is closed if it implements io.Closer, and a pb.ChunkResponse
// carrying the total number of bytes written is sent back.
//
// A stream that violates the expected message order is rejected with an
// error instead of a panic, so a misbehaving client cannot take the server
// process down.
func (c *StorageService) Upload(stream pb.StorageService_UploadServer) error {
	receivedDescription := false
	var wr io.Writer
	var bytesReceived int64

	for {
		readReq, err := stream.Recv()
		if err != nil {
			if err != io.EOF {
				return errors.Wrap(err, "(*StorageService).Upload")
			}

			// End of stream. Signal completion to the consumer of the
			// writer; without this a pipe-backed writer never delivers EOF
			// to its reader. The type assertion is false for a nil writer
			// (no Read message was received) and for writers that cannot be
			// closed.
			if closer, ok := wr.(io.Closer); ok {
				if cerr := closer.Close(); cerr != nil {
					return errors.Wrap(cerr, "(*StorageService).Upload")
				}
			}

			resp := &pb.ChunkResponse{
				Len: bytesReceived,
			}
			// Infow takes key/value pairs; Infof would treat them as
			// surplus format arguments and log "%!(EXTRA ...)".
			c.l.Sugar().Infow("sending response", "pb.ChunkResponse", resp)
			// FIXME: for now we simply return the total number of bytes we received
			return stream.SendAndClose(resp)
		}

		err = handleReadReq(*readReq)
		if err != nil {
			return errors.Wrap(err, "(*StorageService).Upload")
		}

		// Check what we have received
		// it should be:
		// 1 x *pb.ReadRequest_Read (exactly)
		// n x *pb.ReadRequest_Chunk
		switch v := readReq.Value.(type) {
		case *pb.ReadRequest_Read:
			if receivedDescription {
				return errors.New("(*StorageService).Upload: Read already received!")
			}
			// creates an s3manager.Uploader and runs the stuff in a goroutine
			// wr is from an io.Pipe()
			// rdr of that is the Body of the s3manager uploader
			//
			// handleRead does not use its reader argument, so nil is passed.
			wr, err = handleRead(c.l, c.storage, v.Read, nil)
			if err != nil {
				return err
			}
			receivedDescription = true
		case *pb.ReadRequest_Chunk:
			if !receivedDescription {
				return errors.New("(*StorageService).Upload: Read not yet received!")
			}
			// essentially wr(v.Chunk.Bytes)
			n, err := handleChunk(c.l, v.Chunk, wr)
			if err != nil {
				return err
			}
			bytesReceived += int64(n)
		default:
			c.l.Warn("DEFAULT",
				zap.String("received an unexpected type", fmt.Sprintf("%T", v)),
			)
		}
	}
}
Process exiting with code: 0
API server listening at: 127.0.0.1:6981
2019-09-01T17:25:31.805+0200 INFO server/server.go:84 Listening {"port": "127.0.0.1:8080"}
2019-09-01T17:25:45.262+0200 INFO elastic/elastic.go:136 elastic.Backend.Create(s)%!(EXTRA string=sample, *sample.Sample=&{{ } 0xc000995810 <nil> <nil> <nil> <nil> <nil> [0xc0000d78b0 0xc0000d7900 0xc0000d7950] [0xc0000d7450=b 0xc0000d7630=c] [0xc000068e40 0xc000068ea0] [] false})
2019-09-01T17:25:45.288+0200 INFO elastic/elastic.go:164 response%!(EXTRA string=response, *elastic.IndexResponse=&{stool _doc 0dfcbee5-2619-4d56-8e00-f05d5c0d812d 1 created 0xc0009783c0 44 2 0 false})
2019-09-01T17:25:45.383+0200 INFO zap/server_interceptors.go:40 finished unary call with code OK {"grpc.start_time": "2019-09-01T17:25:45+02:00", "system": "grpc", "span.kind": "server", "grpc.service": "assetkit.stool.SampleService", "grpc.method": "Create", "grpc.code": "OK", "grpc.time_ms": 122.99800109863281}
2019-09-01T17:25:45.386+0200 INFO storageservice/storageservice.go:122 (pb.ReadRequest_Read) {"type": "*pb.Read", "value": "base_name:\"SP1.fq\" cloud_uri:\"s3://marcherm-test-stool-bucket/data/input/0dfcbee5-2619-4d56-8e00-f05d5c0d812d/0dfcbee5-2619-4d56-8e00-f05d5c0d812d_R1.fastq.gz\" path:\"C:\\\\Users\\\\martin\\\\code\\\\serverhorror\\\\stool\\\\testdata\\\\SP1.fq\" direction:DIRECTION_Forward "}
2019-09-01T17:25:45.387+0200 INFO s3/uploader.go:29 creating new uploader {"storageURL": "s3://marcherm-test-stool-bucket/data/input/0dfcbee5-2619-4d56-8e00-f05d5c0d812d/0dfcbee5-2619-4d56-8e00-f05d5c0d812d_R1.fastq.gz"}
2019-09-01T17:25:45.389+0200 INFO storageservice/storageservice.go:147 (pb.readRequest_Chunk) {"type": "*pb.Chunk", "chunk.Bytes[:5]": "QGNsdXM=", "chunk.size": 16384, "bytes.written": 16384}
2019-09-01T17:25:45.389+0200 INFO storageservice/storageservice.go:147 (pb.readRequest_Chunk) {"type": "*pb.Chunk", "chunk.Bytes[:5]": "QUFBQ0E=", "chunk.size": 6087, "bytes.written": 6087}
2019-09-01T17:25:45.390+0200 INFO storageservice/storageservice.go:62 sending response%!(EXTRA string=pb.ChunkResponse, *pb.ChunkResponse=len:22471 )
2019-09-01T17:25:45.390+0200 INFO zap/server_interceptors.go:67 finished streaming call with code OK {"grpc.start_time": "2019-09-01T17:25:45+02:00", "system": "grpc", "span.kind": "server", "grpc.service": "assetkit.stool.StorageService", "grpc.method": "Upload", "grpc.code": "OK", "grpc.time_ms": 4.000999927520752}
2019-09-01T17:25:45.393+0200 INFO storageservice/storageservice.go:122 (pb.ReadRequest_Read) {"type": "*pb.Read", "value": "base_name:\"SP1.fq\" cloud_uri:\"s3://marcherm-test-stool-bucket/data/input/0dfcbee5-2619-4d56-8e00-f05d5c0d812d/0dfcbee5-2619-4d56-8e00-f05d5c0d812d_R2.fastq.gz\" path:\"C:\\\\Users\\\\martin\\\\code\\\\serverhorror\\\\stool\\\\testdata\\\\SP1.fq\" direction:DIRECTION_Reverse "}
2019-09-01T17:25:45.394+0200 INFO s3/uploader.go:29 creating new uploader {"storageURL": "s3://marcherm-test-stool-bucket/data/input/0dfcbee5-2619-4d56-8e00-f05d5c0d812d/0dfcbee5-2619-4d56-8e00-f05d5c0d812d_R2.fastq.gz"}
2019-09-01T17:25:45.396+0200 INFO storageservice/storageservice.go:147 (pb.readRequest_Chunk) {"type": "*pb.Chunk", "chunk.Bytes[:5]": "QGNsdXM=", "chunk.size": 16384, "bytes.written": 16384}
2019-09-01T17:25:45.400+0200 INFO storageservice/storageservice.go:147 (pb.readRequest_Chunk) {"type": "*pb.Chunk", "chunk.Bytes[:5]": "QUFBQ0E=", "chunk.size": 6087, "bytes.written": 6087}
2019-09-01T17:25:45.401+0200 INFO storageservice/storageservice.go:62 sending response%!(EXTRA string=pb.ChunkResponse, *pb.ChunkResponse=len:22471 )
2019-09-01T17:25:45.401+0200 INFO zap/server_interceptors.go:67 finished streaming call with code OK {"grpc.start_time": "2019-09-01T17:25:45+02:00", "system": "grpc", "span.kind": "server", "grpc.service": "assetkit.stool.StorageService", "grpc.method": "Upload", "grpc.code": "OK", "grpc.time_ms": 9}
2019-09-01T17:25:45.403+0200 INFO zap/grpclogger.go:41 transport: loopyWriter.run returning. connection error: desc = "transport is closing" {"system": "grpc", "grpc_log": true}
package s3
import (
"context"
"io"
"net/url"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// uploader bundles everything needed for one upload through s3manager:
// the upload manager, the prepared upload input and the two ends of the
// in-memory pipe that feeds the upload body.
type uploader struct {
// ctx is the context handed to newUploader.
ctx context.Context
// l is the logger used for progress and error messages.
l *zap.Logger
// sess is the AWS session the upload manager was created from.
sess *session.Session
// mgr performs the upload.
mgr *s3manager.Uploader
// task is the upload input (bucket, key, body) passed to mgr.
task *s3manager.UploadInput
// wr is the write end of the pipe; Write forwards to it.
wr io.Writer
// rdr is the read end of the pipe; newUploader also sets it as the
// Body of task.
rdr io.Reader
}
// newUploader prepares an upload to the location named by storageURL.
//
// The URL host is used as the bucket name and the URL path, without its
// leading slash, as the object key. The upload body is the read end of an
// in-memory pipe; bytes passed to the uploader's Write method arrive there.
//
// It returns an error if the AWS session cannot be created, or if the
// resulting uploader fails validation.
func newUploader(ctx context.Context, l *zap.Logger, storageURL *url.URL) (*uploader, error) {
	l.Info("creating new uploader", zap.Stringer("storageURL", storageURL))

	awsSession, err := session.NewSession()
	if err != nil {
		return nil, errors.Wrap(err, "StoolServer.newUploader")
	}

	pipeReader, pipeWriter := io.Pipe()

	manager := s3manager.NewUploader(awsSession, func(m *s3manager.Uploader) {
		m.PartSize = s3manager.DefaultUploadPartSize
		m.Concurrency = 5
		m.LeavePartsOnError = false
	})

	// Metadata is intentionally not set on the upload input yet:
	// Metadata: map[string]*string{
	// "SourceURI": aws.String(
	// fmt.Sprint(sf.SourceURI),
	// ),
	// },
	input := &s3manager.UploadInput{
		Body:   pipeReader,
		Bucket: aws.String(storageURL.Host),
		Key:    aws.String(strings.TrimPrefix(storageURL.Path, "/")),
	}

	up := &uploader{
		ctx:  ctx,
		l:    l,
		sess: awsSession,
		mgr:  manager,
		task: input,
		rdr:  pipeReader,
		wr:   pipeWriter,
	}
	return up, up.Validate()
}
// Validate reports whether the uploader is usable.
// It currently performs no checks and always returns nil.
func (u *uploader) Validate() error {
return nil
}
// Write forwards b to the uploader's underlying writer and returns that
// writer's byte count and error unchanged.
func (u *uploader) Write(b []byte) (int, error) {
	return u.wr.Write(b)
}
// Do runs the prepared upload through the upload manager and returns its
// result.
//
// On failure the error is logged and returned wrapped, with a nil result.
// If the uploader's reader supports CloseWithError (as the read end of an
// io.Pipe does), it is closed with the upload error, so that callers
// blocked in Write are released and see the failure instead of waiting on a
// reader that will never drain.
func (u *uploader) Do() (*s3manager.UploadOutput, error) {
	// result, err = u.mgr.UploadWithContext(u.ctx, u.task)
	result, err := u.mgr.Upload(u.task)
	if err != nil {
		u.l.Error("could not upload to S3", zap.Error(err))
		if pr, ok := u.rdr.(interface{ CloseWithError(error) error }); ok {
			// The return value carries no information for a pipe reader.
			_ = pr.CloseWithError(err)
		}
		return nil, errors.Wrap(err, "Failed to upload")
	}

	// Infow takes key/value pairs; Infof would treat them as surplus format
	// arguments and log "%!(EXTRA ...)".
	u.l.Sugar().Infow("s3.(*uploader).Do", "result", result)
	return result, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment