Skip to content

Instantly share code, notes, and snippets.

@angadn
Last active April 9, 2019 08:03
Show Gist options
  • Save angadn/d87dba86835bb3a1b440d2ee88d2c418 to your computer and use it in GitHub Desktop.
Save angadn/d87dba86835bb3a1b440d2ee88d2c418 to your computer and use it in GitHub Desktop.
Machinery using AWS SQS + DynamoDB
package main
import (
"log"
"net/http"
"sync"
"time"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/aws/aws-sdk-go/service/dynamodb"
machinery "github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/config"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)
// wg counts the tasks main is still waiting on: main adds one per task it
// sends, Log calls Done each time it runs, and main blocks on Wait.
var wg sync.WaitGroup
// main wires machinery up to AWS (SQS as the broker, DynamoDB as the result
// backend), registers the "log" task, starts a worker in the background,
// sends a batch of "log" tasks and blocks until Log has run once per task
// sent.
//
// Any setup or send failure is fatal: the error is logged and the process
// exits.
func main() {
	// Static credentials keep the example self-contained; replace the
	// placeholders before running.
	awsCreds := credentials.NewStaticCredentials(
		"AWS KEY",    // XXX: Place AWS Key here
		"AWS SECRET", // XXX: Place AWS Secret here
		"",
	)
	awsConfig := &aws.Config{
		Region:      aws.String("us-west-1"),
		Credentials: awsCreds,
		HTTPClient: &http.Client{
			Timeout: time.Second * 120,
		},
	}

	// Both service clients use the same configuration, so a single session is
	// created and shared between them.
	awsSession := session.Must(session.NewSession(awsConfig))
	sqsClient := sqs.New(awsSession)
	dynamoDBClient := dynamodb.New(awsSession)

	visibilityTimeout := 20
	cnf := &config.Config{
		Broker:       "https://sqs.us-west-1.amazonaws.com/[ID]", // XXX: Place Queue URL up till name here
		DefaultQueue: "my-test-queue.fifo",
		SQS: &config.SQSConfig{
			Client: sqsClient,
			// if VisibilityTimeout is nil default to the overall visibility timeout setting for the queue
			// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
			VisibilityTimeout: &visibilityTimeout,
			WaitTimeSeconds:   20,
		},
		ResultBackend: "https://dynamodb",
		DynamoDB: &config.DynamoDBConfig{
			Client:          dynamoDBClient,
			TaskStatesTable: "my-test-task-states",
			GroupMetasTable: "my-test-group-metas",
		},
	}

	server, err := machinery.NewServer(cnf)
	if err != nil {
		log.Fatalf("%s\n", err.Error())
	}

	// Register the task before any worker starts, so a message consumed right
	// after launch can already be resolved by name, and surface a rejected
	// registration instead of silently ignoring it.
	if err := server.RegisterTasks(map[string]interface{}{
		"log": Log,
	}); err != nil {
		log.Fatalf("%s\n", err.Error())
	}

	// The worker runs in the background while main sends tasks and waits.
	go func() {
		worker := server.NewWorker("my-test-worker", 10)
		// Keep the error local to this goroutine rather than writing to
		// main's err variable from another goroutine.
		if err := worker.Launch(); err != nil {
			log.Fatalf("%s\n", err.Error())
		}
	}()

	const taskCount = 10
	// Account for every task up front; Log calls wg.Done once per execution.
	wg.Add(taskCount)
	for n := 1; n <= taskCount; n++ {
		signature := &tasks.Signature{
			Name:                 "log",
			BrokerMessageGroupId: "my-test-counter",
			Args: []tasks.Arg{
				{
					Type: "int64",
					// Pass a real int64 so the value matches the declared type.
					Value: int64(n),
				},
			},
		}
		if _, err := server.SendTask(signature); err != nil {
			log.Fatalf("%s\n", err.Error())
		}
	}

	wg.Wait()
}
// Log is the function main registers as the "log" task. It prints the first
// argument it is given and then marks one unit of work as finished on the
// package-level wait group.
//
// args must contain at least one value; the first element is read
// unconditionally. The result is always zero with a nil error.
func Log(args ...int64) (int64, error) {
	number := args[0]
	log.Printf("Number %d\n", number)

	// Signal completion only after the value has been logged.
	wg.Done()

	return 0, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment