Skip to content

Instantly share code, notes, and snippets.

@aavarghese
Last active March 21, 2022 20:27
Show Gist options
  • Save aavarghese/377cac750e45ba32cdda528f40b56343 to your computer and use it in GitHub Desktop.
Save aavarghese/377cac750e45ba32cdda528f40b56343 to your computer and use it in GitHub Desktop.
Changes needed to activate new scaling & scheduling kafka source consumers code in eventing-kafka-broker
diff --git a/control-plane/cmd/kafka-controller/main.go b/control-plane/cmd/kafka-controller/main.go
index 5447ae63..13b8c84c 100644
--- a/control-plane/cmd/kafka-controller/main.go
+++ b/control-plane/cmd/kafka-controller/main.go
@@ -29,10 +29,12 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel"
+ "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumer"
+ "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/sink"
- "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/source"
+ sourcev2 "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/source/v2"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger"
)
const (
@@ -58,7 +60,12 @@ func main() {
sourceEnv, err := config.GetEnvConfig("SOURCE")
if err != nil {
- log.Fatal("cannot process environment variables with prefix SINK", err)
+ log.Fatal("cannot process environment variables with prefix SOURCE", err)
+ }
+
+ consumerEnv, err := config.GetEnvConfig("SOURCE")
+ if err != nil {
+ log.Fatal("cannot process environment variables with prefix CONSUMER", err)
}
sharedmain.MainNamed(signals.NewContext(), component,
@@ -75,7 +82,7 @@ func main() {
injection.NamedControllerConstructor{
Name: "trigger-controller",
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
return trigger.NewController(ctx, watcher, brokerEnv)
},
},
@@ -83,7 +90,7 @@ func main() {
injection.NamedControllerConstructor{
Name: "channel-controller",
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
return channel.NewController(ctx, watcher, channelEnv)
},
},
@@ -99,7 +106,23 @@ func main() {
injection.NamedControllerConstructor{
Name: "source-controller",
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
- return source.NewController(ctx, watcher, sourceEnv)
+ return sourcev2.NewController(ctx, sourceEnv)
+ },
+ },
+
+ // ConsumerGroup controller
+ injection.NamedControllerConstructor{
+ Name: "consumergroup-controller",
+ ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
+ return consumergroup.NewController(ctx)
+ },
+ },
+
+ // Consumer controller
+ injection.NamedControllerConstructor{
+ Name: "consumer-controller",
+ ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
+ return consumer.NewController(ctx, consumerEnv)
},
},
)
diff --git a/data-plane/config/source/500-dispatcher.yaml b/data-plane/config/source/500-dispatcher.yaml
index 3be219bf..9866ab86 100644
--- a/data-plane/config/source/500-dispatcher.yaml
+++ b/data-plane/config/source/500-dispatcher.yaml
@@ -15,7 +15,7 @@
# limitations under the License.
apiVersion: apps/v1
-kind: Deployment
+kind: StatefulSet
metadata:
name: kafka-source-dispatcher
namespace: knative-eventing
@@ -23,6 +23,8 @@ metadata:
app: kafka-source-dispatcher
kafka.eventing.knative.dev/release: devel
spec:
+ serviceName: kafka-source-dispatcher
+ podManagementPolicy: "Parallel"
selector:
matchLabels:
app: kafka-source-dispatcher
@@ -31,6 +33,7 @@ spec:
name: kafka-source-dispatcher
labels:
app: kafka-source-dispatcher
+ app.kubernetes.io/component: kafka-dispatcher
kafka.eventing.knative.dev/release: devel
spec:
serviceAccountName: knative-kafka-source-data-plane
@@ -44,8 +47,8 @@ spec:
- mountPath: /etc/config
name: config-kafka-source-data-plane
readOnly: true
- - mountPath: /etc/sources
- name: kafka-source-sources
+ - mountPath: /etc/contract-resources
+ name: contract-resources
readOnly: true
- mountPath: /tmp
name: cache
@@ -73,7 +76,7 @@ spec:
- name: WEBCLIENT_CONFIG_FILE_PATH
value: /etc/config/config-kafka-source-webclient.properties
- name: DATA_PLANE_CONFIG_FILE_PATH
- value: /etc/sources/data
+ value: /etc/contract-resources/data
- name: EGRESSES_INITIAL_CAPACITY
value: "20"
- name: INSTANCE_ID
@@ -102,7 +105,16 @@ spec:
- "-Dlogback.configurationFile=/etc/logging/config.xml"
- "-jar"
- "/app/app.jar"
- # TODO set resources (limits and requests)
+
+ resources:
+ requests:
+ cpu: 1000m
+ # 600Mi for virtual replicas + 100Mi overhead
+ memory: 700Mi
+ limits:
+ cpu: 2000m
+ memory: 1000Mi
+
livenessProbe:
failureThreshold: 3
httpGet:
@@ -133,9 +145,6 @@ spec:
- name: config-kafka-source-data-plane
configMap:
name: config-kafka-source-data-plane
- - name: kafka-source-sources
- configMap:
- name: kafka-source-sources
- name: cache
emptyDir: { }
- name: kafka-config-logging
@aavarghese
Copy link
Author

cc: @aslom @lionelvillard for activating new scaling support in ekb

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment