Created
December 20, 2022 23:38
-
-
Save fernandocamargoai/2b9ece13b9ceae979d295f8f3e1c29fd to your computer and use it in GitHub Desktop.
DataDog Issue (2)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
// Must come before importing any instrumented module. | |
import tracer from 'dd-trace'; | |
tracer.init(); | |
import { PubSub } from '@google-cloud/pubsub' | |
import { Storage } from "@google-cloud/storage" | |
import { status as gRPCStatus } from '@grpc/grpc-js' | |
class QueueProcessor { | |
constructor({projectId, topicNameOrId, subscriptionName}) { | |
this.subscriptionName = subscriptionName | |
this.topic = new PubSub({projectId}).topic(topicNameOrId) | |
this.bucket = new Storage(this.storageOptions()).bucket(storageBucketName) | |
this.pageFetcher = new PageFetcher(this.bucket) | |
} | |
async run() { | |
try { | |
await this.topic.createSubscription(this.subscriptionName) | |
} catch (error) { | |
if (error.code == gRPCStatus.ALREADY_EXISTS || error.code == gRPCStatus.PERMISSION_DENIED) { | |
this.logger.debug("failed to create pubsub subscription, assuming it exists as subscribable") | |
} else { | |
this.logger.error("failed to create subscription", { 'error': error.toString() }) | |
throw error | |
} | |
} | |
await this.pageFetcher.initialize() | |
this.subscription.on('message', this.processPubSubMessage.bind(this)) | |
this.subscription.on('error', this.processPubSubError.bind(this)) | |
} | |
async processPubSubError(err) { | |
this.logger.error("received error from pubsub topic", { error: err }) | |
this.logger.info("terminating") | |
await this.subscription.close() | |
await this.pageFetcher.close() | |
} | |
async processPubSubMessage(message) { | |
let span = tracer.scope().active() | |
let payload | |
span.setOperationName("QueueProcessor.processPubSubMessage") | |
span.setTag("messageId", message.id) | |
try { | |
payload = JSON.parse(message.data) | |
console.log(`received request, url:${payload.url}`) | |
span.setTag("pageUrl", payload.url) | |
span.setTag("pageDomain", new URL(payload.url).hostname) | |
// Page Fetching specific stuff | |
if (response.ok()) { | |
// Other stuff | |
message.ack() | |
span.setTag("ackMessage", true) | |
} else { | |
message.nack() | |
span.setTag("ackMessage", false) | |
} | |
} catch(e) { | |
span.addTags({ | |
'error.type': e.name, | |
'error.msg': e.message, | |
'error.stack': e.stack | |
}) | |
if (e instanceof SupperssableError) { | |
console.log(`got a suppressable error while fetching the page: ${e.toString()}\n\tpayload: ${payload}, won't retry`) | |
message.ack() | |
span.setTag("ackMessage", true) | |
} else { | |
console.warn(`failed to fetch page: ${e}\n\tpayload: ${payload}, will retry`) | |
message.nack() | |
span.setTag("ackMessage", false) | |
} | |
} finally { | |
span.finish() | |
} | |
} | |
} | |
const processor = new QueueProcessor({ | |
projectId: values.google_cloud_project, | |
topicNameOrId: values.pubsub_topic, | |
subscriptionName: values.pubsub_subscription, | |
}) | |
await processor.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment