Skip to content

Instantly share code, notes, and snippets.

@naesheim
Last active August 29, 2023 13:38
Show Gist options
  • Save naesheim/fa824d82d2da9d06032f047a88970d66 to your computer and use it in GitHub Desktop.
Save naesheim/fa824d82d2da9d06032f047a88970d66 to your computer and use it in GitHub Desktop.
When you create a cloud build and select pubsub as a trigger, there will be created a subscription on the assigned topic. (Usually the topic of an artifact registry) The problem is that this subscription has an expiry ttl of 31 days. After 31 days of inactivity, it will be deleted. This script iterates over projects in a given folder. It then it…
package main
import (
"context"
"log/slog"
"os"
"strings"
"sync"
pubsub "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
resourcemanager "google.golang.org/api/cloudresourcemanager/v3"
"google.golang.org/api/iterator"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)
type pubSubUpdate struct {
pubClient *pubsub.PublisherClient
subClient *pubsub.SubscriberClient
wg *sync.WaitGroup
}
type pubSubMessage struct {
Data []byte `json:"data"`
}
var log = slog.New(slog.NewJSONHandler(os.Stderr, nil))
func createFolderList(org string, service *resourcemanager.Service) ([]string, error) {
var folders = []string{}
var folderTrav func(folder string) error
folderTrav = func(folder string) error {
folderList, err := service.Folders.List().Parent(folder).Do()
if err != nil {
return err
}
folders = append(folders, folder)
if len(folderList.Folders) > 0 {
for _, folder := range folderList.Folders {
err := folderTrav(folder.Name)
if err != nil {
return err
}
}
}
return nil
}
err := folderTrav(org)
if err != nil {
return nil, err
}
return folders, nil
}
func (ps pubSubUpdate) updateSubscription(ctx context.Context, project string) {
defer ps.wg.Done()
ltr := &pubsubpb.ListTopicsRequest{
Project: project,
}
// list topics in given project
topicIt := ps.pubClient.ListTopics(ctx, ltr)
for {
topicName, err := topicIt.Next()
if err == iterator.Done {
break
}
if err != nil {
log.With("project",project).Error(err.Error())
break
}
// List subscriptions in given topic
req := &pubsubpb.ListTopicSubscriptionsRequest{
Topic: topicName.Name,
}
subIt := ps.pubClient.ListTopicSubscriptions(ctx, req)
for {
subName, err := subIt.Next()
if err == iterator.Done {
break
}
if err != nil {
log.With("topic",topicName.Name).Error(err.Error())
break
}
if strings.Contains(subName, "/gcb-") {
resp, err := ps.subClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: subName})
if err != nil {
log.Error("whoops - failed to retrieve subscription")
continue
}
if resp.ExpirationPolicy.Ttl != nil {
go func() {
// create update request with empty expiration policy
updatereq := &pubsubpb.UpdateSubscriptionRequest{
Subscription: &pubsubpb.Subscription{
Name: subName,
ExpirationPolicy: &pubsubpb.ExpirationPolicy{},
},
UpdateMask: &fieldmaskpb.FieldMask{
Paths: []string{"expiration_policy"},
},
}
// update subscription
resp, err := ps.subClient.UpdateSubscription(ctx, updatereq)
if err != nil || resp == nil {
log.With("subscription", subName).Error("failed to update")
}
log.Info("updated:", resp.GetName())
}()
}
}
}
}
}
func main() {
folderID := os.Getenv("FOLDER")
if folderID == "" {
log.Error("empty root folder")
}
if !strings.HasPrefix(folderID, "folders/"){
folderID = "folders/"+folderID
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pubClient, err := pubsub.NewPublisherClient(ctx)
if err != nil {
log.Error(err.Error())
}
defer pubClient.Close()
subClient, err := pubsub.NewSubscriberClient(ctx)
if err != nil {
log.Error(err.Error())
}
defer subClient.Close()
psUpdate := pubSubUpdate{
pubClient: pubClient,
subClient: subClient,
wg: &sync.WaitGroup{},
}
service, err := resourcemanager.NewService(ctx)
if err != nil {
log.Error(err.Error())
}
folders, err := createFolderList(folderID, service)
if err != nil {
log.Error(err.Error())
}
// iterate over folders
for _, folder := range folders {
projectList, err := service.Projects.List().Parent(folder).Do()
if err != nil {
log.Error(err.Error())
}
// iterate over projects in given folder
for _, project := range projectList.Projects {
// update subscriptions in project
psUpdate.wg.Add(1)
go psUpdate.updateSubscription(ctx, project.Name)
}
}
psUpdate.wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment