Skip to content

Instantly share code, notes, and snippets.

View ayeshLK's full-sized avatar
🏠
Working from home

Ayesh Almeida ayeshLK

🏠
Working from home
View GitHub Profile
remote function onSubscriptionIntentVerified(readonly & websubhub:VerifiedSubscription subscription) returns error? {
if !validNotificationSenderExists(subscription.hubTopic) {
task:JobId notificationService = check startNotificationSender(subscription.hubTopic);
lock {
notificationSenders[subscription.hubTopic] = notificationService;
}
}
string newsReceiverId = string `${subscription.hubTopic}-${subscription.hubCallback}`;
lock {
newsReceiversCache[newsReceiverId] = subscription;
// `notificationsSenders` will be used to identify already started notification senders
// location name will be the key for `notificationSenders`
isolated map<task:JobId> notificationSenders = {};
isolated map<websubhub:VerifiedSubscription> newsReceiversCache = {};
function createMessageConsumer(websubhub:VerifiedSubscription message) returns kafka:Consumer|error {
string groupName = string `${message.hubTopic}_${message.hubCallback}`;
kafka:ConsumerConfiguration consumerConfiguration = {
groupId: groupName,
topics: [message.hubTopic],
// turn-off the auto offset commit
autoCommit: false
};
return check new ("localhost:9092", consumerConfiguration);
}
type UpdateMessageConsumerRecord record {|
*kafka:AnydataConsumerRecord;
weatherApi:WeatherReport value;
|};
function startNotificationReceiver(websubhub:VerifiedSubscription newsReceiver) returns error? {
kafka:Consumer kafkaConsumer = check createMessageConsumer(newsReceiver);
websubhub:HubClient hubClient = check new (newsReceiver, {
retryConfig: {
interval: config:CLIENT_RETRY_INTERVAL,
kafka:ProducerConfiguration messagePersistConfig = {
clientId: "message-persist-client",
acks: "1",
retryCount: 3
};
final kafka:Producer messagePersistProducer = check new ("localhost:9092", messagePersistConfig);
public isolated function publishWeatherNotification(string location, WeatherReport weatherReport) returns error? {
json payload = weatherReport.toJson();
byte[] serializedContent = payload.toJsonString().toBytes();
isolated class NotificationSender {
*task:Job;
private final string location;
isolated function init(string location) {
self.location = location;
}
public isolated function execute() {
WeatherReport|error weatherReport = getWeatherReport(self.location);
WeatherReport weatherReport = getWeatherReport(location);
if weatherReport is error {
log:printWarn(string `Error occurred while retrieving weather-report: ${weatherReport.message()}`, stackTrace = weatherReport.stackTrace());
continue;
}
foreach var [newsReceiverId, clientEp] in newsDispatchClients.entries() {
websubhub:ContentDistributionSuccess|error response = clientEp->notifyContentDistribution({
contentType: mime:APPLICATION_JSON,
content: {
http:Client openWeatherClient = check new("https://api.openweathermap.org/data/2.5");
isolated function getWeatherReport(string location) returns WeatherReport|error {
return openWeatherClient->get(string`/weather?q=${location}&appid=xxxxxx`);
}
type WeatherItem record {
int id;
string main;
string description;
string icon;
};
type Main record {
decimal temp;
decimal feels_like;
curl https://api.openweathermap.org/data/2.5/weather?q=Colombo&appid={API_KEY}
{
"coord":{
"lon":79.8478,
"lat":6.9319
},
"weather":[
{
"id":804,