Created
January 31, 2025 07:46
Redis Scaler + Key/Value Support
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scalers | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"net" | |
"strconv" | |
"github.com/go-logr/logr" | |
"github.com/redis/go-redis/v9" | |
v2 "k8s.io/api/autoscaling/v2" | |
"k8s.io/metrics/pkg/apis/external_metrics" | |
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" | |
"github.com/kedacore/keda/v2/pkg/util" | |
) | |
const ( | |
defaultListLength = 5 | |
defaultActivationListLength = 0 | |
defaultDBIdx = 0 | |
defaultEnableTLS = false | |
defaultScalingFactor = 1 | |
queryTypeList = "list" | |
queryTypeValue = "value" | |
defaultQueryType = queryTypeList | |
) | |
var ( | |
ErrRedisNoListName = errors.New("no list name given") | |
ErrRedisNoAddresses = errors.New("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values") | |
ErrRedisUnequalHostsAndPorts = errors.New("not enough hosts or ports given. number of hosts should be equal to the number of ports") | |
ErrRedisInvalidQueryType = errors.New("invalid query type, must be either 'list' or 'value'") | |
ErrRedisNoKey = errors.New("no key provided for value query type") | |
) | |
type redisScaler struct { | |
metricType v2.MetricTargetType | |
metadata *redisMetadata | |
closeFn func() error | |
getValueFn func(context.Context) (int64, error) | |
logger logr.Logger | |
} | |
type redisConnectionInfo struct { | |
Addresses []string `keda:"name=address;addresses, order=triggerMetadata;authParams;resolvedEnv"` | |
Username string `keda:"name=username, order=triggerMetadata;resolvedEnv;authParams"` | |
Password string `keda:"name=password, order=triggerMetadata;resolvedEnv;authParams"` | |
SentinelUsername string `keda:"name=sentinelUsername, order=triggerMetadata;authParams;resolvedEnv"` | |
SentinelPassword string `keda:"name=sentinelPassword, order=triggerMetadata;authParams;resolvedEnv"` | |
SentinelMaster string `keda:"name=sentinelMaster, order=triggerMetadata;authParams;resolvedEnv"` | |
Hosts []string `keda:"name=host;hosts, order=triggerMetadata;resolvedEnv;authParams"` | |
Ports []string `keda:"name=port;ports, order=triggerMetadata;resolvedEnv;authParams"` | |
EnableTLS bool | |
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"` | |
Cert string `keda:"name=Cert;cert, order=authParams"` | |
Key string `keda:"name=key, order=authParams"` | |
KeyPassword string `keda:"name=keyPassword, order=authParams"` | |
Ca string `keda:"name=ca, order=authParams"` | |
} | |
type redisMetadata struct { | |
ListLength int64 `keda:"name=listLength, order=triggerMetadata, default=5"` | |
ActivationListLength int64 `keda:"name=activationListLength, order=triggerMetadata, optional"` | |
ListName string `keda:"name=listName, order=triggerMetadata, optional"` | |
QueryType string `keda:"name=queryType, order=triggerMetadata, default=list"` | |
Key string `keda:"name=key, order=triggerMetadata, optional"` | |
ScalingFactor int64 `keda:"name=scalingFactor, order=triggerMetadata, default=1"` | |
DatabaseIndex int `keda:"name=databaseIndex, order=triggerMetadata, optional"` | |
MetadataEnableTLS string `keda:"name=enableTLS, order=triggerMetadata, optional"` | |
AuthParamEnableTLS string `keda:"name=tls, order=authParams, optional"` | |
ConnectionInfo redisConnectionInfo `keda:"optional"` | |
triggerIndex int | |
} | |
func (r *redisMetadata) Validate() error { | |
if err := validateRedisAddress(&r.ConnectionInfo); err != nil { | |
return err | |
} | |
if err := r.ConnectionInfo.SetEnableTLS(r.MetadataEnableTLS, r.AuthParamEnableTLS); err != nil { | |
return err | |
} | |
r.MetadataEnableTLS, r.AuthParamEnableTLS = "", "" | |
if r.QueryType != queryTypeList && r.QueryType != queryTypeValue { | |
return ErrRedisInvalidQueryType | |
} | |
if r.QueryType == queryTypeList && r.ListName == "" { | |
return ErrRedisNoListName | |
} | |
if r.QueryType == queryTypeValue && r.Key == "" { | |
return ErrRedisNoKey | |
} | |
if r.ScalingFactor <= 0 { | |
r.ScalingFactor = defaultScalingFactor | |
} | |
return nil | |
} | |
func NewRedisScaler(ctx context.Context, isClustered, isSentinel bool, config *scalersconfig.ScalerConfig) (Scaler, error) { | |
metricType, err := GetMetricTargetType(config) | |
if err != nil { | |
return nil, fmt.Errorf("error getting scaler metric type: %w", err) | |
} | |
logger := InitializeLogger(config, "redis_scaler") | |
meta, err := parseRedisMetadata(config) | |
if err != nil { | |
return nil, fmt.Errorf("error parsing redis metadata: %w", err) | |
} | |
// Choose appropriate Lua script based on query type | |
var script string | |
if meta.QueryType == queryTypeValue { | |
script = ` | |
local key = KEYS[1] | |
local value = redis.call('get', key) | |
if value then | |
local number = tonumber(value) | |
if number then | |
return number | |
end | |
end | |
return 0 | |
` | |
} else { | |
script = ` | |
local listName = KEYS[1] | |
local listType = redis.call('type', listName).ok | |
local cmd = { | |
zset = 'zcard', | |
set = 'scard', | |
list = 'llen', | |
hash = 'hlen', | |
none = 'llen' | |
} | |
return redis.call(cmd[listType], listName) | |
` | |
} | |
if isClustered { | |
return createClusteredRedisScaler(ctx, meta, script, metricType, logger) | |
} else if isSentinel { | |
return createSentinelRedisScaler(ctx, meta, script, metricType, logger) | |
} | |
return createRedisScaler(ctx, meta, script, metricType, logger) | |
} | |
func createClusteredRedisScaler(ctx context.Context, meta *redisMetadata, script string, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) { | |
client, err := getRedisClusterClient(ctx, meta.ConnectionInfo) | |
if err != nil { | |
return nil, fmt.Errorf("connection to redis cluster failed: %w", err) | |
} | |
closeFn := func() error { | |
if err := client.Close(); err != nil { | |
logger.Error(err, "error closing redis client") | |
return err | |
} | |
return nil | |
} | |
getValueFn := func(ctx context.Context) (int64, error) { | |
key := meta.ListName | |
if meta.QueryType == queryTypeValue { | |
key = meta.Key | |
} | |
cmd := client.Eval(ctx, script, []string{key}) | |
if cmd.Err() != nil { | |
return -1, cmd.Err() | |
} | |
return cmd.Int64() | |
} | |
return &redisScaler{ | |
metricType: metricType, | |
metadata: meta, | |
closeFn: closeFn, | |
getValueFn: getValueFn, | |
logger: logger, | |
}, nil | |
} | |
func (s *redisScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { | |
var metricName string | |
if s.metadata.QueryType == queryTypeValue { | |
metricName = util.NormalizeString(fmt.Sprintf("redis-value-%s", s.metadata.Key)) | |
} else { | |
metricName = util.NormalizeString(fmt.Sprintf("redis-%s", s.metadata.ListName)) | |
} | |
externalMetric := &v2.ExternalMetricSource{ | |
Metric: v2.MetricIdentifier{ | |
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName), | |
}, | |
Target: GetMetricTarget(s.metricType, s.metadata.ListLength), | |
} | |
metricSpec := v2.MetricSpec{ | |
External: externalMetric, | |
Type: externalMetricType, | |
} | |
return []v2.MetricSpec{metricSpec} | |
} | |
func (s *redisScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { | |
value, err := s.getValueFn(ctx) | |
if err != nil { | |
s.logger.Error(err, "error getting redis value") | |
return []external_metrics.ExternalMetricValue{}, false, err | |
} | |
var scaledValue float64 | |
if s.metadata.QueryType == queryTypeValue { | |
scaledValue = float64(value) / float64(s.metadata.ScalingFactor) | |
} else { | |
scaledValue = float64(value) | |
} | |
metric := GenerateMetricInMili(metricName, scaledValue) | |
return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.ActivationListLength, nil | |
} | |
func (s *redisScaler) Close(context.Context) error { | |
return s.closeFn() | |
} | |
// Helper functions (getRedisClient, getRedisSentinelClient, etc.) remain the same... | |
func parseRedisMetadata(config *scalersconfig.ScalerConfig) (*redisMetadata, error) { | |
meta := &redisMetadata{} | |
if err := config.TypedConfig(meta); err != nil { | |
return nil, fmt.Errorf("error parsing redis metadata: %w", err) | |
} | |
meta.triggerIndex = config.TriggerIndex | |
return meta, nil | |
} | |
// Connection validation and client creation functions remain the same... |
Looks good to me. I think we should also add rate limiting and cooldown period to avoid the unnecessary scaling up/down
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
YAML: