Skip to content

Instantly share code, notes, and snippets.

@pcolazurdo
Last active August 24, 2021 16:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pcolazurdo/50b9f5bbc5384957bfba7bd4f6177e38 to your computer and use it in GitHub Desktop.
Save pcolazurdo/50b9f5bbc5384957bfba7bd4f6177e38 to your computer and use it in GitHub Desktop.
AWS_SDK_BUGREPORT1
// This sample code triggers the problem if run against a large directory structure (+50K objects)
// I added some instrumentation to check how many connections were being reused versus created anew
// Please change the table name in line 283 and path in line 286
package main
import (
"context"
"crypto/md5"
"strings"
"io"
"io/fs"
"os"
"path/filepath"
"sync"
"syscall"
"errors"
"fmt"
"time"
"net"
"net/http"
"net/http/httptrace"
"golang.org/x/net/http2"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"log"
)
// RequestTrace installs per-request HTTP tracing handlers on AWS SDK
// requests so connection reuse can be observed. It currently carries no
// state of its own; see TraceRequest and String.
type RequestTrace struct {
}
// HTTPTrace wraps a context with an httptrace.ClientTrace and records
// whether the underlying TCP connection was reused for the request.
type HTTPTrace struct {
	// NOTE(review): embedding a Context in a struct is normally discouraged;
	// kept here because the SDK's SetContext needs a context.Context value.
	context.Context
	Reused bool // set by gotConn; true when the connection came from the idle pool
}
// gotConn is the httptrace GotConn callback: it records whether the
// connection was taken from the idle pool and logs the remote address
// together with the reuse flag.
func (t *HTTPTrace) gotConn(info httptrace.GotConnInfo) {
	t.Reused = info.Reused
	fmt.Printf("Connection reused for? %+v %v\n", info.Conn.RemoteAddr(), info.Reused)
}
// NewHTTPTrace builds an HTTPTrace whose embedded context carries a
// ClientTrace forwarding GotConn events to the returned value.
func NewHTTPTrace(ctx context.Context) *HTTPTrace {
	ht := new(HTTPTrace)
	ht.Context = httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
		GotConn: ht.gotConn,
	})
	return ht
}
// TraceRequest is an AWS request option: it pushes a Send handler that
// re-wraps the request's ORIGINAL context with a fresh HTTP trace on
// every attempt, so retries do not stack traces on a previous attempt's
// (already trace-wrapped) context.
func (t *RequestTrace) TraceRequest(r *request.Request) {
	// Capture the context once, before any retry replaces it.
	parent := r.Context()
	r.Handlers.Send.PushFront(func(attempt *request.Request) {
		attempt.SetContext(NewHTTPTrace(parent))
	})
}
// String renders the collected trace information. The per-attempt and
// latency reporting is currently disabled, so this always yields "".
func (t *RequestTrace) String() string {
	var b strings.Builder
	// Latency/attempt reporting intentionally left disabled:
	// l := t.Latency()
	// writeDurField(&b, "Latency", l.Latency)
	// writeField(&b, "Attempts", "%d", len(t.Attempts))
	return b.String()
}
// workerCounter is a mutex-guarded counter used to bound the number of
// in-flight hashing goroutines (see workerCalc).
type workerCounter struct {
	sync.RWMutex
	counter int // outstanding hash goroutines; guarded by the embedded RWMutex
}
// RunInventoryVars holds configuration for an inventory run.
// NOTE(review): not referenced elsewhere in this file; the extra fields
// are parked for later use.
type RunInventoryVars struct {
	PathName string // root directory to walk
	// name string
	// envName string
	// imageTag string
	// resourceTags map[string]string
}
// Job for worker
type workerJob struct {
Root string
}
// Result of a worker
type workerResult struct {
Filename string
}
// HTTPClientSettings collects the timeout and idle-connection knobs used
// to build the custom http.Transport in NewHTTPClientWithSettings.
type HTTPClientSettings struct {
	Connect          time.Duration // TCP dial timeout
	ConnKeepAlive    time.Duration // keep-alive period for dialed connections
	ExpectContinue   time.Duration // wait for a 100-continue response
	IdleConn         time.Duration // how long an idle connection stays pooled
	MaxAllIdleConns  int           // pool-wide idle connection cap
	MaxHostIdleConns int           // per-host idle connection cap
	ResponseHeader   time.Duration // wait for response headers after a request
	TLSHandshake     time.Duration // TLS handshake timeout
}
// Database bundles the AWS session, the DynamoDB client, the target
// table name and the context used for PutItem calls.
type Database struct {
	// sync.RWMutex
	sess  *session.Session
	svc   *dynamodb.DynamoDB
	table string // empty string disables writes (see saveItem)
	// NOTE(review): storing a Context in a struct is discouraged; prefer
	// passing it per call — kept to preserve the instrumentation setup.
	ctx context.Context
}
// ddbEntry is the DynamoDB item shape: the file name doubles as the
// partition key, and the run timestamp is the sort key.
type ddbEntry struct {
	FileName string `dynamodbav:"filename"`
	Pk       string `dynamodbav:"pk"` // partition key (same value as FileName)
	Sk       string `dynamodbav:"sk"` // sort key (run timestamp)
}
// Output aggregates query results: a count plus the matching items.
// NOTE(review): not referenced elsewhere in this file.
type Output struct {
	Count int64
	Items []ddbEntry
}
// transport is an http.RoundTripper that keeps track of the in-flight
// request and implements hooks to report HTTP tracing events.
// type transport struct {
// current *http.Request
// }
// // GotConn prints whether the connection has been used previously
// // for the current request.
// func (t *transport) GotConn(info httptrace.GotConnInfo) {
// // fmt.Printf("Connection reused for %#v? %#v\n", t.current.URL, info.Reused)
// }
// NewHTTPClientWithSettings builds an *http.Client with explicit dial,
// idle-connection and handshake timeouts, upgraded to speak HTTP/2, and
// returns it together with the (unmodified) parent context so callers can
// thread the context through SDK calls.
//
// On failure the returned client and context are nil and err describes
// the HTTP/2 configuration problem. (The original returned a zero-value
// client on error and also declared an unused `client` variable.)
func NewHTTPClientWithSettings(ctxParent context.Context, httpSettings HTTPClientSettings) (*http.Client, context.Context, error) {
	tr := &http.Transport{
		ResponseHeaderTimeout: httpSettings.ResponseHeader,
		Proxy:                 http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			KeepAlive: httpSettings.ConnKeepAlive,
			DualStack: true,
			Timeout:   httpSettings.Connect,
		}).DialContext,
		MaxIdleConns:          httpSettings.MaxAllIdleConns,
		IdleConnTimeout:       httpSettings.IdleConn,
		TLSHandshakeTimeout:   httpSettings.TLSHandshake,
		MaxIdleConnsPerHost:   httpSettings.MaxHostIdleConns,
		ExpectContinueTimeout: httpSettings.ExpectContinue,
	}
	// So the client makes HTTP/2 requests where the server supports it.
	if err := http2.ConfigureTransport(tr); err != nil {
		return nil, nil, fmt.Errorf("configuring HTTP/2 transport: %w", err)
	}
	return &http.Client{Transport: tr}, ctxParent, nil
}
// NewDatabaseConnection creates a Database backed by a shared-config AWS
// session (credentials from ~/.aws/credentials, region from ~/.aws/config)
// using a custom HTTP client with conservative timeouts. It panics if the
// HTTP client cannot be configured, since nothing else in the program can
// proceed without it; the panic now carries the underlying cause instead
// of printing it separately first.
func NewDatabaseConnection(table string) *Database {
	ctxParent := context.Background()
	httpClient, ctx, err := NewHTTPClientWithSettings(ctxParent, HTTPClientSettings{
		Connect:          5 * time.Second,
		ExpectContinue:   1 * time.Second,
		IdleConn:         90 * time.Second,
		ConnKeepAlive:    30 * time.Second,
		MaxAllIdleConns:  10, // avoid EMFILE errors from too many idle connections on macOS
		MaxHostIdleConns: 2,
		ResponseHeader:   5 * time.Second,
		TLSHandshake:     5 * time.Second,
	})
	if err != nil {
		panic(fmt.Sprintf("creating custom HTTP client: %v", err))
	}
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		Config:            aws.Config{HTTPClient: httpClient},
		SharedConfigState: session.SharedConfigEnable,
	}))
	// Create the DynamoDB client on the shared session.
	return &Database{
		sess:  sess,
		svc:   dynamodb.New(sess),
		table: table,
		ctx:   ctx,
	}
}
// saveItem marshals item and writes it to the configured DynamoDB table,
// using the Database's stored context and attaching per-request HTTP
// tracing. A Database with an empty table name is a no-op sink.
//
// Failures are now returned wrapped instead of calling log.Fatalf from
// inside a library method (the original also had a dead `return err`
// after Fatalf, which never returns).
func (d *Database) saveItem(item interface{}) error {
	if d.table == "" {
		// Writes disabled: nothing to do.
		return nil
	}
	trace := &RequestTrace{}
	av, err := dynamodbattribute.MarshalMap(item)
	if err != nil {
		return fmt.Errorf("marshalling item: %w", err)
	}
	input := &dynamodb.PutItemInput{
		Item:      av,
		TableName: aws.String(d.table),
	}
	fmt.Printf("\nSaveFile\n")
	if _, err := d.svc.PutItemWithContext(d.ctx, input, trace.TraceRequest); err != nil {
		return fmt.Errorf("calling PutItem: %w", err)
	}
	return nil
}
//(map[int]*ddbEntry, error)

// main starts a full inventory run; all configuration (table name, root
// path, worker counts) is hard-coded in RunInventory.
func main() {
	RunInventory()
}
// maxCalc caps the number of concurrently running hash goroutines in workerCalc.
const maxCalc = 50 // TODO: Make it configurable
// RunInventory walks a directory tree with treeWalker goroutines, feeds
// every non-empty regular file to workerCalc for MD5 hashing, and records
// each processed file in DynamoDB. It blocks until the hashing side
// signals completion via calcDone.
//
// NOTE(review): the 30s sleep after readDone appears to be a drain window
// for results still in flight before the channels are closed — racy if
// hashing takes longer than that; confirm before relying on it.
func RunInventory() {
	var startTime = time.Now()
	// Table name and root path are hard-coded (see the file header note).
	var db = NewDatabaseConnection("table-name")
	// var path = "/Users/pabcol/WorkDocs/AMZN/"
	var path = "../.."
	// cwd, err := os.Getwd()
	dir := path
	workerCount := 1
	jobs := make(chan workerJob, workerCount)
	results := make(chan workerResult)
	readDone := make(chan bool)
	calcDone := make(chan bool)
	wg := &sync.WaitGroup{}
	// Start N tree-walking workers.
	for i := 0; i < workerCount; i++ {
		go treeWalker(jobs, results, wg)
	}
	// Seed one initial job; wg reaches zero only when the whole tree is walked.
	wg.Add(1)
	go func() {
		jobs <- workerJob{
			Root: dir,
		}
	}()
	// Start the hashing consumers.
	for i := 0; i < workerCount; i++ {
		go workerCalc(results, calcDone, startTime.String(), db)
	}
	// When all directory jobs have finished, signal shutdown.
	go func() {
		wg.Wait()
		readDone <- true
	}()
readloop:
	for {
		select {
		case <-readDone:
			// Finished traversing the tree; wait, then close the channels
			// so the workers' range loops terminate.
			fmt.Printf("readDone: %v\n", time.Now().Format("2006-01-02T15:04:05Z07:00"))
			time.Sleep(30 * time.Second)
			close(jobs)
			close(results)
		case <-calcDone:
			// All discovered files have been hashed; we are done.
			fmt.Printf("calcDone: %v\n", time.Now().Format("2006-01-02T15:04:05Z07:00"))
			break readloop
		}
	}
}
// treeWalker consumes directory jobs and, for each directory: emits a
// workerResult for every non-empty regular file, and enqueues a new job
// for every subdirectory. wg counts outstanding directory jobs — it is
// incremented when a subdirectory job is created and decremented once a
// job has been fully scanned (or skipped on error).
func treeWalker(jobs chan workerJob, results chan<- workerResult, wg *sync.WaitGroup) {
	// While there are new jobs
	for j := range jobs {
		dir, err := os.Open(j.Root)
		if err != nil {
			handleFileOpenLimits(err)
			dir.Close()
			wg.Done()
			continue
		}
		fInfo, err := dir.Readdir(-1) // Return all files in directory
		dir.Close()
		if err != nil {
			handleFileOpenLimits(err)
			wg.Done()
			// if os.IsPermission(err) {
			// 	// Skip if there's no permission
			// 	continue
			// }
			// For now, skip the directory on any read error.
			continue
		}
		for _, file := range fInfo {
			fmt.Printf("\ntreeWalker\n")
			fpath, _ := filepath.Abs(filepath.Join(dir.Name(), file.Name()))
			if file.Mode().IsRegular() {
				// Regular file: skip empty ones, otherwise hand it to the hashers.
				fs := uint64(file.Size())
				if fs == 0 {
					// Skip zero sized (no wg.Done: files never created a job)
					continue
				}
				r := workerResult{
					Filename: fpath,
				}
				results <- r
			} else if file.IsDir() {
				// Subdirectory: queue it as a new job.
				nj := workerJob{
					Root: fpath,
				}
				// One more job, adds to wg
				wg.Add(1)
				// Send asynchronously so the walker never blocks on a full jobs
				// channel. NOTE(review): this spawns one goroutine per pending
				// directory — unbounded on very wide trees.
				go func() {
					jobs <- nj
				}()
			}
		}
		// Done one job, let wg know.
		wg.Done()
	}
}
// openFile opens filename and streams its contents through an MD5 hash,
// returning the filename on success and "" when the file could not be
// read (EMFILE conditions are reported via handleFileOpenLimits). The
// digest itself is discarded — the read exists to exercise file I/O.
func openFile(filename string) string {
	fmt.Printf("\nProcessed\n")
	file, err := os.Open(filename)
	if err != nil {
		// No Close here: os.Open returns a nil *os.File on error.
		handleFileOpenLimits(err)
		return ""
	}
	// One deferred close instead of a manual Close on every exit path
	// (the original even called Close on the nil file after an open error).
	defer file.Close()
	hash := md5.New()
	if _, err := io.Copy(hash, file); err != nil {
		handleFileOpenLimits(err)
		return ""
	}
	return filename
}
// workerCalc consumes workerResult filenames, hashing each in its own
// goroutine while keeping at most maxCalc hashes in flight, and records
// each successfully read file in DynamoDB. It sends on done once the
// results channel is closed AND every spawned goroutine has finished.
func workerCalc(results chan workerResult, done chan bool, timestamp string, database *Database) {
	wgCalc := &sync.WaitGroup{}
	// Tracks in-flight hash goroutines for the throttle below.
	i := workerCounter{
		counter: 0,
	}
	// This extra Add(1) is balanced by the final Done, so the waiter fires
	// only after the range loop has exited, not while work remains queued.
	wgCalc.Add(1)
	go func() {
		wgCalc.Wait()
		done <- true
	}()
	for j := range results {
		wgCalc.Add(1)
		i.Lock()
		i.counter++
		i.Unlock()
		// Polling throttle: sleep while more than maxCalc hashes are
		// outstanding. NOTE(review): a buffered-channel semaphore would
		// avoid the 100ms polling, but behavior is kept as-is here.
		for func() int {
			defer i.RUnlock()
			i.RLock()
			return i.counter
		}() > maxCalc {
			time.Sleep(100 * time.Millisecond)
		}
		go func(filename string) {
			res := openFile(filename)
			if res != "" {
				database.PutContentHash(res, timestamp)
			}
			i.Lock()
			i.counter--
			i.Unlock()
			wgCalc.Done()
		}(j.Filename)
	}
	wgCalc.Done()
}
// This will detect if the io errors are related with too many open files
// and suggest the user to reduce the max limit of goroutines
// Or to increase the corresponding ulimits
func handleFileOpenLimits(err error) {
var perr *fs.PathError
if errors.As(err, &perr) {
if perr.Unwrap() == syscall.EMFILE {
// log.Printf("You are running %d goroutines & have opened more files than what is allowed in your ulimits, please check", runtime.NumGoroutine())
fmt.Printf("\nEMFILE Error\n")
} else {
log.Printf("%#v", perr)
}
}
}
// PutContentHash records that filename was processed in the run
// identified by timestamp, writing a ddbEntry whose partition key is the
// filename and whose sort key is the timestamp.
//
// A save failure is now returned wrapped instead of calling log.Fatalf
// (the original's `return err` after Fatalf was dead code, since Fatalf
// never returns).
func (d *Database) PutContentHash(filename string, timestamp string) error {
	entry := &ddbEntry{
		FileName: filename,
		Pk:       filename,
		Sk:       timestamp,
	}
	if err := d.saveItem(entry); err != nil {
		return fmt.Errorf("calling PutContentHash: %w", err)
	}
	return nil
}
package main
import (
"crypto/md5"
"io"
"io/fs"
"os"
"path/filepath"
"runtime"
"sync"
"syscall"
"errors"
"fmt"
"time"
"net"
"net/http"
"golang.org/x/net/http2"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"log"
)
// workerCounter is a mutex-guarded counter used to bound the number of
// in-flight hashing goroutines (see workerCalc).
type workerCounter struct {
	sync.RWMutex
	counter int // outstanding hash goroutines; guarded by the embedded RWMutex
}
// RunInventoryVars holds configuration for an inventory run.
// NOTE(review): not referenced elsewhere in this file; the extra fields
// are parked for later use.
type RunInventoryVars struct {
	PathName string // root directory to walk
	// name string
	// envName string
	// imageTag string
	// resourceTags map[string]string
}
// Job for worker
type workerJob struct {
Root string
}
// Result of a worker
type workerResult struct {
Filename string
}
// HTTPClientSettings collects the timeout and idle-connection knobs used
// to build the custom http.Transport in NewHTTPClientWithSettings.
type HTTPClientSettings struct {
	Connect          time.Duration // TCP dial timeout
	ConnKeepAlive    time.Duration // keep-alive period for dialed connections
	ExpectContinue   time.Duration // wait for a 100-continue response
	IdleConn         time.Duration // how long an idle connection stays pooled
	MaxAllIdleConns  int           // pool-wide idle connection cap
	MaxHostIdleConns int           // per-host idle connection cap
	ResponseHeader   time.Duration // wait for response headers after a request
	TLSHandshake     time.Duration // TLS handshake timeout
}
// Database bundles the AWS session, the DynamoDB client and the target
// table name for inventory writes.
type Database struct {
	// sync.RWMutex
	sess  *session.Session
	svc   *dynamodb.DynamoDB
	table string // empty string disables writes (see saveItem)
}
// ddbEntry is the DynamoDB item shape: the file name doubles as the
// partition key, and the run timestamp is the sort key.
type ddbEntry struct {
	FileName string `dynamodbav:"filename"`
	Pk       string `dynamodbav:"pk"` // partition key (same value as FileName)
	Sk       string `dynamodbav:"sk"` // sort key (run timestamp)
}
// Output aggregates query results: a count plus the matching items.
// NOTE(review): not referenced elsewhere in this file.
type Output struct {
	Count int64
	Items []ddbEntry
}
// NewHTTPClientWithSettings builds an *http.Client with explicit dial,
// idle-connection and handshake timeouts, upgraded to speak HTTP/2.
//
// On failure the returned client is nil and err describes the HTTP/2
// configuration problem. (The original returned a zero-value client on
// error and also declared an unused `client` variable.)
func NewHTTPClientWithSettings(httpSettings HTTPClientSettings) (*http.Client, error) {
	tr := &http.Transport{
		ResponseHeaderTimeout: httpSettings.ResponseHeader,
		Proxy:                 http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			KeepAlive: httpSettings.ConnKeepAlive,
			DualStack: true,
			Timeout:   httpSettings.Connect,
		}).DialContext,
		MaxIdleConns:          httpSettings.MaxAllIdleConns,
		IdleConnTimeout:       httpSettings.IdleConn,
		TLSHandshakeTimeout:   httpSettings.TLSHandshake,
		MaxIdleConnsPerHost:   httpSettings.MaxHostIdleConns,
		ExpectContinueTimeout: httpSettings.ExpectContinue,
	}
	// So the client makes HTTP/2 requests where the server supports it.
	if err := http2.ConfigureTransport(tr); err != nil {
		return nil, fmt.Errorf("configuring HTTP/2 transport: %w", err)
	}
	return &http.Client{Transport: tr}, nil
}
// NewDatabaseConnection creates a Database backed by a shared-config AWS
// session (credentials from ~/.aws/credentials, region from ~/.aws/config)
// using a custom HTTP client with conservative timeouts. It panics if the
// HTTP client cannot be configured, since nothing else in the program can
// proceed without it; the panic now carries the underlying cause instead
// of printing it separately first.
func NewDatabaseConnection(table string) *Database {
	httpClient, err := NewHTTPClientWithSettings(HTTPClientSettings{
		Connect:          5 * time.Second,
		ExpectContinue:   1 * time.Second,
		IdleConn:         90 * time.Second,
		ConnKeepAlive:    30 * time.Second,
		MaxAllIdleConns:  10, // avoid EMFILE errors from too many idle connections on macOS
		MaxHostIdleConns: 2,
		ResponseHeader:   5 * time.Second,
		TLSHandshake:     5 * time.Second,
	})
	if err != nil {
		panic(fmt.Sprintf("creating custom HTTP client: %v", err))
	}
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		Config:            aws.Config{HTTPClient: httpClient},
		SharedConfigState: session.SharedConfigEnable,
	}))
	// Create the DynamoDB client on the shared session.
	return &Database{
		sess:  sess,
		svc:   dynamodb.New(sess),
		table: table,
	}
}
// saveItem marshals item and writes it to the configured DynamoDB table.
// A Database with an empty table name is a no-op sink.
//
// Failures are now returned wrapped instead of calling log.Fatalf from
// inside a library method (the original also had a dead `return err`
// after Fatalf, which never returns).
func (d *Database) saveItem(item interface{}) error {
	if d.table == "" {
		// Writes disabled: nothing to do.
		return nil
	}
	av, err := dynamodbattribute.MarshalMap(item)
	if err != nil {
		return fmt.Errorf("marshalling item: %w", err)
	}
	input := &dynamodb.PutItemInput{
		Item:      av,
		TableName: aws.String(d.table),
	}
	if _, err := d.svc.PutItem(input); err != nil {
		return fmt.Errorf("calling PutItem: %w", err)
	}
	return nil
}
//(map[int]*ddbEntry, error)

