Last active
December 16, 2022 17:38
-
-
Save fernandocamargoai/6dc4af45c787a7d4096fd7ef286b3a93 to your computer and use it in GitHub Desktop.
DataDog issue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
// Must come before importing any instrumented module. | |
import tracer from 'dd-trace'; | |
tracer.init(); | |
import { PubSub } from '@google-cloud/pubsub' | |
import { Storage } from "@google-cloud/storage" | |
import { status as gRPCStatus } from '@grpc/grpc-js' | |
/**
 * Consumes messages from a Pub/Sub subscription and handles each one inside a
 * Datadog span, acking or nacking the message depending on the outcome.
 *
 * NOTE(review): `storageBucketName`, `this.storageOptions()`, `PageFetcher`,
 * `SupperssableError` and `response` are used below but are not defined in the
 * visible part of this file — presumably trimmed from the sample; confirm they
 * are in scope before running.
 */
class QueueProcessor {
  /**
   * @param {object} options
   * @param {string} options.projectId - Google Cloud project handed to the PubSub client.
   * @param {string} options.topicNameOrId - Topic the subscription is attached to.
   * @param {string} options.subscriptionName - Subscription to create (if needed) and consume.
   * @param {object} [options.logger=console] - Logger exposing `debug`, `info` and `error`.
   */
  constructor({ projectId, topicNameOrId, subscriptionName, logger = console }) {
    this.subscriptionName = subscriptionName;
    this.logger = logger;
    this.topic = new PubSub({ projectId }).topic(topicNameOrId);
    // Handle used by run() to attach listeners and by processPubSubError() to close.
    this.subscription = this.topic.subscription(subscriptionName);
    this.bucket = new Storage(this.storageOptions()).bucket(storageBucketName);
    this.pageFetcher = new PageFetcher(this.bucket);
  }

  /**
   * Ensures the subscription exists, initializes the page fetcher and starts
   * listening for messages and errors.
   *
   * @returns {Promise<void>}
   * @throws Rethrows any subscription-creation error other than
   *   ALREADY_EXISTS / PERMISSION_DENIED.
   */
  async run() {
    try {
      await this.topic.createSubscription(this.subscriptionName);
    } catch (error) {
      // Both codes are treated as "the subscription is probably already there".
      const assumeExists =
        error.code === gRPCStatus.ALREADY_EXISTS ||
        error.code === gRPCStatus.PERMISSION_DENIED;
      if (!assumeExists) {
        this.logger.error("failed to create subscription", { 'error': error.toString() });
        throw error;
      }
      this.logger.debug("failed to create pubsub subscription, assuming it exists as subscribable");
    }
    await this.pageFetcher.initialize();
    this.subscription.on('message', this.processPubSubMessage.bind(this));
    this.subscription.on('error', this.processPubSubError.bind(this));
  }

  /**
   * Subscription `error` handler: logs the error, then closes the subscription
   * and the page fetcher.
   *
   * @param {Error} err - Error emitted by the subscription.
   * @returns {Promise<void>}
   */
  async processPubSubError(err) {
    this.logger.error("received error from pubsub topic", { error: err });
    this.logger.info("terminating");
    await this.subscription.close();
    await this.pageFetcher.close();
  }

  /**
   * Subscription `message` handler: parses the JSON payload, tags a span with
   * the message id and page URL/domain, and acks or nacks the message.
   *
   * A `SupperssableError` is acked (no retry); any other failure is nacked so
   * the message is redelivered.
   *
   * @param {object} message - Pub/Sub message; `id`, `data`, `ack()` and `nack()` are used.
   * @returns {Promise<void>}
   */
  async processPubSubMessage(message) {
    const span = tracer.startSpan("QueueProcessor.processPubSubMessage", { childOf: tracer.scope().active() });
    let payload;
    span.setTag("messageId", message.id);
    try {
      payload = JSON.parse(message.data);
      console.log(`received request, url:${payload.url}`);
      span.setTag("pageUrl", payload.url);
      span.setTag("pageDomain", new URL(payload.url).hostname);
      // Page Fetching specific stuff
      if (response.ok()) {
        // Other stuff
        message.ack();
        span.setTag("ackMessage", true);
      } else {
        message.nack();
        span.setTag("ackMessage", false);
      }
    } catch (e) {
      span.addTags({
        'error.type': e.name,
        'error.msg': e.message,
        'error.stack': e.stack
      });
      // Interpolating the object directly would print "[object Object]"; when
      // parsing itself failed, fall back to the raw message data.
      const payloadText = payload === undefined ? String(message.data) : JSON.stringify(payload);
      if (e instanceof SupperssableError) {
        console.log(`got a suppressable error while fetching the page: ${e.toString()}\n\tpayload: ${payloadText}, won't retry`);
        message.ack();
        span.setTag("ackMessage", true);
      } else {
        console.warn(`failed to fetch page: ${e}\n\tpayload: ${payloadText}, will retry`);
        message.nack();
        span.setTag("ackMessage", false);
      }
    } finally {
      span.finish();
    }
  }
}
// Entry point: build the processor from the configured project, topic and
// subscription names, then start consuming.
// NOTE(review): `values` is not defined in the visible part of this file.
const {
  google_cloud_project: projectId,
  pubsub_topic: topicNameOrId,
  pubsub_subscription: subscriptionName,
} = values;
const processor = new QueueProcessor({ projectId, topicNameOrId, subscriptionName });
await processor.run();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment