Created
February 26, 2024 12:19
-
-
Save iyashjayesh/1752fb86bb419144168c896a4dd599a5 to your computer and use it in GitHub Desktop.
[Golang-AWS(SQS)] Code to receive messages from SQS and delete each one as soon as it has been processed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"os" | |
"time" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/credentials" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/sqs" | |
) | |
type SessionWrapper struct { | |
Session *session.Session | |
} | |
func NewSession(region string, info map[string]string) *SessionWrapper { | |
sess := session.Must(session.NewSession( | |
&aws.Config{ | |
Credentials: credentials.NewStaticCredentials(info["Access Key"], info["Secret Access Key"], ""), | |
Region: aws.String(region), | |
}, | |
)) | |
return &SessionWrapper{ | |
Session: sess, | |
} | |
} | |
func main() { | |
var info = map[string]string{ | |
"Access Key": "", | |
"Secret Access Key": "", | |
} | |
region := "" // ex. ap-south-1 | |
awsSession := NewSession(region, info) | |
svc := sqs.New(awsSession.Session) | |
for { | |
MessageList, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ | |
QueueUrl: aws.String(""), // QueueUrl | |
// The maximum number of messages to return. Amazon SQS never returns more messages | |
// than this value (however, fewer messages might be returned). Valid values: | |
// 1 to 10. Default: 1. | |
MaxNumberOfMessages: aws.Int64(10), | |
// The duration (in seconds) for which the call waits for a message to arrive | |
// in the queue before returning. If a message is available, the call returns | |
// sooner than WaitTimeSeconds. If no messages are available and the wait time | |
// expires, the call returns successfully with an empty list of messages. | |
WaitTimeSeconds: aws.Int64(10), | |
// The duration (in seconds) that the received messages are hidden from subsequent | |
// retrieve requests after being retrieved by a ReceiveMessage request. | |
VisibilityTimeout: aws.Int64(10), | |
}) | |
if err != nil { | |
fmt.Println("Error", err) | |
continue | |
} | |
if len(MessageList.Messages) == 0 { | |
fmt.Println("No messages to process") | |
time.Sleep(5 * time.Second) // Sleep for 5 seconds before polling again | |
continue | |
} | |
for _, message := range MessageList.Messages { | |
LoadToFile(*message.Body) | |
DeleteMessage, err := svc.DeleteMessage(&sqs.DeleteMessageInput{ | |
QueueUrl: aws.String(""), //QueueUrl | |
ReceiptHandle: message.ReceiptHandle, | |
}) | |
if err != nil { | |
fmt.Println("Delete Error", err) | |
} | |
fmt.Println("Message Deleted", DeleteMessage) | |
} | |
fmt.Println("Total Messages: ", len(MessageList.Messages)) | |
time.Sleep(5 * time.Second) // Sleep for 5 seconds before polling again | |
fmt.Println("Making a new request") | |
} | |
} | |
// LoadToFile appends message, followed by a newline, to "log.json" in the
// current working directory, creating the file when it does not exist yet.
//
// The function has no error return: any failure to open, write or close the
// file is printed to standard output and the call returns without panicking.
func LoadToFile(message string) {
	const file = "log.json"

	// O_CREATE makes "create if missing" and "open for append" one step, so
	// there is no second file handle to leak and no window between an
	// existence check and the open.
	f, err := os.OpenFile(file, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		fmt.Println("Error: ", err)
		return // nothing to write to or close
	}

	if _, err := f.WriteString(message + "\n"); err != nil {
		fmt.Println("Error: ", err)
	}

	// Closed explicitly so a failure on close is reported rather than dropped.
	if err := f.Close(); err != nil {
		fmt.Println("Error: ", err)
	}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment