Stream Knative HTTP CloudEvents into Apache Kafka

With Knative Eventing, you can use a source to receive events from a third-party system and have them sent as CloudEvents to an HTTP web server, like:

apiVersion: sources.knative.dev/v1alpha2
kind: PingSource
metadata:
  name: ping-source
spec:
  schedule: "*/2 * * * *"
  jsonData: '{"message": "Hello world!"}'
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: ping-display

Now, how about pumping those events directly into an Apache Kafka topic?

For that we configure a KafkaSink:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092

and reference it as the sink of a source, like:

apiVersion: sources.knative.dev/v1alpha2
kind: ApiServerSource
metadata:
  name: testevents
  namespace: default
spec:
  serviceAccountName: events-sa
  mode: Resource
  resources:
  - apiVersion: v1
    kind: Event
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: KafkaSink
      name: my-kafka-sink

Now all CloudEvents produced by the source are available in the configured Kafka topic. With the simple console consumer, you can read them like:

kubectl -n kafka run kafka-consumer -ti --image=strimzi/kafka:0.19.0-kafka-2.5.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic mytopic --from-beginning 

and the output will be JSON, one CloudEvent per line, like:

{"specversion":"1.0","id":"e9e69eb2-9e27-4668-9d22-d81bad714360","source":"https://10.96.0.1:443","type":"dev.knative.apiserver.resource.update","datacontenttype":"application/json","subject":"/apis/v1/namespaces/default/events/testevents.1633277a55fb96cf","time":"2020-09-09T15:35:08.489816312Z","kind":"Event","name":"testevents.1633277a55fb96cf","namespace":"default","data":{"apiVersion":"v1","count":3,"eventTime":null,"firstTimestamp":"2020-09-09T15:34:59Z","involvedObject":{"apiVersion":"sources.knative.dev/v1beta1","kind":"ApiServerSource","name":"testevents","namespace":"default","resourceVersion":"2643924","uid":"518d8ba8-96ce-4cf5-9765-0819f0b2593c"},"kind":"Event","lastTimestamp":"2020-09-09T15:35:08Z","message":"Deployment \"apiserversource-testevents-518d8ba8-96ce-4cf5-9765-0819f0b2593c\" updated","metadata":{"creationTimestamp":"2020-09-09T15:34:59Z","managedFields":[{"apiVersion":"v1","fieldsType":"FieldsV1","fieldsV1":{"f:count":{},"f:firstTimestamp":{},"f:involvedObject":{"f:apiVersion":{},"f:kind":{},"f:name":{},"f:namespace":{},"f:resourceVersion":{},"f:uid":{}},"f:lastTimestamp":{},"f:message":{},"f:reason":{},"f:source":{"f:component":{}},"f:type":{}},"manager":"controller","operation":"Update","time":"2020-09-09T15:35:08Z"}],"name":"testevents.1633277a55fb96cf","namespace":"default","resourceVersion":"2644190","selfLink":"/api/v1/namespaces/default/events/testevents.1633277a55fb96cf","uid":"0bd2cafb-faf7-46fb-9695-fa2c1dfbc0c8"},"reason":"ApiServerSourceDeploymentUpdated","reportingComponent":"","reportingInstance":"","source":{"component":"apiserversource-controller"},"type":"Normal"}}
{"specversion":"1.0","id":"0cab2ffe-7f6f-415e-b88d-779d14d39f82","source":"https://10.96.0.1:443","type":"dev.knative.apiserver.resource.update","datacontenttype":"application/json","subject":"/apis/v1/namespaces/default/events/testevents.1633277a55fb96cf","time":"2020-09-09T15:35:09.099833851Z","kind":"Event","name":"testevents.1633277a55fb96cf","namespace":"default","data":{"apiVersion":"v1","count":4,"eventTime":null,"firstTimestamp":"2020-09-09T15:34:59Z","involvedObject":{"apiVersion":"sources.knative.dev/v1beta1","kind":"ApiServerSource","name":"testevents","namespace":"default","resourceVersion":"2643924","uid":"518d8ba8-96ce-4cf5-9765-0819f0b2593c"},"kind":"Event","lastTimestamp":"2020-09-09T15:35:08Z","message":"Deployment \"apiserversource-testevents-518d8ba8-96ce-4cf5-9765-0819f0b2593c\" updated","metadata":{"creationTimestamp":"2020-09-09T15:34:59Z","managedFields":[{"apiVersion":"v1","fieldsType":"FieldsV1","fieldsV1":{"f:count":{},"f:firstTimestamp":{},"f:involvedObject":{"f:apiVersion":{},"f:kind":{},"f:name":{},"f:namespace":{},"f:resourceVersion":{},"f:uid":{}},"f:lastTimestamp":{},"f:message":{},"f:reason":{},"f:source":{"f:component":{}},"f:type":{}},"manager":"controller","operation":"Update","time":"2020-09-09T15:35:08Z"}],"name":"testevents.1633277a55fb96cf","namespace":"default","resourceVersion":"2644193","selfLink":"/api/v1/namespaces/default/events/testevents.1633277a55fb96cf","uid":"0bd2cafb-faf7-46fb-9695-fa2c1dfbc0c8"},"reason":"ApiServerSourceDeploymentUpdated","reportingComponent":"","reportingInstance":"","source":{"component":"apiserversource-controller"},"type":"Normal"}}
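Each line above is a CloudEvent envelope (`specversion`, `id`, `source`, `type`, ...) wrapping the observed Kubernetes Event in `data`. As a minimal sketch of how a consumer might pull out the interesting fields, assuming every message arrives as single-line JSON like the sample output: the helper name `summarize_event` is hypothetical, and the field names are taken only from the sample above.

```python
import json
import sys


def summarize_event(line: str) -> dict:
    """Parse one line of consumer output (a JSON-encoded CloudEvent)."""
    event = json.loads(line)
    data = event.get("data", {})
    return {
        # Envelope attributes describe the event itself.
        "id": event["id"],
        "type": event["type"],
        "subject": event.get("subject"),
        # The payload is the Kubernetes Event object that was observed.
        "reason": data.get("reason"),
        "count": data.get("count"),
        "message": data.get("message"),
    }


if __name__ == "__main__":
    # Read consumer output from stdin, one CloudEvent per line.
    for raw in sys.stdin:
        raw = raw.strip()
        if not raw.startswith("{"):
            continue  # skip anything that is not a JSON object
        print(json.dumps(summarize_event(raw)))
```

Saved under a name of your choice, the script reads the console consumer's output from stdin and prints one compact summary per event.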