Skip to content

Instantly share code, notes, and snippets.

@halfvector
Created August 3, 2020 19:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save halfvector/91b99d54688d2366a71037765272c9af to your computer and use it in GitHub Desktop.
Save halfvector/91b99d54688d2366a71037765272c9af to your computer and use it in GitHub Desktop.
Using aws-sdk-go to pickup file changes from S3 via S3->SNS->SQS
package main
import (
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/sqs"
"io/ioutil"
"time"
)
func main() {
sess, err := session.NewSessionWithOptions(session.Options{
Profile: "readonly-sqs-account", // profile from ~/.aws/credentials that defines aws_access_key_id and aws_secret_access_key
Config: aws.Config{
Region: aws.String("us-west-2"), // region of the queue
},
})
if err != nil {
panic(err)
}
sqsSvc := sqs.New(sess)
s3Svc := s3.New(sess)
url, err := sqsSvc.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String("sqs-queue-name")})
if err != nil {
panic(err)
}
fmt.Printf("queue url: %v\n", *url.QueueUrl)
for i := 0; i < 300; i++ {
message, err := sqsSvc.ReceiveMessage(&sqs.ReceiveMessageInput{
// some attributes could be helpful for debugging, worth exploring them
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: url.QueueUrl,
MaxNumberOfMessages: aws.Int64(10), // get the max batch size
VisibilityTimeout: aws.Int64(15), // how long to keep this message away from any other consumers
WaitTimeSeconds: aws.Int64(10), // long polling, wait this long for new messages to arrive.
})
if err != nil {
panic(err)
}
for _, msg := range message.Messages {
// unpack sns message envelope that is in the queue
var snsMsg SnsMessage
err := json.Unmarshal([]byte(*msg.Body), &snsMsg)
if err != nil {
panic(err)
}
// unpack the s3 event inside the sns envelope
var s3event S3Event
err = json.Unmarshal([]byte(snsMsg.Message), &s3event)
if err != nil {
panic(err)
}
// each s3 event can have multiple effected s3 records with various actions
for _, record := range s3event.Records {
fmt.Printf("%v - %v - %v - %.0f seconds ago\n", record.S3.Bucket.Name, record.EventName, record.S3.Object.Key, time.Since(record.EventTime).Seconds())
// for some events, we might want to read the file (ie if its an xml file)
if record.EventName == "ObjectCreated:Put" {
object, err := s3Svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(record.S3.Bucket.Name),
Key: aws.String(record.S3.Object.Key),
})
if err != nil {
panic(err)
}
payload, err := ioutil.ReadAll(object.Body)
if err != nil {
panic(err)
}
fmt.Printf("file contents: %v\n", string(payload))
}
}
// the SQS way of acking a message is to delete it
_, err = sqsSvc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: url.QueueUrl,
ReceiptHandle: msg.ReceiptHandle,
})
if err != nil {
panic(err)
}
}
}
}
type SnsMessage struct {
Type string `json:"Type"`
MessageID string `json:"MessageId"`
TopicArn string `json:"TopicArn"`
Subject string `json:"Subject"`
Message string `json:"Message"` // will be s3 event
Timestamp time.Time `json:"Timestamp"`
SignatureVersion string `json:"SignatureVersion"`
Signature string `json:"Signature"`
SigningCertURL string `json:"SigningCertURL"`
UnsubscribeURL string `json:"UnsubscribeURL"`
}
type S3Event struct {
Records []struct {
EventVersion string `json:"eventVersion"`
EventSource string `json:"eventSource"`
AwsRegion string `json:"awsRegion"`
EventTime time.Time `json:"eventTime"`
EventName string `json:"eventName"`
UserIdentity struct {
PrincipalID string `json:"principalId"`
} `json:"userIdentity"`
RequestParameters struct {
SourceIPAddress string `json:"sourceIPAddress"`
} `json:"requestParameters"`
ResponseElements struct {
XAmzRequestID string `json:"x-amz-request-id"`
XAmzID2 string `json:"x-amz-id-2"`
} `json:"responseElements"`
S3 struct {
S3SchemaVersion string `json:"s3SchemaVersion"`
ConfigurationID string `json:"configurationId"`
Bucket struct {
Name string `json:"name"`
OwnerIdentity struct {
PrincipalID string `json:"principalId"`
} `json:"ownerIdentity"`
Arn string `json:"arn"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
Size int `json:"size"`
ETag string `json:"eTag"`
Sequencer string `json:"sequencer"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment