Skip to content

Instantly share code, notes, and snippets.

@rolangom
Created July 14, 2022 16:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rolangom/3d791d6cf44d9e4d022ac67bf223a4e3 to your computer and use it in GitHub Desktop.
Save rolangom/3d791d6cf44d9e4d022ac67bf223a4e3 to your computer and use it in GitHub Desktop.
DGII Query handler using GCP and FP crocks library
// const crocks = require('crocks');
// const fs = require('fs');
const axios = require('axios');
const unzip = require('unzip');
const rimraf = require('rimraf');
const Async = require('crocks/Async');
const maybeToAsync = require('crocks/Async/maybeToAsync');
const { curry, prop, compose, safe } = require('crocks/helpers');
const isObject = require('crocks/predicates/isObject');
const { chain, run, map, head } = require('crocks/pointfree');
const { constant } = require('crocks/combinators');
const TABLE_SCHEMA = require('./tableSchema');
const BigQuery = require('@google-cloud/bigquery');
// BigQuery client for the GCP project that owns the `production` dataset.
// No keyFilename is passed, so credentials presumably come from the runtime's
// default service account — TODO confirm for local runs.
const projectId = 'shop-f518d';
const bigquery = new BigQuery({ projectId });

// Every table this module touches lives in this dataset.
const dataset = bigquery.dataset('production');
// Archive with the DGII RNC registry that reloadNCFData downloads.
// NOTE(review): fetched over plain http; switch to https if the host supports it.
const ZIPPED_FILE_URL = 'http://www.dgii.gov.do/app/WebApps/Consultas/rnc/DGII_RNC.zip';

// Scratch folder the archive is extracted into, and the data file expected
// inside it once extraction is done.
const LOCAL_UNZIPPED_FOLDER = '/tmp/unzipped';
const LOCAL_UNZIPPED_FILE = `${LOCAL_UNZIPPED_FOLDER}/TMP/DGII_RNC.TXT`;

// BigQuery table (in `dataset`) that is rebuilt from the file and queried by id.
const TABLE_NAME = 'taxpayers';
// rmrfAsync :: String -> Promise Error a
// Promise wrapper around rimraf's callback API: removes `path` (with an empty
// options object) and settles with rimraf's callback result.
const rmrfAsync = path =>
  new Promise((resolve, reject) => {
    const onDone = (err, data) => {
      if (err) {
        reject(err);
      } else {
        resolve(data);
      }
    };
    rimraf(path, {}, onDone);
  });
// log :: String -> a -> a
// Tracing helper for Async/Maybe pipelines: prints only the label (not the
// value) and hands the value back unchanged.
const log = label => value => {
  console.log(label);
  return value;
};
// clearTmpFolder :: String -> Async Error a
// Wraps rmrfAsync in a crocks Async, so the folder is only removed when the
// resulting Async is forked.
const clearTmpFolder = path => Async(
  (onError, onRemoved) => rmrfAsync(path).then(onRemoved, onError)
);
// httpGetFileAxios :: String -> Async Error Readable
// GETs `url` with axios in stream mode and resolves with the response body
// (axios' `response.data`); any request failure rejects.
const httpGetFileAxios = url => Async((reject, resolve) => {
  const request = axios.get(url, { responseType: 'stream' });
  return request
    .then(response => resolve(response.data))
    .catch(reject);
});
// pipeStream :: Writable -> Readable -> Async Error ()
// Pipes `readable` into `writable`. Resolves when `writable` emits 'finish'.
// Rejects when either stream emits 'error'.
const pipeStream = curry((writable, readable) => Async(
  (reject, resolve) => {
    writable.on('finish', resolve);
    writable.on('error', reject);
    // Readable#pipe does not forward source errors to the destination.
    // Without this listener a failed download would leave the Async pending
    // forever, and the HTTP request with it.
    readable.on('error', reject);
    readable.pipe(writable);
  }
));
// deleteTable :: Dataset -> String -> Async Error ApiResponse
// Deletes table `tableId` from `dataset`.
// A 404 (table absent, e.g. on the very first reload) still resolves, so the
// pipeline can go on to createTable. Any other failure rejects.
const deleteTable = curry((dataset, tableId) => Async(
  (reject, resolve) =>
    // Table#delete takes a single node-style callback (err, apiResponse).
    // The previous `delete(resolve, reject)` made `resolve` that callback, so
    // errors were resolved as success values and silently swallowed.
    dataset.table(tableId).delete((err, apiResponse) =>
      err && err.code !== 404 ? reject(err) : resolve(apiResponse)
    )
));
// createTable :: Dataset -> String -> Schema -> Async Error Table
// Creates `tableId` in `dataset` with the given schema. Resolves with the new
// Table, the first element of the array the client's promise yields.
const createTable = curry((dataset, tableId, schema) => Async(
  (reject, resolve) => {
    const options = { schema };
    return dataset
      .createTable(tableId, options)
      .then(([createdTable]) => resolve(createdTable), reject);
  }
));
// loadJobTable :: String -> Table -> Async Error Job
// Starts a BigQuery load job that ingests the pipe-delimited file at `path`
// into `table`. Resolves with the Job handle as soon as the job is created,
// not when it completes (see handleJob for that).
const loadJobTable = curry((path, table) => Async(
  (reject, resolve) => {
    // Built fresh on every call, so a client that mutates the metadata it
    // receives cannot affect later loads.
    const loadOptions = {
      format: 'CSV',
      fieldDelimiter: '|',
      allowJaggedRows: true,
      maxBadRecords: 5000000,
      ignoreUnknownValues: true,
      // Empty quote character: fields are not treated as quoted sections.
      quote: '',
      encoding: 'ISO-8859-1'
    };
    return table
      .createLoadJob(path, loadOptions)
      .then(([job]) => resolve(job), reject);
  }
));
// handleJob :: Job -> Async Error Metadata
// Waits for a job's terminal event. 'complete' resolves with the payload the
// job emits; 'error' rejects with the emitted error.
const handleJob = job => Async(
  (rejectJob, resolveJob) => {
    const onComplete = resolveJob;
    const onError = rejectJob;
    job.on('complete', onComplete);
    job.on('error', onError);
  }
);
// onErrResp :: Response -> Error -> Response
// Logs the error, then replies with HTTP 400 and `{ message }` as JSON.
// Returns whatever `res.json` returns.
// NOTE(review): every failure (including BigQuery/server errors and
// 'Not found') maps to 400; consider 404/500 where appropriate.
const onErrResp = res => err => {
  console.error('Error', err);
  const body = { message: err.message };
  return res.status(400).json(body);
};
// onSuccResp :: Response -> a -> Response
// Sends the successful result as the JSON body. Returns what `res.json` returns.
const onSuccResp = res => payload => {
  return res.json(payload);
};
// extractZipStream :: Readable -> Async Error ()
// Extracts the zip archive arriving on `readable` into LOCAL_UNZIPPED_FOLDER.
// The unzip.Extract writer is created only when this Async runs. Previously it
// was built eagerly, as an argument while the chain was being constructed, so
// it existed before clearTmpFolder had deleted the folder it writes into.
const extractZipStream = readable => Async(
  (reject, resolve) => {
    const extractor = unzip.Extract({ path: LOCAL_UNZIPPED_FOLDER });
    // Extract's 'finish' only means its input has been consumed. Its 'close'
    // is emitted once the extracted files have been written, which is what
    // the BigQuery load step below needs.
    extractor.on('close', () => resolve());
    extractor.on('error', reject);
    // pipe() does not forward source-stream errors; reject on them instead of
    // leaving the request hanging.
    readable.on('error', reject);
    readable.pipe(extractor);
  }
);

// reloadNCFData :: (Request, Response) -> ()
// HTTP entry point that rebuilds the taxpayers table from the DGII archive.
// Steps, in order:
//   1. clear the tmp folder
//   2. download the zip
//   3. extract it
//   4. delete and recreate the table
//   5. start a load job over the extracted file
//   6. wait for the job to complete
// Replies with the job's 'complete' payload as JSON, or HTTP 400 `{ message }`
// if any step fails.
const reloadNCFData = (req, res) => {
  clearTmpFolder(LOCAL_UNZIPPED_FOLDER)
    .map(log('clearTmpFolder 1'))
    .chain(constant(httpGetFileAxios(ZIPPED_FILE_URL)))
    .map(log('httpGetFileAxios 2'))
    .chain(extractZipStream)
    .map(log('unzip.Extract 3'))
    .chain(constant(deleteTable(dataset, TABLE_NAME)))
    .map(log('deleteTable 4'))
    .chain(constant(createTable(dataset, TABLE_NAME, TABLE_SCHEMA)))
    .map(log('createTable 5'))
    .chain(loadJobTable(LOCAL_UNZIPPED_FILE))
    .map(log('loadJobTable 6'))
    .chain(handleJob)
    .map(log('handleJob 7'))
    .fork(
      onErrResp(res),
      onSuccResp(res)
    );
};
// getReqTaxId :: Request -> Maybe String
// Safely reads `req.query.id`. Gives Nothing when `req` is not an object or
// when `query` or `id` is absent; otherwise Just the raw query value.
const getReqTaxId = req =>
  safe(isObject, req)
    .chain(prop('query'))
    .chain(prop('id'));
// getQueryStr :: String -> String
// Builds the lookup SQL for a single taxpayer id.
// `id` comes from the HTTP query string, so backslashes and double quotes are
// escaped before being placed inside the double-quoted literal. Otherwise an
// input like `x" or "1"="1` could rewrite the WHERE clause.
// Prefer a parameterized query (`@id` + `params`) where possible.
const getQueryStr = id => {
  const literal = String(id).replace(/[\\"]/g, ch => `\\${ch}`);
  return `select * from production.taxpayers where id = "${literal}" limit 1`;
};
// queryTable :: String -> Async Error [row]
// Fetches at most one taxpayer row whose id equals `id`.
// `id` is untrusted (it comes from the request's query string), so it is sent
// as the named parameter @id instead of being spliced into the SQL text.
// Named parameters assume the client's standard-SQL dialect; TODO confirm if
// legacy SQL is ever enabled.
const queryTable = id => Async(
  (reject, resolve) =>
    dataset.table(TABLE_NAME)
      .query({
        query: 'select * from production.taxpayers where id = @id limit 1',
        // String() mirrors the conversion the old template literal performed
        // and keeps the parameter typed as a string.
        params: { id: String(id) }
      })
      .then(([rows]) => resolve(rows), reject)
);
// resultAsObject :: [row] -> Async Error row
// Keeps only the first row of a result set. An empty result becomes a
// rejected Async carrying the 'Not found' error. That single Error instance
// is created once at module load and shared by every call.
const NOT_FOUND_ERROR = new Error('Not found');
const resultAsObject = rows => maybeToAsync(NOT_FOUND_ERROR, head(rows));
// queryBy :: Request -> Async Error row
// Pipeline for one lookup:
//   - read req.query.id and trace it
//   - reject if it is missing
//   - query BigQuery by that id
//   - keep the first row
// The missing-id Error is created once at module load and shared.
const MISSING_ID_ERROR = new Error("Query 'id' param is required");
const queryBy = req =>
  maybeToAsync(MISSING_ID_ERROR, getReqTaxId(req).map(log('getReqTaxId')))
    .chain(queryTable)
    .chain(resultAsObject);
// queryDgiiBy :: (Request, Response) -> ()
// HTTP entry point. Looks up a taxpayer via queryBy and replies with the row
// as JSON, or with onErrResp's HTTP 400 `{ message }` on failure.
const queryDgiiBy = (req, res) => {
  const sendError = onErrResp(res);
  const sendRow = onSuccResp(res);
  queryBy(req).fork(sendError, sendRow);
};
// HTTP-triggered Cloud Functions exported by this module.
Object.assign(exports, { reloadNCFData, queryDgiiBy });
// to deploy: gcloud beta functions deploy reloadNCFData --trigger-http
// to deploy: gcloud beta functions deploy queryDgiiBy --trigger-http
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment