Skip to content

Instantly share code, notes, and snippets.

@mrkagelui
Last active May 19, 2023 19:11
Show Gist options
  • Save mrkagelui/b63e55893ea6c277184df50d219fed79 to your computer and use it in GitHub Desktop.
Save mrkagelui/b63e55893ea6c277184df50d219fed79 to your computer and use it in GitHub Desktop.
stream data from DB to upload
package main
import (
"context"
"encoding/csv"
"errors"
"fmt"
"io"
"log"
"os"
"runtime"
"runtime/pprof"
"time"
)
const (
numItems = 1000000
bufferSize = 100
)
func main() {
f, err := os.Create("cpu.prof")
if err != nil {
log.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
var m1, m2 runtime.MemStats
runtime.GC()
runtime.ReadMemStats(&m1)
start := time.Now()
s := service{
ds: store{d: db{}},
up: storage{s: stub{}},
}
if err := s.run(context.Background(), filter{userID: "traveler from beyond the fog"}); err != nil {
fmt.Println(err)
return
}
fmt.Println("success!")
fmt.Printf("time elapsed: %v\n", time.Since(start).Nanoseconds()) // on Go Playground this will always be 0
runtime.ReadMemStats(&m2)
fmt.Println("total:", m2.TotalAlloc-m1.TotalAlloc)
fmt.Println("mallocs:", m2.Mallocs-m1.Mallocs)
}
func timeTrack(start time.Time, name string) {
elapsed := time.Since(start)
fmt.Printf("%s took %s\n", name, elapsed)
}
// run is the main logic of the service
func (s service) run(ctx context.Context, f filter) error {
defer timeTrack(time.Now(), "run")
pr, pw := io.Pipe()
r, err := s.ds.getData(ctx, f)
if err != nil {
return fmt.Errorf("querying: %v", err)
}
go writeToCSV(r, pw)
if err := s.up.upload(ctx, pr); err != nil {
return fmt.Errorf("uploading: %v", err)
}
return nil
}
// item represents the data being retrieved and uploaded
type item struct {
id int
name string
}
// filter is the user input that decides what data should be uploaded
type filter struct {
userID string
createdAfter time.Time
createdBefore time.Time
}
// service contains all resources it needs to carry out the retrieval and upload
type service struct {
ds dataStore
up fileStorage
}
// ================csv conversion================
func writeToCSV(items chan item, w io.WriteCloser) error {
csvWriter := csv.NewWriter(w)
defer func() {
csvWriter.Flush()
w.Close()
}()
for i := range items {
if err := csvWriter.Write([]string{fmt.Sprintf("%v", i.id), i.name}); err != nil {
return fmt.Errorf("writing one: %v", err)
}
}
return nil
}
// ================data store================
// dataStore retrieves data
type dataStore interface {
getData(context.Context, filter) (chan item, error)
}
// store implements dataStore interface
type store struct {
d db
}
// getData returns a list of items
func (s store) getData(ctx context.Context, f filter) (chan item, error) {
r, err := s.d.query(ctx, f)
if err != nil {
return nil, fmt.Errorf("querying: %v", err)
}
result := make(chan item, bufferSize)
go func() {
defer close(result)
for r.next() {
var i item
if err := r.scan(&i); err != nil {
return
}
result <- i
}
}()
return result, nil
}
// ================file storage================
// fileStorage uploads the data to file storage
type fileStorage interface {
upload(context.Context, io.Reader) error
}
// storage implements fileStorage interface
type storage struct {
s stub
}
// upload performs the content uploading
func (s storage) upload(ctx context.Context, r io.Reader) error {
defer timeTrack(time.Now(), "upload")
return s.s.pub(ctx, r)
}
// ================mock db================
type db struct{}
func (d db) query(context.Context, filter) (*rows, error) {
return &rows{counter: 0}, nil
}
type rows struct {
counter int
}
func (r *rows) next() bool {
return r.counter < numItems
}
func (r *rows) scan(i *item) error {
names := []string{"abby", "brown", "candice"}
i.id, i.name = r.counter, names[r.counter%len(names)]
r.counter++
return nil
}
// ================mock stub================
type stub struct{}
func (s stub) pub(_ context.Context, r io.Reader) error {
// read 1024 bytes at a time
buf := make([]byte, 1024)
var b int
for {
n, err := r.Read(buf)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return fmt.Errorf("read: %v", err)
}
b += n
}
fmt.Println("uploaded", b, "bytes")
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment