Skip to content

Instantly share code, notes, and snippets.

@fernandocamargoai
Last active December 20, 2022 19:43
Show Gist options
  • Save fernandocamargoai/91fc3813407c7862f2962b459f523aa5 to your computer and use it in GitHub Desktop.
Save fernandocamargoai/91fc3813407c7862f2962b459f523aa5 to your computer and use it in GitHub Desktop.
DataDog Issue (2)
'use strict'
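// The tracer must be initialised before any instrumented module is loaded.
// NOTE: in an ES module, `import` declarations are hoisted, so every import in
// this file is evaluated before `tracer.init()` below actually runs. To get the
// intended ordering, move the init call into its own module that is imported
// first, or preload the tracer from the Node.js command line.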
import tracer from 'dd-trace';
tracer.init();
import { PubSub } from '@google-cloud/pubsub'
import { Storage } from "@google-cloud/storage"
import { status as gRPCStatus } from '@grpc/grpc-js'
/**
 * Consumes messages from a Pub/Sub subscription and handles each one inside
 * a dd-trace span, acking or nacking the message depending on the outcome.
 */
class QueueProcessor {
  /**
   * @param {object} options
   * @param {string} options.projectId - Passed to the PubSub client.
   * @param {string} options.topicNameOrId - Topic the subscription belongs to.
   * @param {string} options.subscriptionName - Subscription to create and consume.
   * @param {object} [options.logger=console] - Object exposing `debug`, `info`
   *   and `error`; defaults to `console` so the logging calls never hit an
   *   undefined `this.logger`.
   */
  constructor({projectId, topicNameOrId, subscriptionName, logger = console}) {
    this.subscriptionName = subscriptionName
    this.logger = logger
    this.topic = new PubSub({projectId}).topic(topicNameOrId)
    // NOTE(review): `storageOptions`, `storageBucketName` and `PageFetcher` are
    // not defined in this excerpt — presumably provided elsewhere; confirm.
    this.bucket = new Storage(this.storageOptions()).bucket(storageBucketName)
    this.pageFetcher = new PageFetcher(this.bucket)
  }

  /**
   * Ensures the subscription exists, initialises the page fetcher and starts
   * listening for messages and errors.
   *
   * @returns {Promise<void>}
   * @throws Rethrows any subscription-creation error other than
   *   ALREADY_EXISTS / PERMISSION_DENIED.
   */
  async run() {
    try {
      await this.topic.createSubscription(this.subscriptionName)
    } catch (error) {
      const isTolerated =
        error.code === gRPCStatus.ALREADY_EXISTS ||
        error.code === gRPCStatus.PERMISSION_DENIED
      if (!isTolerated) {
        this.logger.error("failed to create subscription", { 'error': error.toString() })
        throw error
      }
      this.logger.debug("failed to create pubsub subscription, assuming it exists as subscribable")
    }

    // Obtain the subscription handle on both paths (freshly created or
    // pre-existing); without this `this.subscription` is undefined below.
    this.subscription = this.topic.subscription(this.subscriptionName)

    await this.pageFetcher.initialize()
    this.subscription.on('message', this.processPubSubMessage.bind(this))
    this.subscription.on('error', this.processPubSubError.bind(this))
  }

  /**
   * 'error' event handler: logs the error and shuts down the subscription and
   * the page fetcher.
   *
   * @param {Error} err - Error emitted by the subscription.
   * @returns {Promise<void>}
   */
  async processPubSubError(err) {
    this.logger.error("received error from pubsub topic", { error: err })
    this.logger.info("terminating")
    try {
      await this.subscription.close()
    } finally {
      // Release the page fetcher even if closing the subscription fails.
      await this.pageFetcher.close()
    }
  }

  /**
   * 'message' event handler: parses the JSON payload, tags the span, and
   * acks/nacks the message. Failures are recorded on the span; a
   * `SupperssableError` is acked (no retry), anything else is nacked.
   *
   * @param {object} message - Pub/Sub message (`id`, `data`, `ack()`, `nack()`).
   * @returns {Promise<void>}
   */
  async processPubSubMessage(message) {
    // No explicit span.finish(): per the dd-trace docs, tracer.trace finishes
    // the span when the promise returned by the callback settles.
    await tracer.trace("QueueProcessor.processPubSubMessage", {resource: "QueueProcessor.processPubSubMessage"}, async (span) => {
      let payload
      span.setTag("messageId", message.id)
      try {
        payload = JSON.parse(message.data)
        console.log(`received request, url:${payload.url}`)
        span.setTag("pageUrl", payload.url)
        span.setTag("pageDomain", new URL(payload.url).hostname)
        // Page Fetching specific stuff
        // NOTE(review): `response` is produced by the elided page-fetching code.
        if (response.ok()) {
          // Other stuff
          message.ack()
          span.setTag("ackMessage", true)
        } else {
          message.nack()
          span.setTag("ackMessage", false)
        }
      } catch(e) {
        span.addTags({
          'error.type': e.name,
          'error.msg': e.message,
          'error.stack': e.stack
        })
        // Serialise the payload so the log shows its content instead of
        // "[object Object]"; stays "undefined" when JSON parsing failed.
        const payloadText = JSON.stringify(payload)
        // NOTE(review): `SupperssableError` is declared outside this excerpt.
        if (e instanceof SupperssableError) {
          console.log(`got a suppressable error while fetching the page: ${e.toString()}\n\tpayload: ${payloadText}, won't retry`)
          message.ack()
          span.setTag("ackMessage", true)
        } else {
          console.warn(`failed to fetch page: ${e}\n\tpayload: ${payloadText}, will retry`)
          message.nack()
          span.setTag("ackMessage", false)
        }
      }
    })
  }
}
// Entry point: map the deployment settings onto the constructor options,
// build the processor, then start consuming.
// NOTE(review): `values` is defined outside this excerpt.
const processorOptions = {
  projectId: values.google_cloud_project,
  topicNameOrId: values.pubsub_topic,
  subscriptionName: values.pubsub_subscription,
}
const processor = new QueueProcessor(processorOptions)

await processor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment