import apache_beam as beam
import json
from apache_beam.io import ReadFromText
from apache_beam.io import BigQuerySource
from apache_beam.io import BigQuerySink
from apache_beam.io import WriteToText
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "pascalwhoop"
google_cloud_options.job_name = "phone-sensors-cleanup"
google_cloud_options.staging_location = "gs://pascalwhoop-private/staging"
google_cloud_options.temp_location = "gs://pascalwhoop-private/temp"
#options.view_as(StandardOptions).runner = "DirectRunner" # use this for debugging
options.view_as(StandardOptions).runner = "DataFlowRunner"
# see here for bigquery docs https://beam.apache.org/documentation/io/built-in/google-bigquery/
source_table_spec = bigquery.TableReference(
    projectId="pascalwhoop", datasetId="phone_sensors", tableId="heartbeat"
)
sink_table_spec = bigquery.TableReference(
    projectId="pascalwhoop", datasetId="phone_sensors", tableId="heartbeat_cleaned"
)
def make_sink_schema():
    mapping = {
        "altitude": "FLOAT",
        "battery_status": "INTEGER",
        "bluetooth_status": "STRING",
        "cell_id": "STRING",
        "cell_strength": "INTEGER",
        "gps_status": "STRING",
        "last_app": "STRING",
        "location_accuracy": "FLOAT",
        "location_gps": "STRING",
        "location_net": "STRING",
        "location_seconds": "STRING",
        "speed": "FLOAT",
        "timestamp": "INTEGER",
    }
    # wrap each field as a NULLABLE column and encode the JSON schema string
    # expected by parse_table_schema_from_json
    mapping_list = [{"mode": "NULLABLE", "name": k, "type": mapping[k]} for k in mapping.keys()]
    return json.JSONEncoder(sort_keys=True).encode({"fields": mapping_list})
table_schema = parse_table_schema_from_json(make_sink_schema())
#source = BigQuerySource(query="SELECT * FROM `pascalwhoop.phone_sensors.heartbeat` LIMIT 10", use_standard_sql=True) # you can also use SQL queries
source = BigQuerySource(source_table_spec)
target = BigQuerySink(sink_table_spec, schema=table_schema)
#target = beam.io.WriteToText("output.txt")
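# --- Sketch (not part of the original gist): one way these pieces could be wired into
# a running pipeline. The clean_row transform is a hypothetical placeholder standing in
# for whatever per-row cleanup the real job performs; replace it as needed.
def clean_row(row):
    # BigQuerySource yields each row as a Python dict keyed by column name;
    # keep only the fields declared in the sink schema above.
    wanted = {
        "altitude", "battery_status", "bluetooth_status", "cell_id", "cell_strength",
        "gps_status", "last_app", "location_accuracy", "location_gps", "location_net",
        "location_seconds", "speed", "timestamp",
    }
    return {k: v for k, v in row.items() if k in wanted}

with beam.Pipeline(options=options) as p:
    (
        p
        | "ReadFromBigQuery" >> beam.io.Read(source)
        | "CleanRows" >> beam.Map(clean_row)
        | "WriteToBigQuery" >> beam.io.Write(target)
    )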