Skip to content

Instantly share code, notes, and snippets.

@Binternet
Created January 31, 2025 07:46
Redis Scaler + Key/Value Support
package scalers
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"github.com/go-logr/logr"
"github.com/redis/go-redis/v9"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
"github.com/kedacore/keda/v2/pkg/util"
)
const (
defaultListLength = 5
defaultActivationListLength = 0
defaultDBIdx = 0
defaultEnableTLS = false
defaultScalingFactor = 1
queryTypeList = "list"
queryTypeValue = "value"
defaultQueryType = queryTypeList
)
var (
ErrRedisNoListName = errors.New("no list name given")
ErrRedisNoAddresses = errors.New("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values")
ErrRedisUnequalHostsAndPorts = errors.New("not enough hosts or ports given. number of hosts should be equal to the number of ports")
ErrRedisInvalidQueryType = errors.New("invalid query type, must be either 'list' or 'value'")
ErrRedisNoKey = errors.New("no key provided for value query type")
)
type redisScaler struct {
metricType v2.MetricTargetType
metadata *redisMetadata
closeFn func() error
getValueFn func(context.Context) (int64, error)
logger logr.Logger
}
type redisConnectionInfo struct {
Addresses []string `keda:"name=address;addresses, order=triggerMetadata;authParams;resolvedEnv"`
Username string `keda:"name=username, order=triggerMetadata;resolvedEnv;authParams"`
Password string `keda:"name=password, order=triggerMetadata;resolvedEnv;authParams"`
SentinelUsername string `keda:"name=sentinelUsername, order=triggerMetadata;authParams;resolvedEnv"`
SentinelPassword string `keda:"name=sentinelPassword, order=triggerMetadata;authParams;resolvedEnv"`
SentinelMaster string `keda:"name=sentinelMaster, order=triggerMetadata;authParams;resolvedEnv"`
Hosts []string `keda:"name=host;hosts, order=triggerMetadata;resolvedEnv;authParams"`
Ports []string `keda:"name=port;ports, order=triggerMetadata;resolvedEnv;authParams"`
EnableTLS bool
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
Cert string `keda:"name=Cert;cert, order=authParams"`
Key string `keda:"name=key, order=authParams"`
KeyPassword string `keda:"name=keyPassword, order=authParams"`
Ca string `keda:"name=ca, order=authParams"`
}
type redisMetadata struct {
ListLength int64 `keda:"name=listLength, order=triggerMetadata, default=5"`
ActivationListLength int64 `keda:"name=activationListLength, order=triggerMetadata, optional"`
ListName string `keda:"name=listName, order=triggerMetadata, optional"`
QueryType string `keda:"name=queryType, order=triggerMetadata, default=list"`
Key string `keda:"name=key, order=triggerMetadata, optional"`
ScalingFactor int64 `keda:"name=scalingFactor, order=triggerMetadata, default=1"`
DatabaseIndex int `keda:"name=databaseIndex, order=triggerMetadata, optional"`
MetadataEnableTLS string `keda:"name=enableTLS, order=triggerMetadata, optional"`
AuthParamEnableTLS string `keda:"name=tls, order=authParams, optional"`
ConnectionInfo redisConnectionInfo `keda:"optional"`
triggerIndex int
}
func (r *redisMetadata) Validate() error {
if err := validateRedisAddress(&r.ConnectionInfo); err != nil {
return err
}
if err := r.ConnectionInfo.SetEnableTLS(r.MetadataEnableTLS, r.AuthParamEnableTLS); err != nil {
return err
}
r.MetadataEnableTLS, r.AuthParamEnableTLS = "", ""
if r.QueryType != queryTypeList && r.QueryType != queryTypeValue {
return ErrRedisInvalidQueryType
}
if r.QueryType == queryTypeList && r.ListName == "" {
return ErrRedisNoListName
}
if r.QueryType == queryTypeValue && r.Key == "" {
return ErrRedisNoKey
}
if r.ScalingFactor <= 0 {
r.ScalingFactor = defaultScalingFactor
}
return nil
}
func NewRedisScaler(ctx context.Context, isClustered, isSentinel bool, config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}
logger := InitializeLogger(config, "redis_scaler")
meta, err := parseRedisMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %w", err)
}
// Choose appropriate Lua script based on query type
var script string
if meta.QueryType == queryTypeValue {
script = `
local key = KEYS[1]
local value = redis.call('get', key)
if value then
local number = tonumber(value)
if number then
return number
end
end
return 0
`
} else {
script = `
local listName = KEYS[1]
local listType = redis.call('type', listName).ok
local cmd = {
zset = 'zcard',
set = 'scard',
list = 'llen',
hash = 'hlen',
none = 'llen'
}
return redis.call(cmd[listType], listName)
`
}
if isClustered {
return createClusteredRedisScaler(ctx, meta, script, metricType, logger)
} else if isSentinel {
return createSentinelRedisScaler(ctx, meta, script, metricType, logger)
}
return createRedisScaler(ctx, meta, script, metricType, logger)
}
func createClusteredRedisScaler(ctx context.Context, meta *redisMetadata, script string, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) {
client, err := getRedisClusterClient(ctx, meta.ConnectionInfo)
if err != nil {
return nil, fmt.Errorf("connection to redis cluster failed: %w", err)
}
closeFn := func() error {
if err := client.Close(); err != nil {
logger.Error(err, "error closing redis client")
return err
}
return nil
}
getValueFn := func(ctx context.Context) (int64, error) {
key := meta.ListName
if meta.QueryType == queryTypeValue {
key = meta.Key
}
cmd := client.Eval(ctx, script, []string{key})
if cmd.Err() != nil {
return -1, cmd.Err()
}
return cmd.Int64()
}
return &redisScaler{
metricType: metricType,
metadata: meta,
closeFn: closeFn,
getValueFn: getValueFn,
logger: logger,
}, nil
}
func (s *redisScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var metricName string
if s.metadata.QueryType == queryTypeValue {
metricName = util.NormalizeString(fmt.Sprintf("redis-value-%s", s.metadata.Key))
} else {
metricName = util.NormalizeString(fmt.Sprintf("redis-%s", s.metadata.ListName))
}
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.ListLength),
}
metricSpec := v2.MetricSpec{
External: externalMetric,
Type: externalMetricType,
}
return []v2.MetricSpec{metricSpec}
}
func (s *redisScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
value, err := s.getValueFn(ctx)
if err != nil {
s.logger.Error(err, "error getting redis value")
return []external_metrics.ExternalMetricValue{}, false, err
}
var scaledValue float64
if s.metadata.QueryType == queryTypeValue {
scaledValue = float64(value) / float64(s.metadata.ScalingFactor)
} else {
scaledValue = float64(value)
}
metric := GenerateMetricInMili(metricName, scaledValue)
return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.ActivationListLength, nil
}
func (s *redisScaler) Close(context.Context) error {
return s.closeFn()
}
// Helper functions (getRedisClient, getRedisSentinelClient, etc.) remain the same...
func parseRedisMetadata(config *scalersconfig.ScalerConfig) (*redisMetadata, error) {
meta := &redisMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %w", err)
}
meta.triggerIndex = config.TriggerIndex
return meta, nil
}
// Connection validation and client creation functions remain the same...
@Binternet
Copy link
Author

YAML:

triggers:
  - type: redis
    metadata:
      queryType: "value"
      key: "pending-tasks"
      scalingFactor: "100"  # One replica per 100 tasks

@vinod827
Copy link

vinod827 commented Feb 9, 2025

Looks good to me. I think we should also add rate limiting and cooldown period to avoid the unnecessary scaling up/down

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment