Skip to content

Instantly share code, notes, and snippets.

@ayeshLK
Created March 12, 2023 08:07
Show Gist options
  • Save ayeshLK/12fd4a25bfc9b7c6b88aa138620b7aaf to your computer and use it in GitHub Desktop.
Save ayeshLK/12fd4a25bfc9b7c6b88aa138620b7aaf to your computer and use it in GitHub Desktop.
// Closed record describing a single Kafka consumer record handled by the notification poller.
// It includes every field of `kafka:AnydataConsumerRecord` and declares the `value` field
// with the type `weatherApi:WeatherReport`.
type UpdateMessageConsumerRecord record {|
// Type inclusion: pulls in the fields declared by `kafka:AnydataConsumerRecord`.
*kafka:AnydataConsumerRecord;
// Payload of the consumed record, typed as a weather report.
weatherApi:WeatherReport value;
|};
// Starts asynchronous notification delivery for a verified subscription.
//
// Creates a Kafka consumer for the subscription via `createMessageConsumer` and a
// `websubhub:HubClient` configured with the retry/timeout values from `config`, then
// launches `pollForNewUpdates` asynchronously with `start`, handing both over to it.
//
// + newsReceiver - the verified subscription that should receive updates
// + return - an error if the Kafka consumer or the hub client cannot be created, otherwise `()`
function startNotificationReceiver(websubhub:VerifiedSubscription newsReceiver) returns error? {
    kafka:Consumer kafkaConsumer = check createMessageConsumer(newsReceiver);
    websubhub:HubClient|error hubClient = new (newsReceiver, {
        retryConfig: {
            interval: config:CLIENT_RETRY_INTERVAL,
            count: config:CLIENT_RETRY_COUNT,
            backOffFactor: 2.0,
            maxWaitInterval: 20
        },
        timeout: config:CLIENT_TIMEOUT
    });
    if hubClient is error {
        // The consumer has already been created at this point; close it before returning so
        // that a failed hub-client initialisation does not leave an open Kafka consumer behind.
        kafka:Error? closeResult = kafkaConsumer->close(15.0);
        if closeResult is kafka:Error {
            log:printError("Error occurred while gracefully closing kafka-consumer", err = closeResult.message());
        }
        return hubClient;
    }
    // start the async function for `News Receiver`
    _ = start pollForNewUpdates(hubClient, kafkaConsumer, newsReceiver);
}
// Polling loop that forwards Kafka records to a single subscriber.
//
// Each iteration polls the consumer, checks via `isValidNewsReceiver` that the subscriber is
// still valid, passes the polled records to `notifySubscribers`, and commits the consumer
// offsets when delivery did not return an error. Any failure inside the loop is handled by the
// `on fail` clause, which logs it, calls `removeNewsReceiver`, and closes the Kafka consumer.
//
// + hubClient - client used to deliver content to the subscriber
// + kafkaConsumer - consumer that is polled for new records and closed on failure
// + newsReceiver - the verified subscription being served
// + return - declared as `error?`; failures raised inside the loop are handled by `on fail`
isolated function pollForNewUpdates(websubhub:HubClient hubClient, kafka:Consumer kafkaConsumer, websubhub:VerifiedSubscription newsReceiver) returns error? {
    string topicName = newsReceiver.hubTopic;
    string subscriberId = string `${newsReceiver.hubTopic}-${newsReceiver.hubCallback}`;
    do {
        while true {
            UpdateMessageConsumerRecord[] polledRecords = check kafkaConsumer->poll(10.0);
            // Stop serving this subscriber as soon as it is no longer considered valid.
            if !isValidNewsReceiver(topicName, subscriberId) {
                fail error(string `Subscriber with Id ${subscriberId} or topic ${topicName} is invalid`);
            }
            var deliveryResult = notifySubscribers(polledRecords, hubClient, kafkaConsumer);
            if deliveryResult is error {
                log:printError("Error occurred while sending notification to subscriber ", err = deliveryResult.message());
                // `check` on an error value always transfers control to the `on fail` clause.
                check deliveryResult;
            }
            // Only reached when delivery succeeded: commit the Kafka offset.
            check kafkaConsumer->'commit();
        }
    } on fail var failure {
        log:printError(string `Error occurred while sending notification to news-receiver: ${failure.message()}`, stackTrace = failure.stackTrace());
        removeNewsReceiver(subscriberId);
        // Best-effort graceful shutdown of the consumer; a close failure is only logged.
        kafka:Error? closeOutcome = kafkaConsumer->close(15.0);
        if closeOutcome is kafka:Error {
            log:printError("Error occurred while gracefully closing kafka-consumer", err = closeOutcome.message());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment