Skip to content

Instantly share code, notes, and snippets.

@LucasBadico
Created December 20, 2023 01:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LucasBadico/364006e4284f07b4fed36e30dcc8e37a to your computer and use it in GitHub Desktop.
Save LucasBadico/364006e4284f07b4fed36e30dcc8e37a to your computer and use it in GitHub Desktop.
GOLANG CONCURRENCY MODEL EXAMPLE
// CODE EXAMPLE FROM YOUTUBE LIVE: Golang: Modelo de concorrencia goroutine + channels
// LINK: https://youtube.com/live/TRx3W4mLFf8
// Lucas_Badico: Dev, mentor e apaixonado por Programacao
// SIGA ME EM TODAS AS REDES SOCIAIS
package main
import (
"time"
"fmt"
"sync"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
// "os"
)
// Item holds the key of one table row: the partition key and the sort key,
// both handled as DynamoDB string attributes by this program.
type Item struct {
	// PK is the partition key value.
	PK string `json:"PK"`
	// SK is the sort key value.
	SK string `json:"SK"`
	// Further attributes can be added here when they are needed.
}
// buildBatchDeleteInput splits items into groups of at most 25 and returns one
// BatchWriteItemInput per group. Each input carries a DeleteRequest for every
// item of its group, keyed by the item's PK and SK string attributes and
// addressed to tableName.
//
// An empty items slice yields a nil result. The returned error is always nil.
func buildBatchDeleteInput(items []Item, tableName string) ([]*dynamodb.BatchWriteItemInput, error) {
	// Max items allowed in a single batch write request.
	const maxPerBatch = 25

	var inputs []*dynamodb.BatchWriteItemInput
	for offset := 0; offset < len(items); offset += maxPerBatch {
		// Clamp the upper bound so the final group may be shorter.
		limit := offset + maxPerBatch
		if limit > len(items) {
			limit = len(items)
		}
		chunk := items[offset:limit]

		requests := make([]*dynamodb.WriteRequest, 0, len(chunk))
		for idx := range chunk {
			key := map[string]*dynamodb.AttributeValue{
				"PK": {S: aws.String(chunk[idx].PK)},
				"SK": {S: aws.String(chunk[idx].SK)},
			}
			requests = append(requests, &dynamodb.WriteRequest{
				DeleteRequest: &dynamodb.DeleteRequest{Key: key},
			})
		}

		inputs = append(inputs, &dynamodb.BatchWriteItemInput{
			RequestItems: map[string][]*dynamodb.WriteRequest{tableName: requests},
		})
	}
	return inputs, nil
}
// scanWithLastEvaluatedKey issues a single Scan request against tableName and
// converts the returned rows into Items.
//
// lastEvaluatedKey is used as the request's ExclusiveStartKey when it is not
// nil; pass nil to start from the beginning of the table.
//
// It returns the Items built from the "PK" and "SK" attributes of each row,
// the LastEvaluatedKey reported by the scan (to be passed to the next call),
// and an error when the Scan call fails or when a row does not carry both
// "PK" and "SK" as string attributes.
func scanWithLastEvaluatedKey(svc *dynamodb.DynamoDB, tableName string, lastEvaluatedKey map[string]*dynamodb.AttributeValue) ([]Item, map[string]*dynamodb.AttributeValue, error) {
	// Prepare the ScanInput with the exclusive start key for pagination.
	scanInput := &dynamodb.ScanInput{
		TableName: aws.String(tableName),
	}
	if lastEvaluatedKey != nil {
		scanInput.ExclusiveStartKey = lastEvaluatedKey
	}

	scanResult, err := svc.Scan(scanInput)
	if err != nil {
		return nil, nil, err
	}

	var allItems []Item
	for _, item := range scanResult.Items {
		pk, sk := item["PK"], item["SK"]
		// A missing attribute yields a nil *AttributeValue, and a non-string
		// attribute has a nil S field; dereferencing either would panic.
		if pk == nil || pk.S == nil || sk == nil || sk.S == nil {
			return nil, nil, fmt.Errorf("scanning table %q: item without string PK/SK attributes", tableName)
		}
		allItems = append(allItems, Item{
			PK: *pk.S,
			SK: *sk.S,
			// Extract other attributes as needed.
		})
	}

	return allItems, scanResult.LastEvaluatedKey, nil
}
// deleteBatch sends input to DynamoDB with BatchWriteItem and signals wg when
// it is finished, whatever the outcome.
//
// A successful BatchWriteItem response can still list requests under
// UnprocessedItems. Those are re-submitted, with a doubling pause between
// attempts, until none remain or the attempt limit is reached. "DELETED!" is
// printed only once every request has been accepted; failures are reported on
// standard output with the "ERROR:" prefix.
func deleteBatch(svc *dynamodb.DynamoDB, input *dynamodb.BatchWriteItemInput, wg *sync.WaitGroup) {
	defer wg.Done()

	// Upper bound on BatchWriteItem calls made for one batch.
	const maxAttempts = 5

	fmt.Println("DELETING...")

	pending := input
	pause := 100 * time.Millisecond
	for attempt := 1; ; attempt++ {
		out, err := svc.BatchWriteItem(pending)
		if err != nil {
			fmt.Println("ERROR:", err)
			return
		}
		if len(out.UnprocessedItems) == 0 {
			break
		}
		if attempt == maxAttempts {
			fmt.Println("ERROR:", "unprocessed items remain after", maxAttempts, "attempts")
			return
		}

		// Back off before retrying only the requests that were not processed.
		time.Sleep(pause)
		pause *= 2
		pending = &dynamodb.BatchWriteItemInput{RequestItems: out.UnprocessedItems}
	}

	fmt.Println("DELETED!")
}
// process walks tableName page by page and deletes every item it reads.
//
// Each iteration scans one page with scanWithLastEvaluatedKey, turns the
// page's items into batch delete inputs with buildBatchDeleteInput, runs one
// deleteBatch goroutine per input and waits for all of them before scanning
// the next page. The loop ends when the scan reports no further key to
// continue from.
//
// A scan or build error is printed with the "ERROR:" prefix and ends the run;
// nothing is returned to the caller.
func process(svc *dynamodb.DynamoDB, tableName string) {
	var lastEvaluatedKey map[string]*dynamodb.AttributeValue
	for {
		fmt.Println("READING...")
		items, newLastEvaluatedKey, err := scanWithLastEvaluatedKey(svc, tableName, lastEvaluatedKey)
		if err != nil {
			fmt.Println("ERROR:", err)
			return
		}
		fmt.Println("READ")

		batchInputs, err := buildBatchDeleteInput(items, tableName)
		if err != nil {
			fmt.Println("ERROR:", err)
			return
		}

		// Delete the batches of this page concurrently, then wait for all of
		// them so pages are handled one at a time.
		var wg sync.WaitGroup
		for _, input := range batchInputs {
			wg.Add(1)
			go deleteBatch(svc, input, &wg)
		}
		wg.Wait()

		// No key to continue from means the scan reached the end.
		if len(newLastEvaluatedKey) == 0 {
			fmt.Println("FINISHED")
			return
		}
		lastEvaluatedKey = newLastEvaluatedKey
	}
}
// repeatAtEachHour never returns: it repeatedly sleeps until the next
// boundary of duration and then calls fn.
//
// Boundaries are the multiples of duration that time.Time.Truncate rounds
// down to, so with duration == time.Hour fn runs at the top of each hour. A
// non-positive duration falls back to time.Hour. fn runs synchronously; the
// next boundary is computed only after fn has returned.
func repeatAtEachHour(fn func(), duration time.Duration) {
	// Truncate leaves the time unchanged for d <= 0, which would make the
	// sleep zero and turn this loop into a busy spin.
	if duration <= 0 {
		duration = time.Hour
	}

	for {
		now := time.Now()
		// Round down to the current boundary, then step to the following one.
		next := now.Truncate(duration).Add(duration)
		time.Sleep(next.Sub(now))
		fn()
	}
}
// main creates a DynamoDB client from the shared AWS configuration, starts one
// run of process in a goroutine right away, and then hands the same job to
// repeatAtEachHour together with time.Hour.
func main() {
	// Session built with shared config enabled; Must panics if it fails.
	opts := session.Options{SharedConfigState: session.SharedConfigEnable}
	client := dynamodb.New(session.Must(session.NewSessionWithOptions(opts)))

	// Table whose items are deleted. // os.Getenv("DICT_CACHE_TABLE")
	const table = "cache"

	purge := func() {
		process(client, table)
	}

	// First run happens in the background while the scheduler starts.
	go purge()

	// Subsequent runs are driven by repeatAtEachHour.
	repeatAtEachHour(purge, time.Hour)
}
// CODE EXAMPLE FROM YOUTUBE LIVE: Golang: Modelo de concorrencia goroutine + channels
// LINK: https://youtube.com/live/TRx3W4mLFf8
// Lucas_Badico: Dev, mentor e apaixonado por Programacao
// SIGA ME EM TODAS AS REDES SOCIAIS
package main
import (
"fmt"
"time"
"sync"
// "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/google/uuid"
)
// Item is one row written to the table: a partition key, a sort key and the
// time the row was generated.
type Item struct {
	// PK is the partition key value.
	PK string `json:"PK"`
	// SK is the sort key value.
	SK string `json:"SK"`
	// Timestamp is the creation time stored as text.
	Timestamp string `json:"Timestamp"`
}
// runIt writes one batch of 25 freshly generated items to the "cache" table
// with a single BatchWriteItem call.
//
// Every item gets random UUIDs as PK and SK and the current time, formatted
// as RFC 3339, as Timestamp. After a successful write the number of items is
// sent on ch. Marshalling and write errors are printed to standard output and
// nothing is sent on ch.
//
// wg is always signalled, after a one-second pause, including on the error
// paths. Without that, a single failed write would leave the caller's
// wg.Wait blocked forever.
func runIt(svc *dynamodb.DynamoDB, ch chan int, wg *sync.WaitGroup) {
	defer func() {
		// Pause before releasing the caller so its loop is paced the same way
		// whether the write succeeded or failed.
		time.Sleep(1 * time.Second)
		wg.Done()
	}()

	// Number of put requests placed in the batch.
	const batchSize = 25

	// Slice holding the write requests of this batch.
	items := make([]*dynamodb.WriteRequest, 0, batchSize)
	itemCount := 0
	tableName := "cache"

	for i := 0; i < batchSize; i++ {
		item := Item{
			PK:        uuid.New().String(),
			SK:        uuid.New().String(),
			Timestamp: time.Now().Format(time.RFC3339),
		}
		av, err := dynamodbattribute.MarshalMap(item)
		if err != nil {
			fmt.Println("Got error marshalling map:")
			fmt.Println(err.Error())
			return
		}
		items = append(items, &dynamodb.WriteRequest{
			PutRequest: &dynamodb.PutRequest{
				Item: av,
			},
		})
		itemCount++
	}

	batchInput := &dynamodb.BatchWriteItemInput{
		RequestItems: map[string][]*dynamodb.WriteRequest{
			tableName: items,
		},
	}
	// NOTE(review): the response's UnprocessedItems are not inspected, so the
	// count reported below may exceed what was actually stored — confirm
	// whether that matters for this demo.
	_, err := svc.BatchWriteItem(batchInput)
	if err != nil {
		fmt.Println("Got error calling BatchWriteItem:")
		fmt.Println(err.Error())
		return
	}

	ch <- itemCount
}
// finshEachRun consumes per-run item counts from ch until the channel is
// closed. For every value received it prints the value, then the running
// total of items and the number of runs seen so far. When ch is closed it
// prints "Channel closed!" and returns.
func finshEachRun(ch chan int) {
	var totalItems, totalRuns int

	// Ranging over the channel ends exactly when it is closed and drained.
	for received := range ch {
		fmt.Println("Received:", received)
		totalItems += received
		totalRuns++
		fmt.Printf("Total items written: %d\n", totalItems)
		fmt.Printf("Total runs: %d\n", totalRuns)
	}

	fmt.Println("Channel closed!")
}
// main creates a DynamoDB client from the shared AWS configuration, starts
// finshEachRun as the consumer of per-run counts, and then loops forever:
// each round launches ten runIt goroutines and waits for all of them before
// starting the next round.
func main() {
	opts := session.Options{SharedConfigState: session.SharedConfigEnable}
	client := dynamodb.New(session.Must(session.NewSessionWithOptions(opts)))

	// Unbuffered channel carrying the item count of each finished run.
	counts := make(chan int)
	go finshEachRun(counts)

	// Number of concurrent writers started per round.
	const writersPerRound = 10

	for {
		var round sync.WaitGroup
		round.Add(writersPerRound)
		for w := 0; w < writersPerRound; w++ {
			go runIt(client, counts, &round)
		}
		// Wait for every writer of this round before inserting the next lot.
		round.Wait()
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment