Skip to content

Instantly share code, notes, and snippets.

@JonCanning
Last active January 24, 2023 11:02
Show Gist options
  • Save JonCanning/919ee0306e4e821838739d766222fde1 to your computer and use it in GitHub Desktop.
Save JonCanning/919ee0306e4e821838739d766222fde1 to your computer and use it in GitHub Desktop.
Kafka F# example
#r "nuget: Confluent.Kafka"
#r "nuget: System.Text.Json"
open Confluent.Kafka
open System
open System.Text.Json
open System.Threading.Tasks
open Confluent.Kafka.Admin
let server = "localhost:9092"
let topicName = "topic"
let producerConfig = ProducerConfig(BootstrapServers = server)
try
AdminClientBuilder(AdminClientConfig producerConfig)
.Build()
.CreateTopicsAsync([| TopicSpecification(Name = topicName) |])
.Wait()
with _ -> ()
let producer = ProducerBuilder<byte array, byte array>(producerConfig).Build()
let consumer =
ConsumerBuilder<byte array, byte array>(
ConsumerConfig(BootstrapServers = server, GroupId = Guid.NewGuid().ToString())
)
.Build()
consumer.Subscribe topicName
Task.Run(fun () ->
while true do
try
let result = consumer.Consume()
if result <> null && result.Message <> null then
let message = JsonSerializer.Deserialize result.Message.Value
printfn $"""MESSAGE %A{message}"""
with ex ->
eprintfn $"EXCEPTION {ex.Message}")
let send (request: byte array) =
producer.Produce(topicName, Message(Value = request))
printfn "Press any key to send"
Console.ReadKey()
JsonSerializer.SerializeToUtf8Bytes {| Now = DateTimeOffset.UtcNow |} |> send
producer.Flush()
printfn "Press any key to exit"
Console.ReadKey()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment