Last active
March 21, 2022 20:27
-
-
Save aavarghese/377cac750e45ba32cdda528f40b56343 to your computer and use it in GitHub Desktop.
Changes needed to activate new scaling & scheduling kafka source consumers code in eventing-kafka-broker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/control-plane/cmd/kafka-controller/main.go b/control-plane/cmd/kafka-controller/main.go | |
index 5447ae63..13b8c84c 100644 | |
--- a/control-plane/cmd/kafka-controller/main.go | |
+++ b/control-plane/cmd/kafka-controller/main.go | |
@@ -29,10 +29,12 @@ import ( | |
"knative.dev/eventing-kafka-broker/control-plane/pkg/config" | |
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" | |
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel" | |
+ "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumer" | |
+ "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup" | |
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/sink" | |
- "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/source" | |
+ sourcev2 "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/source/v2" | |
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger" | |
) | |
const ( | |
@@ -58,7 +60,12 @@ func main() { | |
sourceEnv, err := config.GetEnvConfig("SOURCE") | |
if err != nil { | |
- log.Fatal("cannot process environment variables with prefix SINK", err) | |
+ log.Fatal("cannot process environment variables with prefix SOURCE", err) | |
+ } | |
+ | |
+ consumerEnv, err := config.GetEnvConfig("SOURCE") | |
+ if err != nil { | |
+ log.Fatal("cannot process environment variables with prefix CONSUMER", err) | |
} | |
sharedmain.MainNamed(signals.NewContext(), component, | |
@@ -75,7 +82,7 @@ func main() { | |
injection.NamedControllerConstructor{ | |
Name: "trigger-controller", | |
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl { | |
return trigger.NewController(ctx, watcher, brokerEnv) | |
}, | |
}, | |
@@ -83,7 +90,7 @@ func main() { | |
injection.NamedControllerConstructor{ | |
Name: "channel-controller", | |
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl { | |
return channel.NewController(ctx, watcher, channelEnv) | |
}, | |
}, | |
@@ -99,7 +106,23 @@ func main() { | |
injection.NamedControllerConstructor{ | |
Name: "source-controller", | |
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl { | |
- return source.NewController(ctx, watcher, sourceEnv) | |
+ return sourcev2.NewController(ctx, sourceEnv) | |
+ }, | |
+ }, | |
+ | |
+ // ConsumerGroup controller | |
+ injection.NamedControllerConstructor{ | |
+ Name: "consumergroup-controller", | |
+ ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl { | |
+ return consumergroup.NewController(ctx) | |
+ }, | |
+ }, | |
+ | |
+ // Consumer controller | |
+ injection.NamedControllerConstructor{ | |
+ Name: "consumer-controller", | |
+ ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl { | |
+ return consumer.NewController(ctx, consumerEnv) | |
}, | |
}, | |
) | |
diff --git a/data-plane/config/source/500-dispatcher.yaml b/data-plane/config/source/500-dispatcher.yaml | |
index 3be219bf..9866ab86 100644 | |
--- a/data-plane/config/source/500-dispatcher.yaml | |
+++ b/data-plane/config/source/500-dispatcher.yaml | |
@@ -15,7 +15,7 @@ | |
# limitations under the License. | |
apiVersion: apps/v1 | |
-kind: Deployment | |
+kind: StatefulSet | |
metadata: | |
name: kafka-source-dispatcher | |
namespace: knative-eventing | |
@@ -23,6 +23,8 @@ metadata: | |
app: kafka-source-dispatcher | |
kafka.eventing.knative.dev/release: devel | |
spec: | |
+ serviceName: kafka-source-dispatcher | |
+ podManagementPolicy: "Parallel" | |
selector: | |
matchLabels: | |
app: kafka-source-dispatcher | |
@@ -31,6 +33,7 @@ spec: | |
name: kafka-source-dispatcher | |
labels: | |
app: kafka-source-dispatcher | |
+ app.kubernetes.io/component: kafka-dispatcher | |
kafka.eventing.knative.dev/release: devel | |
spec: | |
serviceAccountName: knative-kafka-source-data-plane | |
@@ -44,8 +47,8 @@ spec: | |
- mountPath: /etc/config | |
name: config-kafka-source-data-plane | |
readOnly: true | |
- - mountPath: /etc/sources | |
- name: kafka-source-sources | |
+ - mountPath: /etc/contract-resources | |
+ name: contract-resources | |
readOnly: true | |
- mountPath: /tmp | |
name: cache | |
@@ -73,7 +76,7 @@ spec: | |
- name: WEBCLIENT_CONFIG_FILE_PATH | |
value: /etc/config/config-kafka-source-webclient.properties | |
- name: DATA_PLANE_CONFIG_FILE_PATH | |
- value: /etc/sources/data | |
+ value: /etc/contract-resources/data | |
- name: EGRESSES_INITIAL_CAPACITY | |
value: "20" | |
- name: INSTANCE_ID | |
@@ -102,7 +105,16 @@ spec: | |
- "-Dlogback.configurationFile=/etc/logging/config.xml" | |
- "-jar" | |
- "/app/app.jar" | |
- # TODO set resources (limits and requests) | |
+ | |
+ resources: | |
+ requests: | |
+ cpu: 1000m | |
+ # 600Mi for virtual replicas + 100Mi overhead | |
+ memory: 700Mi | |
+ limits: | |
+ cpu: 2000m | |
+ memory: 1000Mi | |
+ | |
livenessProbe: | |
failureThreshold: 3 | |
httpGet: | |
@@ -133,9 +145,6 @@ spec: | |
- name: config-kafka-source-data-plane | |
configMap: | |
name: config-kafka-source-data-plane | |
- - name: kafka-source-sources | |
- configMap: | |
- name: kafka-source-sources | |
- name: cache | |
emptyDir: { } | |
- name: kafka-config-logging |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
cc: @aslom @lionelvillard for activating new scaling support in ekb