@ruxo
Last active July 1, 2023 22:45
#nowarn "760"
#r "nuget:Confluent.Kafka"
open System
open System.Diagnostics
open System.Threading
open Confluent.Kafka
[<Literal>]
let Topic = "test-topic"
let config =
    ConsumerConfig(
        BootstrapServers = "localhost:9092",
        GroupId = "test-group",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        SocketTimeoutMs = 5000
    )
let consumer = ConsumerBuilder<string,string>(config).Build()
consumer.Subscribe Topic
let cancel = CancellationTokenSource()
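// On Ctrl+C, signal cancellation and suppress immediate process termination
// so the consumer can be closed cleanly below.
Console.CancelKeyPress.AddHandler(fun _ e ->
    cancel.Cancel()
    e.Cancel <- true
)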
printfn "Consumer started!"
try
    while true do
        // Blocks until a message arrives or the token is cancelled.
        let m = consumer.Consume(cancel.Token)
        let message = m.Message
        let timestamp = message.Timestamp.UtcDateTime.ToLocalTime()
        printfn $"{timestamp} Received {m.Partition}/{m.Offset}: %s{message.Key} -> %s{message.Value}"
with
| :? OperationCanceledException -> () // cancellation requested via Ctrl+C
| e -> printfn $"Error! {e}"
printfn "Consumer quit!"
// Measure how long it takes the consumer to leave the group and shut down.
let watch = Stopwatch.StartNew()
consumer.Close()
printfn $"Consumer closed in {watch.ElapsedMilliseconds}ms!"