// main starts a full inventory run; all configuration (table name, root
// path, worker counts) is hard-coded in RunInventory.
func main() {
	RunInventory()
}
// maxCalc caps the number of concurrently running hash goroutines in workerCalc.
const maxCalc = 50 // TODO: Make it configurable
// RunInventory walks the filesystem rooted at `path` with treeWalker,
// hashes every non-empty regular file via workerCalc, and records each
// processed file in DynamoDB. It blocks until the hashing side signals
// completion via calcDone.
//
// NOTE(review): path is "/" here — this walks the ENTIRE filesystem; the
// table name "your_ddb_database" is a placeholder to change.
func RunInventory() {
	var startTime = time.Now()
	var db = NewDatabaseConnection("your_ddb_database")
	var path = "/"
	// cwd, err := os.Getwd()
	dir := path
	workerCount := 1
	jobs := make(chan workerJob, workerCount)
	results := make(chan workerResult)
	readDone := make(chan bool)
	calcDone := make(chan bool)
	wg := &sync.WaitGroup{}
	// Start N tree-walking workers.
	for i := 0; i < workerCount; i++ {
		go treeWalker(jobs, results, wg)
	}
	// Seed one initial job; wg reaches zero only when the whole tree is walked.
	wg.Add(1)
	go func() {
		jobs <- workerJob{
			Root: dir,
		}
	}()
	// Start the hashing consumers.
	for i := 0; i < workerCount; i++ {
		go workerCalc(results, calcDone, startTime.String(), db)
	}
	// When all directory jobs have finished, signal shutdown.
	go func() {
		wg.Wait()
		readDone <- true
	}()
readloop:
	for {
		select {
		case <-readDone:
			// Finished traversing the tree: closing the channels lets the
			// workers' range loops terminate.
			close(jobs)
			close(results)
		case <-calcDone:
			// All discovered files have been hashed; we are done.
			break readloop
		}
	}
}
// treeWalker consumes directory jobs and, for each directory: emits a
// workerResult for every non-empty regular file, and enqueues a new job
// for every subdirectory. wg counts outstanding directory jobs — it is
// incremented when a subdirectory job is created and decremented once a
// job has been fully scanned (or skipped on error).
func treeWalker(jobs chan workerJob, results chan<- workerResult, wg *sync.WaitGroup) {
	// While there are new jobs
	for j := range jobs {
		dir, err := os.Open(j.Root)
		if err != nil {
			handleFileOpenLimits(err)
			dir.Close()
			wg.Done()
			continue
		}
		fInfo, err := dir.Readdir(-1) // Return all files in directory
		dir.Close()
		if err != nil {
			handleFileOpenLimits(err)
			wg.Done()
			// if os.IsPermission(err) {
			// 	// Skip if there's no permission
			// 	continue
			// }
			// For now, skip the directory on any read error.
			continue
		}
		for _, file := range fInfo {
			fpath, _ := filepath.Abs(filepath.Join(dir.Name(), file.Name()))
			if file.Mode().IsRegular() {
				// Regular file: skip empty ones, otherwise hand it to the hashers.
				fs := uint64(file.Size())
				if fs == 0 {
					// Skip zero sized (no wg.Done: files never created a job)
					continue
				}
				r := workerResult{
					Filename: fpath,
				}
				results <- r
			} else if file.IsDir() {
				// Subdirectory: queue it as a new job.
				nj := workerJob{
					Root: fpath,
				}
				// One more job, adds to wg
				wg.Add(1)
				// Send asynchronously so the walker never blocks on a full jobs
				// channel. NOTE(review): this spawns one goroutine per pending
				// directory — unbounded on very wide trees.
				go func() {
					jobs <- nj
				}()
			}
		}
		// Done one job, let wg know.
		wg.Done()
	}
}
// openFile opens filename and streams its contents through an MD5 hash,
// returning the filename on success and "" when the file could not be
// read (EMFILE conditions are reported via handleFileOpenLimits). The
// digest itself is discarded — the read exists to exercise file I/O.
func openFile(filename string) string {
	file, err := os.Open(filename)
	if err != nil {
		// No Close here: os.Open returns a nil *os.File on error.
		handleFileOpenLimits(err)
		return ""
	}
	// One deferred close instead of a manual Close on every exit path
	// (the original even called Close on the nil file after an open error).
	defer file.Close()
	hash := md5.New()
	if _, err := io.Copy(hash, file); err != nil {
		handleFileOpenLimits(err)
		return ""
	}
	return filename
}
// workerCalc consumes workerResult filenames, hashing each in its own
// goroutine while keeping at most maxCalc hashes in flight, and records
// each successfully read file in DynamoDB. It sends on done once the
// results channel is closed AND every spawned goroutine has finished.
func workerCalc(results chan workerResult, done chan bool, timestamp string, database *Database) {
	wgCalc := &sync.WaitGroup{}
	// Tracks in-flight hash goroutines for the throttle below.
	i := workerCounter{
		counter: 0,
	}
	// This extra Add(1) is balanced by the final Done, so the waiter fires
	// only after the range loop has exited, not while work remains queued.
	wgCalc.Add(1)
	go func() {
		wgCalc.Wait()
		done <- true
	}()
	for j := range results {
		wgCalc.Add(1)
		i.Lock()
		i.counter++
		i.Unlock()
		// Polling throttle: sleep while more than maxCalc hashes are
		// outstanding. NOTE(review): a buffered-channel semaphore would
		// avoid the 100ms polling, but behavior is kept as-is here.
		for func() int {
			defer i.RUnlock()
			i.RLock()
			return i.counter
		}() > maxCalc {
			time.Sleep(100 * time.Millisecond)
		}
		go func(filename string) {
			res := openFile(filename)
			if res != "" {
				database.PutContentHash(res, timestamp)
			}
			i.Lock()
			i.counter--
			i.Unlock()
			wgCalc.Done()
		}(j.Filename)
	}
	wgCalc.Done()
}
// This will detect if the io errors are related with too many open files
// and suggest the user to reduce the max limit of goroutines
// Or to increase the corresponding ulimits
func handleFileOpenLimits(err error) {
var perr *fs.PathError
if errors.As(err, &perr) {
if perr.Unwrap() == syscall.EMFILE {
log.Printf("You are running %d goroutines & have opened more files than what is allowed in your ulimits, please check", runtime.NumGoroutine())
} else {
log.Printf("%#v", perr)
}
}
}
// PutContentHash records that filename was processed in the run
// identified by timestamp, writing a ddbEntry whose partition key is the
// filename and whose sort key is the timestamp.
//
// A save failure is now returned wrapped instead of calling log.Fatalf
// (the original's `return err` after Fatalf was dead code, since Fatalf
// never returns).
func (d *Database) PutContentHash(filename string, timestamp string) error {
	entry := &ddbEntry{
		FileName: filename,
		Pk:       filename,
		Sk:       timestamp,
	}
	if err := d.saveItem(entry); err != nil {
		return fmt.Errorf("calling PutContentHash: %w", err)
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment