Last active
May 19, 2023 19:11
-
-
Save mrkagelui/b63e55893ea6c277184df50d219fed79 to your computer and use it in GitHub Desktop.
stream data from DB to upload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/csv" | |
"errors" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"runtime" | |
"runtime/pprof" | |
"time" | |
) | |
// Sizing knobs for the demo pipeline.
const (
	// numItems is the counter value at which the mock rows cursor stops
	// reporting more rows.
	numItems = 1_000_000
	// bufferSize is the capacity of the item channel created by store.getData.
	bufferSize = 100
)
func main() { | |
f, err := os.Create("cpu.prof") | |
if err != nil { | |
log.Fatal(err) | |
} | |
pprof.StartCPUProfile(f) | |
defer pprof.StopCPUProfile() | |
var m1, m2 runtime.MemStats | |
runtime.GC() | |
runtime.ReadMemStats(&m1) | |
start := time.Now() | |
s := service{ | |
ds: store{d: db{}}, | |
up: storage{s: stub{}}, | |
} | |
if err := s.run(context.Background(), filter{userID: "traveler from beyond the fog"}); err != nil { | |
fmt.Println(err) | |
return | |
} | |
fmt.Println("success!") | |
fmt.Printf("time elapsed: %v\n", time.Since(start).Nanoseconds()) // on Go Playground this will always be 0 | |
runtime.ReadMemStats(&m2) | |
fmt.Println("total:", m2.TotalAlloc-m1.TotalAlloc) | |
fmt.Println("mallocs:", m2.Mallocs-m1.Mallocs) | |
} | |
// timeTrack prints how long ago start was, labelled with name. It is meant to
// be deferred with time.Now() as its first argument so that it reports the
// duration of the enclosing function.
func timeTrack(start time.Time, name string) {
	fmt.Printf("%s took %s\n", name, time.Since(start))
}
// run is the main logic of the service | |
func (s service) run(ctx context.Context, f filter) error { | |
defer timeTrack(time.Now(), "run") | |
pr, pw := io.Pipe() | |
r, err := s.ds.getData(ctx, f) | |
if err != nil { | |
return fmt.Errorf("querying: %v", err) | |
} | |
go writeToCSV(r, pw) | |
if err := s.up.upload(ctx, pr); err != nil { | |
return fmt.Errorf("uploading: %v", err) | |
} | |
return nil | |
} | |
// item is a single record that flows from the data store to the uploaded CSV.
type item struct {
	id   int    // written as the first CSV column
	name string // written as the second CSV column
}
// filter carries the user input that selects which data should be uploaded.
type filter struct {
	userID string
	// NOTE(review): presumably bounds on the record creation time; the mock
	// db in this file ignores the filter entirely — confirm with a real store.
	createdAfter, createdBefore time.Time
}
// service contains all resources it needs to carry out the retrieval and upload | |
type service struct { | |
ds dataStore | |
up fileStorage | |
} | |
// ================csv conversion================ | |
func writeToCSV(items chan item, w io.WriteCloser) error { | |
csvWriter := csv.NewWriter(w) | |
defer func() { | |
csvWriter.Flush() | |
w.Close() | |
}() | |
for i := range items { | |
if err := csvWriter.Write([]string{fmt.Sprintf("%v", i.id), i.name}); err != nil { | |
return fmt.Errorf("writing one: %v", err) | |
} | |
} | |
return nil | |
} | |
// ================data store================ | |
// dataStore retrieves data | |
type dataStore interface { | |
getData(context.Context, filter) (chan item, error) | |
} | |
// store implements dataStore interface | |
type store struct { | |
d db | |
} | |
// getData returns a list of items | |
func (s store) getData(ctx context.Context, f filter) (chan item, error) { | |
r, err := s.d.query(ctx, f) | |
if err != nil { | |
return nil, fmt.Errorf("querying: %v", err) | |
} | |
result := make(chan item, bufferSize) | |
go func() { | |
defer close(result) | |
for r.next() { | |
var i item | |
if err := r.scan(&i); err != nil { | |
return | |
} | |
result <- i | |
} | |
}() | |
return result, nil | |
} | |
// ================file storage================

// fileStorage is the destination of the generated content: upload consumes
// the reader and reports whether the upload succeeded.
type fileStorage interface {
	upload(ctx context.Context, r io.Reader) error
}
// storage implements fileStorage interface | |
type storage struct { | |
s stub | |
} | |
// upload performs the content uploading | |
func (s storage) upload(ctx context.Context, r io.Reader) error { | |
defer timeTrack(time.Now(), "upload") | |
return s.s.pub(ctx, r) | |
} | |
// ================mock db================ | |
type db struct{} | |
func (d db) query(context.Context, filter) (*rows, error) { | |
return &rows{counter: 0}, nil | |
} | |
// rows is the mock result cursor returned by db.query.
type rows struct {
	counter int // number of rows scanned so far; also used as the next item id
}
func (r *rows) next() bool { | |
return r.counter < numItems | |
} | |
func (r *rows) scan(i *item) error { | |
names := []string{"abby", "brown", "candice"} | |
i.id, i.name = r.counter, names[r.counter%len(names)] | |
r.counter++ | |
return nil | |
} | |
// ================mock stub================

// stub is a stand-in upload client that discards whatever it is given.
type stub struct{}

// pub reads r to the end in chunks of up to 1024 bytes, discards the data and
// prints the total number of bytes consumed. The context is ignored.
//
// It returns nil once r reports io.EOF, or the first other read error wrapped
// with a "read:" prefix.
func (s stub) pub(_ context.Context, r io.Reader) error {
	// read 1024 bytes at a time
	buf := make([]byte, 1024)
	var b int
	for {
		n, err := r.Read(buf)
		// A Reader may return n > 0 together with an error (including
		// io.EOF), so count the bytes before looking at err.
		b += n
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return fmt.Errorf("read: %w", err)
		}
	}
	fmt.Println("uploaded", b, "bytes")
	return nil
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment