Skip to content

Instantly share code, notes, and snippets.

@j05u3
Last active October 2, 2023 16:38
Show Gist options
  • Save j05u3/b3ad1d5d9106a918941587e03c1919b1 to your computer and use it in GitHub Desktop.
Save j05u3/b3ad1d5d9106a918941587e03c1919b1 to your computer and use it in GitHub Desktop.
Firestore storage functions for the whatsapp-cloud-api-express models, so that the stored messages can be displayed in frontends such as chats_manager.
import { storeIncomingMessage, updateOutgoingMessageStatus, storeOutgoingMessage } from "./firestore-data-access/message-storage";
import { process_message_for_end_users_bot } from "./tasks/end-users-bot/process_message_for_end_users_bot"; // your own processing function
import { sleep } from "./util/GeneralUtils";
import { Message, createMessageSender, getWebhookRouter } from "whatsapp-cloud-api-express";
import { Status } from "whatsapp-cloud-api-express/lib/createBot.types";
/**
 * Runs `task`, retrying it with exponential backoff whenever it rejects.
 *
 * @param task - Async operation to attempt; it is invoked once per attempt.
 * @param retries - Maximum number of retries after the first attempt (default 4).
 *   Zero, negative or NaN values mean "no retries": the task runs exactly once.
 * @param backoff - Delay in milliseconds before the first retry; it is doubled
 *   after every failed attempt (default 1000).
 * @returns The value `task` resolves with.
 * @throws The error of the last attempt once the retries are exhausted.
 */
export async function doWithRetry<T>(task: () => Promise<T>, retries = 4, backoff = 1000): Promise<T> {
  let retriesLeft = retries;
  let delayMs = backoff;
  for (;;) {
    try {
      return await task();
    } catch (err) {
      // `!(x > 0)` instead of `x === 0`: a negative, fractional or NaN budget
      // must stop the loop rather than retry forever.
      if (!(retriesLeft > 0)) {
        throw err;
      }
      console.log(`Retrying task with ${retriesLeft} retries left and backoff ${delayMs} ms`);
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      retriesLeft -= 1;
      delayMs *= 2;
    }
  }
}
/**
 * Builds the sender object used to send WhatsApp messages to end users.
 *
 * The sender is configured from the `END_USER_NUMBER_ID` and `ACCESS_TOKEN`
 * environment variables (an empty string is used when one is unset). The
 * callback handed to `createMessageSender` logs the message as JSON and
 * stores it through `storeOutgoingMessage`.
 */
export function get_end_users_bot() {
  const phoneNumberId = process.env.END_USER_NUMBER_ID ?? "";
  const accessToken = process.env.ACCESS_TOKEN ?? "";

  return createMessageSender(phoneNumberId, accessToken, async (sentMessage) => {
    console.log(JSON.stringify(sentMessage));
    await storeOutgoingMessage(sentMessage);
  });
}
/**
 * Webhook handler for a newly received WhatsApp message.
 *
 * The message is first stored with `storeIncomingMessage` and then handed to
 * `process_message_for_end_users_bot`. If processing throws, the error is
 * logged and a best-effort text notification is sent to the number in the
 * `DEV_NUMBER` environment variable.
 *
 * @param msg - Incoming message delivered by the webhook router.
 */
export const onNewMessage = async (msg: Message) => {
  await storeIncomingMessage(msg);
  try {
    await process_message_for_end_users_bot(msg);
  } catch (e) {
    console.error(e);

    // Derive a readable reason without assuming the thrown value is an Error.
    let reason: string;
    if (e instanceof Error) {
      reason = e.message;
    } else if (typeof e === "object" && e !== null && "message" in e) {
      reason = String((e as { message: unknown }).message);
    } else {
      reason = String(e);
    }

    // The notification is best-effort: a failure to send it is logged and
    // must not make the webhook handler itself reject.
    try {
      await get_end_users_bot().sendText(process.env.DEV_NUMBER ?? "", 'Error 🙏: ' + reason);
    } catch (notifyError) {
      console.error(notifyError);
    }
  }
};
/**
 * Webhook handler for a status update of a previously sent message.
 *
 * Waits 400 ms so that the outgoing message has a chance to be stored in
 * Firestore, then applies the status through `updateOutgoingMessageStatus`,
 * retried via `doWithRetry`. Failures are logged and never rethrown.
 *
 * @param status - Status update delivered by the webhook router.
 */
export const onStatusChange = async (status: Status) => {
  // Give the outgoing message time to be written before looking it up.
  await sleep(400);

  try {
    console.log(`updating msg status for: ${status.id} to: ${status.status}`);
    const applyStatus = () => updateOutgoingMessageStatus(status);
    await doWithRetry(applyStatus);
  } catch (err) {
    console.error(err);
  }
};
// Express application hosting the webhook (or you can reuse your existing instance of express).
const app = express();

// Router from whatsapp-cloud-api-express wired to the two handlers above;
// it is mounted under /webhooks/whatsapp.
const whatsappWebhookRouter = getWebhookRouter({
  webhookVerifyToken: process.env.WHATSAPP_WEBHOOK_VERIFICATION_TOKEN ?? "",
  onNewMessage,
  onStatusChange,
  logAllEntrantRequests: true,
});
app.use("/webhooks/whatsapp", whatsappWebhookRouter);
// In case you are using Google Cloud Functions: expose the express app as an
// HTTPS function with a 2-minute timeout and one instance kept available.
const whatsappFunctionRuntimeOptions = {
  timeoutSeconds: 2 * 60,
  minInstances: 1,
};
export const my_google_cloud_endpoints_function_01 = functions
  .runWith(whatsappFunctionRuntimeOptions)
  .https.onRequest(app);
import fetch from "node-fetch";
import { Readable } from "node:stream";
import { extension } from "mime-types";
import requestPromise from "request-promise";
import { googleStorageStringOrStreamUpload } from "./storage-stream-upload";
import { PUBLIC_STORAGE_BUCKET } from "../constants/storage";
/**
 * JSON body returned by the Graph API media lookup
 * (`GET https://graph.facebook.com/v15.0/<mediaId>`), as consumed by
 * `downloadReceivedWhatsappMedia`. Only `url` is read by the code in this
 * file; the remaining fields mirror the sample response quoted there.
 */
interface MediaResponse {
/** Download URL of the media binary; it is fetched with the same bearer token. */
url: string;
/** MIME type reported for the media, e.g. "image/jpeg" in the sample response. */
mime_type: string;
/** Presumably the SHA-256 hash of the media content (hex string in the sample) — confirm against the API docs. */
sha256: string;
/** Presumably the media size in bytes — confirm against the API docs. */
file_size: number;
/** Media id; in the sample response it matches the id that was requested. */
id: string;
/** "whatsapp" in the sample response. */
messaging_product: string;
}
/**
 * Downloads a media file received through WhatsApp and re-uploads it to the
 * public storage bucket.
 *
 * Steps:
 *  1. Ask the Graph API for the media metadata, which contains a download URL.
 *  2. Download the binary from that URL (the same bearer token is required).
 *  3. Stream the body to `PUBLIC_STORAGE_BUCKET` as
 *     `ws_rec_media/<mediaId>.<ext>`, where `<ext>` is derived from `mime_type`
 *     ("unknown" when no extension is known for it).
 *
 * @param mediaId - Id of the media as delivered in the incoming message.
 * @param mime_type - MIME type of the media; only used to pick the file extension.
 * @returns Whatever `googleStorageStringOrStreamUpload` resolves with
 *   (presumably the link of the uploaded file).
 * @throws When the metadata request fails, or when the binary download answers
 *   with a non-2xx status or without a body.
 */
export async function downloadReceivedWhatsappMedia(mediaId: string, mime_type: string) {
  const authHeaders = {
    "Authorization": "Bearer " + (process.env.ACCESS_TOKEN ?? ""),
  };

  const mediaUrlResponse = (await requestPromise({
    method: 'GET',
    url: "https://graph.facebook.com/v15.0/" + mediaId,
    headers: authHeaders,
    json: true // Automatically parses the response to JSON
  })) as MediaResponse;
  // example: {"url":"https://lookaside.fbsbx.com/whatsapp_business/attachments/?mid=540555951411087&ext=1668014178&hash=ATsk4Mw2Y4MC52PTDrwuLfjjuc12y-Nil3_IjHVhVWb9og","mime_type":"image/jpeg","sha256":"aa705444551eb37f774de430c61863f2c5c5b44e32375b54e483725a78dccf5d","file_size":157673,"id":"540555951411087","messaging_product":"whatsapp"}
  console.log(mediaUrlResponse.url);

  // Note: downloading the binary with request-promise (`encoding: null`, to get
  // the raw buffer) didn't work — the buffer was somehow corrupted — so the
  // body is streamed with node-fetch instead.
  const mediaResponse = await fetch(mediaUrlResponse.url, {
    headers: authHeaders,
  });

  // fetch does not reject on HTTP errors; without this check an error page
  // would be uploaded to the bucket as if it were the media file.
  if (!mediaResponse.ok) {
    throw new Error(
      `Failed to download WhatsApp media ${mediaId}: HTTP ${mediaResponse.status} ${mediaResponse.statusText}`,
    );
  }
  if (!mediaResponse.body) {
    throw new Error(`Failed to download WhatsApp media ${mediaId}: response has no body`);
  }

  const stream = Readable.from(mediaResponse.body);
  const ext = extension(mime_type) || "unknown";
  const mediaLink = await googleStorageStringOrStreamUpload(
    PUBLIC_STORAGE_BUCKET,
    "ws_rec_media/" + mediaId + "." + ext,
    stream);
  console.log(mediaLink);
  return mediaLink;
}
import { Message } from "whatsapp-cloud-api-express";
import { fdb } from "../firestore-init";
import { downloadReceivedWhatsappMedia } from "../tasks/download-received-image";
import { SendMessageResult } from "whatsapp-cloud-api-express/lib/sendRequestHelper";
// Firestore collection names used by the storage functions in this file.

// One document per received message (written by storeIncomingMessage).
export const FS_incoming_messages = "incoming_messages";
// One document per sent message (written by storeOutgoingMessage).
export const FS_outgoing_messages = "outgoing_messages";
// Per-user counters and "last message" pointers; documents are keyed by phone number.
export const FS_user_message_status = "user_message_status";
// Per-user profile (currently just the name); documents are keyed by phone number.
export const FS_user_messaging_profile = "user_messaging_profile";
/**
 * Payload accepted by `storeOutgoingMessage`; it is written as-is (plus a `t`
 * timestamp) to the `outgoing_messages` collection.
 */
type OutgoingMessage = {
/** Presumably the id of the WhatsApp phone number the message was sent from — confirm against the library. */
fromPhoneNumberId: string;
/** Presumably the body of the send request; its shape is not constrained here. */
requestBody: any;
/** Result of the send request; its `phoneNumber` and `messageId` are read when storing the message. */
responseSummary: SendMessageResult;
};
/**
 * Profile document stored in the `user_messaging_profile` collection, keyed by
 * the user's phone number. It is created by `storeIncomingMessage` the first
 * time a message from that number is stored.
 */
export interface UserMessagingProfile {
/** Taken from the `name` of the incoming message that created the profile. */
name: string;
}
/**
 * Per-user messaging summary stored in the `user_message_status` collection,
 * keyed by the user's phone number. It is updated (merged) inside the same
 * transaction that stores each incoming or outgoing message.
 */
export interface UserMessageStatus {
/** `Date.now()` value (epoch milliseconds) captured when the latest message in either direction was stored. */
last_message_timestamp: number;
// outgoing messages
/** Number of outgoing messages stored for this user; incremented by one per stored message. */
outgoing_messages_count?: number;
/** `Date.now()` value captured when the latest outgoing message was stored. */
last_outgoing_message_timestamp?: number;
/** Firestore document id of the latest outgoing message. */
last_outgoing_message_doc_id?: string;
/** `responseSummary.messageId` of the latest outgoing message. */
last_outgoing_message_ws_msg_id?: string;
// incoming messages
/** Number of incoming messages stored for this user; incremented by one per stored message. */
incoming_messages_count?: number;
/** `Date.now()` value captured when the latest incoming message was stored. */
last_incoming_message_timestamp?: number;
/** Firestore document id of the latest incoming message. */
last_incoming_message_doc_id?: string;
/** `id` of the latest incoming message. */
last_incoming_message_ws_msg_id?: string;
}
/**
 * Builds the status fields that are updated the same way for incoming and
 * outgoing messages.
 *
 * @param now - Timestamp to record as the time of the latest message.
 * @returns An object holding `last_message_timestamp` set to `now`.
 */
function getUserStatusGenericUpdateFields(now: number): { last_message_timestamp: number } {
  const sharedFields = { last_message_timestamp: now };
  return sharedFields;
}
/**
 * Stores an outgoing message and updates the recipient's message status in a
 * single Firestore transaction.
 *
 * The message is written to a new document of the `outgoing_messages`
 * collection together with a `t` timestamp. The status document of the
 * recipient (keyed by `responseSummary.phoneNumber`) gets its outgoing counter
 * incremented and its "last outgoing message" fields refreshed.
 *
 * @param obj - Outgoing message to persist.
 * @returns The id of the newly created message document.
 */
export async function storeOutgoingMessage(obj: OutgoingMessage) {
  const messageRef = fdb.collection(FS_outgoing_messages).doc();
  const recipientPhone = obj.responseSummary.phoneNumber;
  const statusRef = fdb.collection(FS_user_message_status).doc(recipientPhone);
  const now = Date.now();

  await fdb.runTransaction(async (tx) => {
    // The read comes first: the counter is derived from the stored value.
    const statusSnapshot = await tx.get(statusRef);
    const previousStatus = statusSnapshot.data() as UserMessageStatus | undefined;
    const previousCount = previousStatus?.outgoing_messages_count ?? 0;

    const statusPatch: Partial<UserMessageStatus> = {
      ...getUserStatusGenericUpdateFields(now),
      outgoing_messages_count: previousCount + 1,
      last_outgoing_message_doc_id: messageRef.id,
      last_outgoing_message_timestamp: now,
      last_outgoing_message_ws_msg_id: obj.responseSummary.messageId,
    };

    tx.set(messageRef, { ...obj, t: now });
    tx.set(statusRef, statusPatch, { merge: true });
  });

  return messageRef.id;
}
/**
 * One entry of the `errors` array of a `StatusReceived` update.
 */
export interface StatusError {
/** Numeric error code. */
code: number;
/** Error title, when provided. */
title: string | undefined;
/** Additional error data, when provided. */
error_data: {
/** Detail text of the error, when provided. */
details: string | undefined;
} | undefined;
}
// https://developers.facebook.com/docs/whatsapp/cloud-api/webhooks/components#statuses-object
/**
 * Status update for a previously sent message, as accepted by
 * `updateOutgoingMessageStatus`. Only a subset of the webhook fields is
 * declared here.
 */
export interface StatusReceived {
timestamp: string; // in seconds
status: string; // 'failed', 'delivered', 'read'
recipient_id: string; // phone number
id: string; // id of the sent message which this status is for
errors: StatusError[] | undefined; // array of errors
// TODO: add the rest of the fields
}
/**
 * Records a status update on the stored outgoing message it refers to.
 *
 * The message is looked up in `outgoing_messages` by
 * `responseSummary.messageId`. The status is then written under the field
 * `lastStatus_<status>` together with an `updateReceptionTime` timestamp, so
 * each status kind keeps its own latest update.
 *
 * @param status - Status update received from the webhook.
 * @throws Error when no stored outgoing message has the given message id.
 */
export async function updateOutgoingMessageStatus(status: StatusReceived) {
  const { id: messageId, status: statusKind } = status;

  const matches = await fdb
    .collection(FS_outgoing_messages)
    .where("responseSummary.messageId", "==", messageId)
    .limit(1)
    .get();
  if (matches.empty) {
    throw new Error(`No outgoing message found for message_id: ${messageId}`);
  }

  const messageDoc = matches.docs[0]!;
  const receivedAt = Date.now();
  const statusField = `lastStatus_${statusKind}`;

  // update the status
  await messageDoc.ref.update({
    [statusField]: { ...status, updateReceptionTime: receivedAt },
  });
}
/**
 * Stores an incoming message, together with its media and the sender's
 * bookkeeping documents.
 *
 * 1. When the message data carries a `mime_type`, the media is downloaded via
 *    `downloadReceivedWhatsappMedia` and the result is set on
 *    `msg.data.media_url` (the passed-in message object is mutated).
 * 2. A profile document is created for the sender if none exists yet.
 * 3. In one transaction, the message is written to `incoming_messages` with a
 *    `t` timestamp, and the sender's status document gets its incoming counter
 *    incremented and its "last incoming message" fields refreshed.
 *
 * @param msg - Incoming message delivered by the webhook.
 * @returns A copy of the message extended with `docId`, the id of the stored document.
 */
export async function storeIncomingMessage(msg: Message) {
  // storing the media if any
  const mimeType = msg.data.mime_type as string | undefined;
  if (mimeType != null) {
    const mediaId = msg.data.id as string;
    const mediaUrl = await downloadReceivedWhatsappMedia(mediaId, mimeType);
    msg.data.media_url = mediaUrl; // to be stored on the message (Firestore)
  }

  // firestore
  const messageRef = fdb.collection(FS_incoming_messages).doc();
  const senderPhone = msg.from;
  const statusRef = fdb.collection(FS_user_message_status).doc(senderPhone);
  const now = Date.now();

  // creating profile if doesn't exist
  const profileSnapshot = await fdb.collection(FS_user_messaging_profile).doc(senderPhone).get();
  if (!profileSnapshot.exists) {
    await profileSnapshot.ref.set({ name: msg.name });
  }

  await fdb.runTransaction(async (tx) => {
    // The read comes first: the counter is derived from the stored value.
    const statusSnapshot = await tx.get(statusRef);
    const previousStatus = statusSnapshot.data() as UserMessageStatus | undefined;
    const previousCount = previousStatus?.incoming_messages_count ?? 0;

    const statusPatch: Partial<UserMessageStatus> = {
      ...getUserStatusGenericUpdateFields(now),
      incoming_messages_count: previousCount + 1,
      last_incoming_message_doc_id: messageRef.id,
      last_incoming_message_timestamp: now,
      last_incoming_message_ws_msg_id: msg.id,
    };

    tx.set(messageRef, { ...msg, t: now });
    tx.set(statusRef, statusPatch, { merge: true });
  });

  return { ...msg, docId: messageRef.id };
}
/**
 * Reads a stored incoming message.
 *
 * @param docId - Id of the document in the `incoming_messages` collection.
 * @returns The document data, or `undefined` when the snapshot has none.
 */
export async function getIncomingMessage(docId: string) {
  const messageRef = fdb.collection(FS_incoming_messages).doc(docId);
  const snapshot = await messageRef.get();
  return snapshot.data() as Message | undefined;
}
/**
 * Reads a stored outgoing message.
 *
 * @param docId - Id of the document in the `outgoing_messages` collection.
 * @returns The document data, or `undefined` when the snapshot has none.
 */
export async function getOutgoingMessage(docId: string) {
  const messageRef = fdb.collection(FS_outgoing_messages).doc(docId);
  const snapshot = await messageRef.get();
  return snapshot.data() as OutgoingMessage | undefined;
}
/**
 * Reads the messaging profile of a user.
 *
 * @param phoneNumber - Phone number used as document id in `user_messaging_profile`.
 * @returns The profile data, or `undefined` when the snapshot has none.
 */
export async function getUserMessagingProfile(phoneNumber: string) {
  const profileRef = fdb.collection(FS_user_messaging_profile).doc(phoneNumber);
  const snapshot = await profileRef.get();
  return snapshot.data() as UserMessagingProfile | undefined;
}
/**
 * Reads the message status summary of a user.
 *
 * @param phoneNumber - Phone number used as document id in `user_message_status`.
 * @returns The status data, or `undefined` when the snapshot has none.
 */
export async function getUserMessageStatus(phoneNumber: string) {
  const statusRef = fdb.collection(FS_user_message_status).doc(phoneNumber);
  const snapshot = await statusRef.get();
  return snapshot.data() as UserMessageStatus | undefined;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment