Created
March 12, 2023 08:07
-
-
Save ayeshLK/12fd4a25bfc9b7c6b88aa138620b7aaf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Kafka consumer record shape used when polling for weather updates.
#
# Includes every field of `kafka:AnydataConsumerRecord` and declares the
# `value` field as a `weatherApi:WeatherReport`.
#
# + value - the weather report carried by the record
type UpdateMessageConsumerRecord record {|
    *kafka:AnydataConsumerRecord;
    weatherApi:WeatherReport value;
|};
# Prepares delivery for a verified subscription and launches the background
# strand that forwards updates to it.
#
# The hub client is built before the Kafka consumer on purpose: if client
# construction fails, the function returns before any consumer exists, so no
# open consumer is left behind without an owner to close it. Once the polling
# strand is started, `pollForNewUpdates` is responsible for closing the
# consumer when it stops.
#
# + newsReceiver - the verified subscription that should start receiving updates
# + return - an `error` if the hub client or the Kafka consumer cannot be created, otherwise `()`
function startNotificationReceiver(websubhub:VerifiedSubscription newsReceiver) returns error? {
    // Client used to push content to the subscriber, with retry and timeout
    // settings taken from the `config` module.
    websubhub:HubClient hubClient = check new (newsReceiver, {
        retryConfig: {
            interval: config:CLIENT_RETRY_INTERVAL,
            count: config:CLIENT_RETRY_COUNT,
            backOffFactor: 2.0,
            maxWaitInterval: 20
        },
        timeout: config:CLIENT_TIMEOUT
    });

    // Created last so that nothing after this point can fail and orphan it.
    kafka:Consumer kafkaConsumer = check createMessageConsumer(newsReceiver);

    // start the async function for `News Receiver`; the returned future is
    // intentionally discarded because the strand handles its own failures
    _ = start pollForNewUpdates(hubClient, kafkaConsumer, newsReceiver);
}
# Continuously polls Kafka for new records and forwards them to one subscriber.
#
# Each iteration polls the consumer, verifies that the subscriber is still
# registered, delivers the polled records through `notifySubscribers`, and
# commits the consumer offsets only after delivery succeeded. Any failure
# leaves the loop through the `on fail` clause, which logs the error,
# unregisters the subscriber via `removeNewsReceiver`, and closes the consumer.
#
# + hubClient - client used to deliver content to the subscriber
# + kafkaConsumer - consumer polled for new records; closed here when the loop ends
# + newsReceiver - the verified subscription being served
# + return - declared as `error?`; failures raised inside the loop are handled by the `on fail` clause rather than propagated
isolated function pollForNewUpdates(websubhub:HubClient hubClient, kafka:Consumer kafkaConsumer, websubhub:VerifiedSubscription newsReceiver) returns error? {
    string topic = newsReceiver.hubTopic;
    // Identifier built from the topic and the callback of the subscription.
    string subscriberId = string `${newsReceiver.hubTopic}-${newsReceiver.hubCallback}`;

    do {
        while true {
            UpdateMessageConsumerRecord[] polledRecords = check kafkaConsumer->poll(10.0);

            // Validity is checked after the poll returns, so records fetched
            // for a receiver that is no longer valid are never delivered.
            if !isValidNewsReceiver(topic, subscriberId) {
                fail error(string `Subscriber with Id ${subscriberId} or topic ${topic} is invalid`);
            }

            var deliveryOutcome = notifySubscribers(polledRecords, hubClient, kafkaConsumer);
            if deliveryOutcome is error {
                log:printError("Error occurred while sending notification to subscriber ", err = deliveryOutcome.message());
                // Hand the delivery error to the `on fail` clause below.
                fail deliveryOutcome;
            }

            // Commit the Kafka offset only if the message delivery was successful.
            check kafkaConsumer->'commit();
        }
    } on fail var cause {
        log:printError(string `Error occurred while sending notification to news-receiver: ${cause.message()}`, stackTrace = cause.stackTrace());
        removeNewsReceiver(subscriberId);

        // Best-effort shutdown: a failure to close is logged, not propagated.
        kafka:Error? closeOutcome = kafkaConsumer->close(15.0);
        if closeOutcome is kafka:Error {
            log:printError("Error occurred while gracefully closing kafka-consumer", err = closeOutcome.message());
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment