remote function onSubscriptionIntentVerified(readonly & websubhub:VerifiedSubscription subscription) returns error? {
    if !validNotificationSenderExists(subscription.hubTopic) {
        task:JobId notificationService = check startNotificationSender(subscription.hubTopic);
        lock {
            notificationSenders[subscription.hubTopic] = notificationService;
        }
    }
    string newsReceiverId = string `${subscription.hubTopic}-${subscription.hubCallback}`;
    lock {
        newsReceiversCache[newsReceiverId] = subscription;

// `notificationSenders` is used to identify notification senders that have already been started.
// The location name is the key for `notificationSenders`.
isolated map<task:JobId> notificationSenders = {};
isolated map<websubhub:VerifiedSubscription> newsReceiversCache = {};

function createMessageConsumer(websubhub:VerifiedSubscription message) returns kafka:Consumer|error {
    string groupName = string `${message.hubTopic}_${message.hubCallback}`;
    kafka:ConsumerConfiguration consumerConfiguration = {
        groupId: groupName,
        topics: [message.hubTopic],
        // turn off the automatic offset commit
        autoCommit: false
    };
    return check new ("localhost:9092", consumerConfiguration);
}

type UpdateMessageConsumerRecord record {|
    *kafka:AnydataConsumerRecord;
    weatherApi:WeatherReport value;
|};

function startNotificationReceiver(websubhub:VerifiedSubscription newsReceiver) returns error? {
    kafka:Consumer kafkaConsumer = check createMessageConsumer(newsReceiver);
    websubhub:HubClient hubClient = check new (newsReceiver, {
        retryConfig: {
            interval: config:CLIENT_RETRY_INTERVAL,

kafka:ProducerConfiguration messagePersistConfig = {
    clientId: "message-persist-client",
    acks: "1",
    retryCount: 3
};
final kafka:Producer messagePersistProducer = check new ("localhost:9092", messagePersistConfig);

public isolated function publishWeatherNotification(string location, WeatherReport weatherReport) returns error? {
    json payload = weatherReport.toJson();
    byte[] serializedContent = payload.toJsonString().toBytes();

isolated class NotificationSender {
    *task:Job;
    private final string location;

    isolated function init(string location) {
        self.location = location;
    }

    public isolated function execute() {
        WeatherReport|error weatherReport = getWeatherReport(self.location);

// `getWeatherReport` returns `WeatherReport|error`, so the variable must use the union type
// for the `is error` check below to be valid.
WeatherReport|error weatherReport = getWeatherReport(location);
if weatherReport is error {
    log:printWarn(string `Error occurred while retrieving weather-report: ${weatherReport.message()}`, stackTrace = weatherReport.stackTrace());
    continue;
}
foreach var [newsReceiverId, clientEp] in newsDispatchClients.entries() {
    websubhub:ContentDistributionSuccess|error response = clientEp->notifyContentDistribution({
        contentType: mime:APPLICATION_JSON,
        content: {

// Declared `final` so that the isolated function below can access the client.
final http:Client openWeatherClient = check new ("https://api.openweathermap.org/data/2.5");

isolated function getWeatherReport(string location) returns WeatherReport|error {
    return openWeatherClient->get(string `/weather?q=${location}&appid=xxxxxx`);
}

type WeatherItem record {
    int id;
    string main;
    string description;
    string icon;
};

type Main record {
    decimal temp;
    decimal feels_like;

curl "https://api.openweathermap.org/data/2.5/weather?q=Colombo&appid={API_KEY}"
{
    "coord":{
        "lon":79.8478,
        "lat":6.9319
    },
    "weather":[
        {
            "id":804,