Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save fl0wo/2b3cfb2116e8451385226b89936c509c to your computer and use it in GitHub Desktop.
Save fl0wo/2b3cfb2116e8451385226b89936c509c to your computer and use it in GitHub Desktop.
momento-topic-publish-heavy-msg.ts
/**
 * Publishes one message to a topic through a freshly constructed
 * `MomentoFetcher`.
 *
 * @param args.cacheName - Cache that owns the topic.
 * @param args.topicName - Topic to publish to.
 * @param args.message - Message body, sent as-is.
 * @param args.secret - API key handed to the `MomentoFetcher` constructor.
 * @param args.endpoint - Endpoint handed to the `MomentoFetcher` constructor.
 * @returns The promise produced by `MomentoFetcher.publishTopic`.
 */
export const publishToTopic = (args: {
  cacheName: string,
  topicName: string,
  message: string,
  secret: string,
  endpoint: string
}) => {
  const { secret, endpoint, cacheName, topicName, message } = args;
  // A new client per call: nothing is cached between publishes.
  const client = new MomentoFetcher(secret, endpoint);
  return client.publishTopic(cacheName, topicName, message);
};
/**
 * Small `fetch`-based client for cache get/set/delete and topic publish
 * requests against an HTTP endpoint, authenticating with a `token` query
 * parameter.
 *
 * Every user-supplied value that ends up in a URL (cache name, topic name,
 * key, token) is percent-encoded, so values containing `&`, `=`, `+`, `/`,
 * spaces, etc. cannot corrupt the request.
 */
export class MomentoFetcher {
  private readonly apiKey: string;
  private readonly baseurl: string;

  /**
   * @param key - API key sent as the `token` query parameter on every request.
   * @param endpoint - Host or base URL. `https://` is prepended when no
   *   `http://` / `https://` scheme is present; trailing slashes are removed
   *   so joined paths never contain `//`.
   */
  constructor(key: string, endpoint: string) {
    this.apiKey = key;
    const trimmed = endpoint.trim().replace(/\/+$/, '');
    this.baseurl = /^https?:\/\//i.test(trimmed) ? trimmed : `https://${trimmed}`;
  }

  /**
   * Builds `<baseurl>/<cacheName>?key=…&token=…[&extra=…]` with every
   * component percent-encoded.
   */
  private cacheUrl(
    cacheName: string,
    key: string,
    extraParams: Record<string, string> = {},
  ): string {
    const query = Object.entries({ key, token: this.apiKey, ...extraParams })
      .map(([name, value]) => `${name}=${encodeURIComponent(value)}`)
      .join('&');
    return `${this.baseurl}/${encodeURIComponent(cacheName)}?${query}`;
  }

  /**
   * Reads the value stored under `key`.
   *
   * @returns The response body as text.
   * @throws Error when the response status is 300 or above.
   */
  async get(cacheName: string, key: string): Promise<string> {
    const resp = await fetch(this.cacheUrl(cacheName, key));
    if (resp.status >= 300) {
      console.error(`failed to retrieve ${key} from cache. Message: ${resp.statusText}; Status: ${resp.status} cache: ${cacheName}`);
      throw new Error(`failed to retrieve item from cache: ${cacheName}`);
    }
    console.log(`successfully retrieved ${key} from cache`);
    return await resp.text();
  }

  /**
   * Stores `value` under `key`.
   *
   * @param ttl_seconds - Sent as the `ttl_seconds` query parameter (default 30).
   * @throws Error when the response status is 300 or above.
   */
  async set(cacheName: string, key: string, value: string, ttl_seconds: number = 30): Promise<void> {
    const url = this.cacheUrl(cacheName, key, { ttl_seconds: String(ttl_seconds) });
    const resp = await fetch(url, {
      method: 'PUT',
      body: value,
    });
    if (resp.status >= 300) {
      throw new Error(`failed to set item into cache message: ${resp.statusText} status: ${resp.status} cache: ${cacheName}`);
    }
    console.log(`successfully set ${key} into cache`);
  }

  /**
   * Deletes the entry stored under `key`.
   *
   * @returns The raw response of the DELETE request.
   * @throws Error when the response status is 300 or above.
   */
  async delete(cacheName: string, key: string) {
    const resp = await fetch(this.cacheUrl(cacheName, key), {
      method: 'DELETE',
    });
    if (resp.status >= 300) {
      throw new Error(`failed to delete ${key} from cache. Message: ${resp.statusText}; Status: ${resp.status} cache: ${cacheName}`);
    }
    console.log(`successfully deleted ${key} from cache`);
    return resp;
  }

  /**
   * Publishes `message` to `topicName` as an `application/octet-stream` POST.
   *
   * Unlike the cache methods this does NOT throw on a non-2xx status, so
   * existing callers that inspect the response keep working; the failure is
   * logged and the response is returned for the caller to check.
   *
   * @returns The raw response of the POST request.
   */
  async publishTopic(cacheName: string, topicName: string, message: string) {
    //{{baseUrl}}/topics/:cacheName/:topicName?token=<API Key>
    const url =
      `${this.baseurl}/topics/${encodeURIComponent(cacheName)}/${encodeURIComponent(topicName)}` +
      `?token=${encodeURIComponent(this.apiKey)}`;
    const resp = await fetch(url, {
      method: 'POST',
      body: message,
      headers: {
        'Content-Type': 'application/octet-stream',
      },
    });
    if (resp.status >= 300) {
      // The URL is deliberately left out of the log: it contains the token.
      console.error(`failed to publish to topic ${topicName}. Message: ${resp.statusText}; Status: ${resp.status} cache: ${cacheName}`);
    }
    return resp;
  }
}
import {publishToTopic} from "../src/momento/client";
// Manual test script: publishes one large JSON message to a topic and reports
// the outcome.
//
// Connection settings. Fill these in before running; they are blank here so
// that no credentials are stored in the source.
const MOMENTO_CACHE_DB_TRIGGERS_API_KEY = '';
const MOMENTO_CACHE_DB_TRIGGERS_CACHE_NAME = '';
const MOMENTO_CACHE_DB_TRIGGERS_TOPIC_BASE_URL = '';
const MOMENTO_CACHE_DB_TRIGGERS_TOPIC_NAME = '';

// Sample payload; `veryLongField` pads the message with 5000 characters.
const msg = {
  query_type: 'test',
  table_name: 'no-table',
  row: {
    id: '1',
    name: 'John Doe',
    veryLongField: 'a'.repeat(5000),
  },
};

// Serialize once so the sizes logged below describe exactly what is sent.
const txtPayload = JSON.stringify(msg);
const txtPayloadLength = txtPayload.length;
const txtPayloadSizeInBytes = Buffer.byteLength(txtPayload, 'utf8');

console.log('publishing to topic', {
  query: msg.query_type,
  txtPayloadLength,
  txtPayloadSizeInBytes,
});

publishToTopic({
  message: txtPayload,
  topicName: MOMENTO_CACHE_DB_TRIGGERS_TOPIC_NAME,
  cacheName: MOMENTO_CACHE_DB_TRIGGERS_CACHE_NAME,
  secret: MOMENTO_CACHE_DB_TRIGGERS_API_KEY,
  endpoint: MOMENTO_CACHE_DB_TRIGGERS_TOPIC_BASE_URL,
})
  .then((resp) => {
    // A resolved promise only means the HTTP exchange completed; the server
    // may still have rejected the message, so check the status explicitly.
    if (resp.status < 300) {
      console.log('Published');
      return;
    }
    console.error('Publish failed', {
      status: resp.status,
      statusText: resp.statusText,
    });
  })
  .catch((e: unknown) => {
    console.error('Error', e);
  });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment