Skip to content

Instantly share code, notes, and snippets.

@chikka
Last active November 27, 2019 10:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chikka/c51a81a637dc67c78b3f17207afb272e to your computer and use it in GitHub Desktop.
Save chikka/c51a81a637dc67c78b3f17207afb272e to your computer and use it in GitHub Desktop.
const aws = require('aws-sdk');
const sns = new aws.SNS();
const cors = require('cors');
const handler = require('serverless-express/handler');
const secretManager = require('./awsSecretService.js')
const express = require('serverless-express/express');
const ServerCore = require('@cubejs-backend/server-core');
const processHandlers = {
queryProcess: async (queryKey, orchestrator) => {
const queue = orchestrator.queryCache.getQueue();
await queue.processQuery(queryKey);
},
queryCancel: async (query, orchestrator) => {
const queue = orchestrator.queryCache.getQueue();
await queue.processCancel(query);
},
externalQueryProcess: async (queryKey, orchestrator) => {
const queue = orchestrator.queryCache.getExternalQueue();
await queue.processQuery(queryKey);
},
externalQueryCancel: async (query, orchestrator) => {
const queue = orchestrator.queryCache.getExternalQueue();
await queue.processCancel(query);
},
preAggregationProcess: async (queryKey, orchestrator) => {
const queue = orchestrator.preAggregations.getQueue();
await queue.processQuery(queryKey);
},
preAggregationCancel: async (query, orchestrator) => {
const queue = orchestrator.preAggregations.getQueue();
await queue.processCancel(query);
}
};
class CubeCustomHandler {
constructor(options) {
console.log("CubeCustomHandler called")
options = {
orchestratorOptions: (context) => ({
queryCacheOptions: {
queueOptions: {
sendProcessMessageFn: async (queryKey) => this.sendNotificationMessage(queryKey, 'queryProcess', context),
sendCancelMessageFn: async (query) => this.sendNotificationMessage(query, 'queryCancel', context)
},
externalQueueOptions: {
sendProcessMessageFn: async (queryKey) => this.sendNotificationMessage(queryKey, 'externalQueryProcess', context),
sendCancelMessageFn: async (query) => this.sendNotificationMessage(query, 'externalQueryCancel', context)
}
},
preAggregationsOptions: {
queueOptions: {
sendProcessMessageFn: async (queryKey) => this.sendNotificationMessage(queryKey, 'preAggregationProcess', context),
sendCancelMessageFn: async (query) => this.sendNotificationMessage(query, 'preAggregationCancel', context)
}
}
}),
...options
};
this.api = this.api.bind(this);
this.process = this.process.bind(this);
}
topicArn(topic) {
return `arn:aws:sns:${process.env.AWS_REGION}:${process.env.AWS_ACCOUNT_ID}:${topic}`;
}
// eslint-disable-next-line no-unused-vars
async sendNotificationMessage(message, type, context) {
throw new Error(`sendNotificationMessage is not implemented`);
}
async processMessage(message) {
const processFn = processHandlers[message.type];
if (!processFn) {
throw new Error(`Unrecognized message type: ${message.type}`);
}
const orchestratorApi = this.serverCore.getOrchestratorApi(message.context);
await processFn(message.message, orchestratorApi.orchestrator);
}
async sendNotificationMessage(message, type, context) {
const params = {
Message: JSON.stringify({ message, type, context }),
TopicArn: this.topicArn(`${process.env.CUBEJS_APP || 'cubejs'}-process`)
};
await sns.publish(params).promise();
}
async process(event) {
await Promise.all(event.Records.map(async record => {
const message = JSON.parse(record.Sns.Message);
await this.processMessage(message);
}));
return {
statusCode: 200
};
}
async getApiHandler() {
if (!this.apiHandler) {
console.log("getApiHandler")
const secret = await secretManager.getSecretValue("secretname")
process.env.CUBEJS_DB_HOST = secret.host
process.env.CUBEJS_DB_NAME = "df_analytics"
process.env.CUBEJS_DB_USER = secret.username
process.env.CUBEJS_DB_PASS = secret.password
process.env.CUBEJS_DB_PORT = secret.port
const app = express();
app.use(cors());
this.serverCore = new ServerCore(options);
this.serverCore.initApp(app);
this.apiHandler = handler(app);
}
return this.apiHandler;
}
async api(event, context) {
console.log("api")
return await this.getApiHandler(event, context);
}
}
module.exports = CubeCustomHandler;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment