Code used to build the POC with Dataflow, Cloud Functions, and BigQuery.
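Two files follow: a Node.js Cloud Function that fires when a file is uploaded to Cloud Storage and launches a Dataflow template job, and the Apache Beam (Python) pipeline behind that template, which parses a cities CSV and loads it into BigQuery.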
const { google } = require('googleapis');

const dataflow = google.dataflow('v1b3');
const TEMPLATE_BUCKET = 'your-bucket';

// Cloud Function triggered by a Cloud Storage event. It launches the
// Dataflow template whenever a new object lands under an 'input/' prefix.
module.exports.kickOffDataflow = async (event, event_description) => {
  // The event `id` has the form "bucket/path/to/object/generation";
  // only react to files placed in an 'input/' folder.
  if (!event['id'].includes('/input/')) {
    return;
  }

  // Drop the trailing generation number to rebuild the object's gs:// URI.
  const input = 'gs://' + event['id'].split('/').slice(0, -1).join('/');
  const jobName = 'citiesonbigquery';
  const tmpLocation = `gs://${TEMPLATE_BUCKET}/tmp`;
  const templatePath = `gs://${TEMPLATE_BUCKET}/pathto/template`;

  const request = {
    projectId: 'yourprojectname',
    requestBody: {
      jobName: jobName,
      parameters: {
        inputFile: input
      },
      environment: {
        tempLocation: tmpLocation
      }
    },
    gcsPath: templatePath
  };

  // Authenticate with the function's default service account.
  const auth = await google.auth.getClient({
    scopes: ['https://www.googleapis.com/auth/cloud-platform']
  });
  request.auth = auth;

  return dataflow.projects.templates.launch(request);
};
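The launch call above uses the Dataflow projects.templates.launch API, which only starts a job from a template that has already been staged to Cloud Storage; the pipeline code itself is not deployed with the Cloud Function. The Beam pipeline that produces the template follows, with a staging sketch after it.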
import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions


class StateSeparation(beam.DoFn):
    """Parses one CSV line into a dict matching the BigQuery schema."""

    def process(self, element):
        State, StateID, CityID, city_name, Population = element.split(',')
        return [
            {
                'State': State,
                'City': city_name,
                'CityId': int(CityID),
                # Strip thousands separators and the '(*)' footnote marker
                # before converting to int.
                'Population': int(Population.replace('.', '').replace('(*)', ''))
            }
        ]
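# Example, assuming rows shaped like "State,StateID,CityID,CityName,Population"
# (the sample values below are made up to illustrate the cleanup above):
#
#   StateSeparation().process('SP,35,3550308,Sao Paulo,12.396.372(*)')
#   # -> [{'State': 'SP', 'City': 'Sao Paulo', 'CityId': 3550308,
#   #      'Population': 12396372}]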
table_schema = {'fields': [
    {'name': 'State', 'type': 'STRING', 'mode': 'REQUIRED'},
    {'name': 'City', 'type': 'STRING', 'mode': 'REQUIRED'},
    {'name': 'CityId', 'type': 'INTEGER', 'mode': 'REQUIRED'},
    {'name': 'Population', 'type': 'INTEGER', 'mode': 'NULLABLE'}
]}
def run():
    table_spec = bigquery.TableReference(
        projectId='yourprojectname',
        datasetId='yourdatasetid',
        tableId='yourtable')

    options = PipelineOptions()
    p = beam.Pipeline(options=options)

    read = (
        p
        | 'Reads from csv' >> beam.io.ReadFromText('gs://your-bucket/cities.csv')
        | 'Structures data' >> beam.ParDo(StateSeparation())
    )

    bq = (
        read
        | 'Writes To BigQuery' >> beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )
    )

    result = p.run()
    result.wait_until_finish()  # Block until the pipeline finishes.


if __name__ == '__main__':
    run()
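One gap worth flagging: the Cloud Function passes an inputFile parameter when launching the template, but the pipeline above hard-codes its input path, so that parameter would be ignored or rejected at launch. For a classic Dataflow template to accept inputFile at launch time, the path has to be a ValueProvider. Below is a minimal sketch of that variant; the CitiesOptions class, the file name cities_pipeline.py, the flag defaults, and the region are assumptions, not part of the original gist.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class CitiesOptions(PipelineOptions):
    """Exposes `inputFile` as a runtime template parameter (assumed name,
    matching the key the Cloud Function sends)."""

    @classmethod
    def _add_argparse_args(cls, parser):
        # ValueProvider arguments are resolved when the template is
        # launched, not when it is staged.
        parser.add_value_provider_argument(
            '--inputFile',
            default='gs://your-bucket/cities.csv',
            help='CSV file to load into BigQuery.')


def run_template():
    options = CitiesOptions()
    p = beam.Pipeline(options=options)
    (
        p
        | 'Reads from csv' >> beam.io.ReadFromText(options.inputFile)
        | 'Structures data' >> beam.ParDo(StateSeparation())  # DoFn defined above
        | 'Writes To BigQuery' >> beam.io.WriteToBigQuery(
            'yourprojectname:yourdatasetid.yourtable',
            schema=table_schema,  # schema dict defined above
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )
    p.run()


# Staging the template (run once from a dev machine; these are the standard
# DataflowRunner options, with paths matching the Cloud Function above):
#
#   python cities_pipeline.py \
#       --runner DataflowRunner \
#       --project yourprojectname \
#       --region us-central1 \
#       --temp_location gs://your-bucket/tmp \
#       --staging_location gs://your-bucket/staging \
#       --template_location gs://your-bucket/pathto/template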