@troyatomic
Created September 22, 2022 16:05
Cloud Function that reads a Firestore document, publishes its data to a Pub/Sub topic that has a schema, and then deletes the document
const { Logging } = require('@google-cloud/logging');
const logging = new Logging();
const log = logging.log('publishToBigQuery');
const { PubSub } = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const protobuf = require('protobufjs');
const Firestore = require('@google-cloud/firestore');
const firestore = new Firestore();
// context.params.documentId is populated from the wildcard in the trigger path.
exports.publishToBigQuery = async (event, context) => {
  const affectedDoc = firestore.doc(`messages/${context.params.documentId}`);
  try {
    const documentSnapshot = await affectedDoc.get();
    if (documentSnapshot.exists) {
      const firestoreData = documentSnapshot.data();
      const topic = pubsub.topic('firestore-to-BQ');
      // Load the Protocol Buffer definition from the Pub/Sub schema named 'message'.
      const schema = pubsub.schema('message');
      const info = await schema.get();
      const type = protobuf.parse(info.definition, { keepCase: true });
      const ProtocolBuffer = type.root.lookupType('ProtocolBuffer');
      // Build a message from the document data and publish its JSON form.
      const message = ProtocolBuffer.create(firestoreData);
      const data = Buffer.from(JSON.stringify(message.toJSON()));
      const messageId = await topic.publishMessage({ data });
      // Await each log write so it completes before the function returns.
      await log.info(log.entry(`Document published on pubsub ${messageId}`));
      // Delete the document only after the publish has succeeded.
      const delResult = await affectedDoc.delete();
      await log.info(log.entry(`Document deleted at ${delResult.writeTime.toDate().toISOString()}`));
    } else {
      await log.info(log.entry(`Document doesn't exist: ${affectedDoc.path}`));
    }
  } catch (error) {
    // The try block covers fetching, publishing, and deleting.
    await log.error(log.entry(`Error while processing document ${affectedDoc.path}: ${error}`));
  }
};
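
The function above always publishes the JSON form of the message, which fits a topic whose schema encoding is JSON. If the topic were configured for binary encoding instead, the payload would have to be serialized with the protobuf encoder. The following is a minimal sketch of a helper that covers both cases. `encodeForTopic` and its parameters are hypothetical names that do not appear in the gist, `messageType` is assumed to behave like a protobufjs `Type`, and the strings `'JSON'` and `'BINARY'` are assumed to match the encoding values Pub/Sub reports in the topic's schema settings.

```javascript
// Hypothetical helper (not part of the original gist): serialize a payload
// according to the schema encoding configured on the Pub/Sub topic.
// `messageType` is assumed to expose the protobufjs Type interface:
// create(obj), encode(msg).finish(), and toJSON() on created messages.
function encodeForTopic(messageType, payload, encoding) {
  const message = messageType.create(payload);
  switch (encoding) {
    case 'JSON':
      // JSON encoding: publish the stringified JSON form of the message.
      return Buffer.from(JSON.stringify(message.toJSON()));
    case 'BINARY':
      // Binary encoding: publish the protobuf wire-format bytes.
      return Buffer.from(messageType.encode(message).finish());
    default:
      throw new Error(`Unsupported schema encoding: ${encoding}`);
  }
}
```

Inside `publishToBigQuery`, the encoding could presumably be read from the topic metadata (for example via `topic.getMetadata()` and its `schemaSettings.encoding` field) and passed in as `encodeForTopic(ProtocolBuffer, firestoreData, encoding)`, with the returned buffer used as `data` in `topic.publishMessage({ data })`.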