Last active
December 20, 2022 19:43
-
-
Save fernandocamargoai/91fc3813407c7862f2962b459f523aa5 to your computer and use it in GitHub Desktop.
DataDog Issue (2)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
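// NOTE: ES module `import` declarations are hoisted, so every import in this file
// is evaluated before the `tracer.init()` call below runs. To initialize the tracer
// before any instrumented module loads, move the init call into its own module that
// is imported first (or preload it from the command line; see the dd-trace docs).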
import tracer from 'dd-trace'; | |
tracer.init(); | |
import { PubSub } from '@google-cloud/pubsub' | |
import { Storage } from "@google-cloud/storage" | |
import { status as gRPCStatus } from '@grpc/grpc-js' | |
/**
 * Listens on a Pub/Sub subscription and processes each incoming message inside
 * a dd-trace span, acking or nacking the message depending on the outcome.
 */
class QueueProcessor {
  /**
   * @param {object} options
   * @param {string} options.projectId - Project id handed to the PubSub client.
   * @param {string} options.topicNameOrId - Topic the subscription belongs to.
   * @param {string} options.subscriptionName - Subscription to create (if needed) and listen on.
   * @param {object} [options.logger=console] - Sink exposing `debug`, `info` and `error`.
   */
  constructor({projectId, topicNameOrId, subscriptionName, logger = console}) {
    this.subscriptionName = subscriptionName
    // The methods below log through this.logger; default to console so they
    // never dereference an undefined logger.
    this.logger = logger
    this.topic = new PubSub({projectId}).topic(topicNameOrId)
    // NOTE(review): `storageOptions()` and `storageBucketName` are not defined in the
    // visible code; presumably they were elided from this snippet. Confirm.
    this.bucket = new Storage(this.storageOptions()).bucket(storageBucketName)
    // NOTE(review): `PageFetcher` is declared outside the visible code.
    this.pageFetcher = new PageFetcher(this.bucket)
  }

  /**
   * Ensures the subscription exists, initializes the page fetcher and starts
   * listening for messages and errors.
   *
   * @returns {Promise<void>}
   * @throws Rethrows any subscription-creation error other than ALREADY_EXISTS
   *   or PERMISSION_DENIED.
   */
  async run() {
    try {
      await this.topic.createSubscription(this.subscriptionName)
    } catch (error) {
      const isTolerated =
        error.code === gRPCStatus.ALREADY_EXISTS ||
        error.code === gRPCStatus.PERMISSION_DENIED
      if (!isTolerated) {
        this.logger.error("failed to create subscription", { 'error': error.toString() })
        throw error
      }
      this.logger.debug("failed to create pubsub subscription, assuming it exists as subscribable")
    }

    // Obtain the subscription handle before attaching listeners; nothing else
    // in this class assigns it.
    this.subscription ??= this.topic.subscription(this.subscriptionName)

    await this.pageFetcher.initialize()
    this.subscription.on('message', this.processPubSubMessage.bind(this))
    this.subscription.on('error', this.processPubSubError.bind(this))
  }

  /**
   * Handles an error emitted by the subscription: logs it and shuts down the
   * subscription and the page fetcher.
   *
   * @param {Error} err - Error emitted by the subscription.
   * @returns {Promise<void>}
   */
  async processPubSubError(err) {
    this.logger.error("received error from pubsub topic", { error: err })
    this.logger.info("terminating")
    try {
      await this.subscription.close()
    } finally {
      // Release the page fetcher even when closing the subscription fails.
      await this.pageFetcher.close()
    }
  }

  /**
   * Processes one Pub/Sub message inside a trace span. The message is acked on
   * success or on a suppressable error, and nacked otherwise.
   *
   * @param {object} message - Pub/Sub message; `data` must hold a JSON document with a `url` field.
   * @returns {Promise<void>}
   */
  async processPubSubMessage(message) {
    const operationName = "QueueProcessor.processPubSubMessage"
    // The span is not finished manually here; per the dd-trace API docs,
    // tracer.trace() finishes it when the promise returned by the callback settles.
    await tracer.trace(operationName, {resource: operationName}, async (span) => {
      let payload
      span.setTag("messageId", message.id)
      try {
        payload = JSON.parse(message.data)
        console.log(`received request, url:${payload.url}`)
        span.setTag("pageUrl", payload.url)
        span.setTag("pageDomain", new URL(payload.url).hostname)
        // Page Fetching specific stuff
        // NOTE(review): `response` is produced by code elided from this snippet.
        if (response.ok()) {
          // Other stuff
          message.ack()
          span.setTag("ackMessage", true)
        } else {
          message.nack()
          span.setTag("ackMessage", false)
        }
      } catch (e) {
        span.addTags({
          'error.type': e.name,
          'error.msg': e.message,
          'error.stack': e.stack
        })
        // Serialize explicitly: interpolating the object would print "[object Object]".
        const payloadText = JSON.stringify(payload)
        // NOTE(review): `SupperssableError` is declared outside the visible code; the
        // spelling here must match that declaration.
        if (e instanceof SupperssableError) {
          console.log(`got a suppressable error while fetching the page: ${e.toString()}\n\tpayload: ${payloadText}, won't retry`)
          message.ack()
          span.setTag("ackMessage", true)
        } else {
          console.warn(`failed to fetch page: ${e}\n\tpayload: ${payloadText}, will retry`)
          message.nack()
          span.setTag("ackMessage", false)
        }
      }
    })
  }
}
// Entry point: build the processor from the `values` configuration object and
// start consuming.
// NOTE(review): `values` is not defined in the visible code.
const processorOptions = {
  projectId: values.google_cloud_project,
  topicNameOrId: values.pubsub_topic,
  subscriptionName: values.pubsub_subscription,
}
const processor = new QueueProcessor(processorOptions)
await processor.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment