Skip to content

Instantly share code, notes, and snippets.

@jdanbrown
Last active January 2, 2019 00:16
Show Gist options
  • Save jdanbrown/1f45dc71ad37b430e8a045563bb8999d to your computer and use it in GitHub Desktop.
Save jdanbrown/1f45dc71ad37b430e8a045563bb8999d to your computer and use it in GitHub Desktop.
Custom Zapier app for BigQuery
/*
1. Account -> Developers -> new app
2. Authentication: use "Unknown Auth", then manually adds fields `service_account_email` and `service_account_private_key_json`
- Auth is done manually via service account key in the js below
3. Triggers: add triggers with specific keys and fields to match the custom js code below, which overrides their behavior
- Create a "polling" trigger with key `test_trigger`, with field `project`
- Create a "polling" trigger with key `new_row`, with fields `project` and `query`
- Create a "polling" trigger with key `table_modified`, with fields `project`, `dataset`, and `table`
4. Scripting API: paste in this whole file as is
TODO:
- Put this in an MIT-licensed repo, as per the inclusion of source from https://github.com/machadogj/node-jwt-sign
- Turn this into a Zapier CLI app instead of a copy/paste gist
- Add instructions for OAuth instead of service account key
- Support write actions (currently just read triggers)
*/
'use strict';
//
// Include: https://github.com/machadogj/node-jwt-sign/blob/3e9e86f/lib/jwt-sign.js
//
/*
* jwt-sign
*
* JSON Web Token RSA with SHA256 sign for Google APIs
*
* Copyright(c) 2012 Gustavo Machado
* MIT Licensed
*/
/**
* module dependencies
*/
var crypto = require('crypto');
/**
* expose object
*/
//var jwt = module.exports;
var jwt = {};
/**
* version
*/
jwt.version = '0.1.0';
/**
* Sign the Google API jwt token.
*
* @param {Object} the payload part of the token.
* @return {string} the signed and base64 encoded jwt token.
* @api public
*/
jwt.sign = function jwt_sign(payload, key) {
// Check key
if (!key) {
throw new Error('key is required');
}
if (payload === undefined || payload === null) {
throw new Error('palyload is required');
}
// header, typ is fixed value, alg supported by google is RSA with SHA256
var header = { typ: 'JWT', alg: 'RS256' };
// create segments, all segment should be base64 string
var segments = [];
segments.push(base64urlEncode(JSON.stringify(header)));
segments.push(base64urlEncode(JSON.stringify(payload)));
var signature = sign(segments.join('.'), key);
if (!signature) {
throw new Error('error generating signature');
}
segments.push(signature);
//return encodeURIComponent(segments.join('.')); // XXX(db): Had to change this
return segments.join('.');
};
/**
* private util functions
*/
function sign (data, key) {
var signer = crypto.createSign("RSA-SHA256");
signer.update(data);
return signer.sign(key, 'base64');
}
function base64urlEncode(str) {
//return new Buffer(str).toString('base64'); // XXX(db): Had to change this
return btoa(str);
}
//
// Main
//
// Docs: https://github.com/machadogj/node-jwt-sign/tree/3e9e86f#usage
var jwt_for_service_account = function(bundle) {
var service_account_email = bundle.auth_fields.service_account_email;
var service_account_private_key = z.JSON.parse(bundle.auth_fields.service_account_private_key_json);
var payload = {
"iss": service_account_email,
"scope": 'https://www.googleapis.com/auth/bigquery',
"aud": "https://www.googleapis.com/oauth2/v4/token",
"exp": ~~(new Date().getTime() / 1000) + (30 * 60),
"iat": ~~(new Date().getTime() / 1000 - 60)
};
var key = service_account_private_key.private_key;
return jwt.sign(payload, key);
};
// Docs: https://developers.google.com/identity/protocols/OAuth2ServiceAccount
var access_token_for_service_account = function(bundle) {
var req = {
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
},
method: 'POST',
url: 'https://www.googleapis.com/oauth2/v4/token',
data: $.param({
grant_type: 'urn:ietf:params:oauth:grant-type:jwt-bearer',
assertion: jwt_for_service_account(bundle)
})
};
var resp = z.request(req);
if (String(resp.status_code)[0] != '2') {
throw new ErrorException('Auth request failed: ' + JSON.stringify({req: req, resp: resp}));
}
var resp_body = z.JSON.parse(resp.content);
return resp_body.access_token;
};
var request_bq = function(bundle, req) {
// Translate caller req into z.request req
req.headers = req.headers || {};
req.headers['Authorization'] = 'Bearer ' + access_token_for_service_account(bundle);
req.headers['Content-Type'] = 'application/json';
req.url = 'https://content.googleapis.com/bigquery/v2/projects/' + bundle.trigger_fields.project + req.route;
req.data = JSON.stringify(req.body);
delete req.route;
delete req.body;
var resp = z.request(req);
var resp_body = z.JSON.parse(resp.content);
console.log({method: req.method, url: req.url, data: req.data, resp_body: JSON.stringify(resp_body)});
if (!resp_body || resp_body.error) {
throw new ErrorException('Request failed: ' + JSON.stringify(resp_body.error));
} else {
return resp_body;
}
};
var sleep_s = function(seconds) {
console.log('Sleeping ' + seconds + 's...');
// Hey I just met you, and this is crazy, but here's my number, so call me maybe
z.request({
method: 'GET',
url: 'http://httpbin.org/delay/' + seconds // Responds after 10s if seconds > 10
});
};
var with_backoff = function(backoff_start_s, backoff_growth, f) {
var backoff_s = backoff_start_s;
var res = f();
while (!res) {
sleep_s(backoff_s);
backoff_s *= backoff_growth;
res = f();
}
return res;
};
var bq_query_results_to_list_of_dicts = function(query_results) {
var names = query_results.schema.fields.map(function(x) { return x.name; });
return query_results.rows.map(function(row) {
return row.f.reduce(function(dict, field, i) {
dict[names[i]] = field.v;
return dict;
}, {});
});
};
// Docs:
// - https://zapier.com/developer/documentation/v2/scripting/
// - https://zapier.com/developer/documentation/v2/reference/
var Zap = {
test_trigger_poll: function(bundle) {
var datasets = request_bq(bundle, {
method: 'GET',
route: '/datasets'
});
return datasets.datasets;
},
table_modified_poll: function(bundle) {
// Docs: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get
var table = request_bq(bundle, {
method: 'GET',
route: '/datasets/'+bundle.trigger_fields.dataset+'/tables/'+bundle.trigger_fields.table
});
return [
{id: table.lastModifiedTime}
];
},
new_row_poll: function(bundle) {
// Docs:
// - https://cloud.google.com/bigquery/querying-data
// - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
// - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query
// - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
var job = request_bq(bundle, {
method: 'POST',
route: '/jobs',
body: {
configuration: {
query: {
query: bundle.trigger_fields.query,
useLegacySql: false,
useQueryCache: true,
destinationTable: null
}
}
}
});
var query = with_backoff(1, 2, function() {
var query = request_bq(bundle, {
method: 'GET',
route: '/queries/' + job.jobReference.jobId,
params: {
timeoutMs: 10000, // Long polls until .jobComplete:true
maxResults: 1000
}
});
return query.jobComplete ? query : null;
});
return bq_query_results_to_list_of_dicts(query);
}
};
@fredhope2000
Copy link

To add the ability to query Google sheets that are synced to BQ as tables, add https://www.googleapis.com/auth/drive to the scope requested at https://gist.github.com/jdanbrown/1f45dc71ad37b430e8a045563bb8999d#file-zapier-bigquery-js-L109
eg:

"scope": 'https://www.googleapis.com/auth/bigquery https://www.googleapis.com/auth/drive',

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment