Skip to content

Instantly share code, notes, and snippets.

@TsuyoshiUshio
Created December 4, 2020 08:36
Show Gist options
  • Save TsuyoshiUshio/30049c785f5d14e267be7989e5b78598 to your computer and use it in GitHub Desktop.
Save TsuyoshiUshio/30049c785f5d14e267be7989e5b78598 to your computer and use it in GitHub Desktop.
Kafka Extension Example
{
"version": "2.0",
"logging": {
"logLevel": {
"Function.MyFunction": "Information",
"default": "Information"
}
},
"extensions": {
"kafka": {
"maxBatchSize": 2,
"AutoCommitIntervalMs": 200,
"SubscriberIntervalInSeconds": 3,
"ExecutorChannelCapacity": 1,
"ChannelFullRetryIntervalInMs": 50,
"MaxPollIntervalMs": 300000,
"LibkafkaDebug": "fetch"
}
}
}
using Avro;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
namespace Kafka300
{
/// <summary>
/// Sample Azure Functions that consume a Kafka topic through <c>KafkaTrigger</c>
/// bindings. The functions differ only in the shape of the bound parameter:
/// a single event, an array of events, and an array of keyed events.
/// </summary>
/// <remarks>
/// NOTE(review): all four functions subscribe to the same topic with the same
/// consumer group, so presumably they share the topic's partitions rather than
/// each receiving every message. Confirm this is the intended setup.
/// </remarks>
public class SimpleKafkaTriggers
{
    // Every trigger below uses exactly the same connection settings. They are
    // declared once so the four attributes cannot drift apart.
    // The values wrapped in %...% are binding expressions (presumably resolved
    // from application settings by the Functions host - see the extension docs).
    private const string BrokerListSetting = "BrokerList";
    private const string TopicName = "reproduce2";
    private const string UsernameSetting = "%ConfluentCloudUsername%";
    private const string PasswordSetting = "%ConfluentCloudPassword%";
    private const string CaCertificateLocation = "confluent_cloud_cacert.pem";
    private const string ConsumerGroupName = "$Default";

    // Artificial pause awaited at the start of every invocation
    // (presumably stands in for real message-processing work).
    private static readonly TimeSpan SimulatedProcessingTime = TimeSpan.FromSeconds(2);

    /// <summary>
    /// Handles one Kafka event per invocation and logs the partition it came from.
    /// </summary>
    /// <param name="kafkaEvent">The single event delivered by the trigger.</param>
    /// <param name="logger">Logger supplied by the Functions host.</param>
    /// <returns>A task that completes after the delay and the log call.</returns>
    [FunctionName(nameof(SampleConsumerAsync))]
    public async Task SampleConsumerAsync(
        [KafkaTrigger(
            BrokerListSetting,
            TopicName,
            Username = UsernameSetting,
            Password = PasswordSetting,
            Protocol = BrokerProtocol.SaslSsl,
            SslCaLocation = CaCertificateLocation,
            ConsumerGroup = ConsumerGroupName,
            AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string> kafkaEvent,
        ILogger logger)
    {
        await Task.Delay(SimulatedProcessingTime);
        logger.LogInformation($"SampleConsumerAsync partition: {kafkaEvent.Partition} : read 1 message");
    }

    /// <summary>
    /// Handles a batch of Kafka events, logging the batch size and then one
    /// line per event with its partition.
    /// </summary>
    /// <param name="kafkaEvents">The batch of events delivered by the trigger.</param>
    /// <param name="logger">Logger supplied by the Functions host.</param>
    /// <returns>A task that completes once the whole batch has been logged.</returns>
    [FunctionName(nameof(SampleConsumerAsync1))]
    public async Task SampleConsumerAsync1(
        [KafkaTrigger(
            BrokerListSetting,
            TopicName,
            Username = UsernameSetting,
            Password = PasswordSetting,
            Protocol = BrokerProtocol.SaslSsl,
            SslCaLocation = CaCertificateLocation,
            ConsumerGroup = ConsumerGroupName,
            AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] kafkaEvents,
        ILogger logger)
    {
        await Task.Delay(SimulatedProcessingTime);
        logger.LogInformation($"Count: {kafkaEvents.Length}");
        foreach (var kafkaEvent in kafkaEvents)
        {
            // The payload is available as kafkaEvent.Value; it is not logged here.
            // The prefix names this function so its output can be told apart
            // from SampleConsumerAsync in the logs.
            logger.LogInformation($"SampleConsumerAsync1 partition: {kafkaEvent.Partition} : read 1 message");
        }
    }

    /// <summary>
    /// Handles a batch of keyed Kafka events, logging the batch size and then
    /// one line per event with its partition and key.
    /// </summary>
    /// <param name="kafkaEvents">The batch of key/value events delivered by the trigger.</param>
    /// <param name="logger">Logger supplied by the Functions host.</param>
    /// <returns>A task that completes once the whole batch has been logged.</returns>
    [FunctionName(nameof(SampleConsumerAsync2))]
    public async Task SampleConsumerAsync2(
        [KafkaTrigger(
            BrokerListSetting,
            TopicName,
            Username = UsernameSetting,
            Password = PasswordSetting,
            Protocol = BrokerProtocol.SaslSsl,
            SslCaLocation = CaCertificateLocation,
            ConsumerGroup = ConsumerGroupName,
            AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string, string>[] kafkaEvents,
        ILogger logger)
    {
        await Task.Delay(SimulatedProcessingTime);
        logger.LogInformation($"Count: {kafkaEvents.Length}");
        foreach (var kafkaEvent in kafkaEvents)
        {
            // The payload is also available as kafkaEvent.Value.
            logger.LogInformation($"SampleConsumerAsync2 partition: {kafkaEvent.Partition} :{kafkaEvent.Key} : : read 1 message");
        }
    }

    /// <summary>
    /// Handles a batch of Kafka events, logging the batch size and then one
    /// line per event with its partition.
    /// </summary>
    /// <param name="kafkaEvents">The batch of events delivered by the trigger.</param>
    /// <param name="logger">Logger supplied by the Functions host.</param>
    /// <returns>A task that completes once the whole batch has been logged.</returns>
    [FunctionName(nameof(SampleConsumerAsync3))]
    public async Task SampleConsumerAsync3(
        [KafkaTrigger(
            BrokerListSetting,
            TopicName,
            Username = UsernameSetting,
            Password = PasswordSetting,
            Protocol = BrokerProtocol.SaslSsl,
            SslCaLocation = CaCertificateLocation,
            ConsumerGroup = ConsumerGroupName,
            AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] kafkaEvents,
        ILogger logger)
    {
        await Task.Delay(SimulatedProcessingTime);
        logger.LogInformation($"Count: {kafkaEvents.Length}");
        foreach (var kafkaEvent in kafkaEvents)
        {
            logger.LogInformation($"SampleConsumerAsync3 partition: {kafkaEvent.Partition} : read 1 message");
        }
    }
}
}
@Rajan-Rajangam
Copy link

Hi Tsuyoshi Ushio,

Could you please provide an example of how to retrieve the username and password from configuration? I have tried using host.json and local.settings.json on my local machine, but I could not get it to work. An example would be great.

Also, can I configure the brokerList in the config as well? The brokerList will be different for dev, acceptance, and prod.

Thanks,
Rajan

@VinnyBonner
Copy link

VinnyBonner commented Jun 27, 2022

Hi Tsuyoshi Ushio,

Could you please provide an example of how to retrieve the username and password from configuration? I have tried using host.json and local.settings.json on my local machine, but I could not get it to work. An example would be great.

Also, can I configure the brokerList in the config as well? The brokerList will be different for dev, acceptance, and prod.

Thanks, Rajan

Hey Rajan,

To retrieve the username, password, and BrokerList from local.settings.json, you would need to set it up like this:

public async Task Run(
            [KafkaTrigger("BrokerList",
                          "KafkaTriggerTopic",
                          Username = "KafkaUser",
                          Password = "KafkaPassword",
                          Protocol = BrokerProtocol.SaslSsl,                          
                          AuthenticationMode = BrokerAuthenticationMode.Plain,
                          ConsumerGroup = "$Default")] KafkaEventData<string>[] events,
            ILogger log)

Then in your local.settings.json you would have it like this:

{
    "IsEncrypted": false,
    "Values": {
      "AzureWebJobsStorage": "UseDevelopmentStorage=true",
      "FUNCTIONS_WORKER_RUNTIME": "dotnet",
      "KafkaUser": "<CLUSTER_API_KEY>",
      "BrokerList": "<BROKER_LIST>",
      "KafkaPassword": "<CLUSTER_API_SECRET >"
  }
}

When published to Azure, you would need to add those same key/value setting pairs in the Azure portal -> Function App -> Configuration -> App Settings.

Vinny

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment