@gxercavins · Created November 29, 2019
SO question 59102519
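Both snippets below demonstrate a dead-letter pattern for WriteToBigQuery: ten well-formed "index,event" CSV lines plus one malformed line are parsed into dicts and streamed into BigQuery, and any rows rejected on insert are recovered from the sink's FAILED_ROWS tagged output and written to a text file. The first script runs on Dataflow; the second is the same pipeline on the DirectRunner for local testing.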
# Dataflow version: streaming inserts into BigQuery, failed rows archived to GCS.
import logging

import apache_beam as beam

PROJECT = "PROJECT_ID"
BUCKET = "BUCKET_NAME"

schema = "index:INTEGER,event:STRING"
FIELD_NAMES = ["index", "event"]


class CsvToDictFn(beam.DoFn):
    """Parse a CSV line into a {field: value} dict keyed by FIELD_NAMES."""
    def process(self, element):
        return [dict(zip(FIELD_NAMES, element.split(",")))]


def run():
    argv = [
        "--project={0}".format(PROJECT),
        "--staging_location=gs://{0}/staging/".format(BUCKET),
        "--temp_location=gs://{0}/staging/".format(BUCKET),
        "--runner=DataflowRunner",
        "--max_num_workers=2",
        "--save_main_session",
        # Opts into the sink that exposes the FAILED_ROWS tagged output.
        "--experiments=use_beam_bq_sink"
    ]
    p = beam.Pipeline(argv=argv)

    # Ten well-formed "index,event" lines plus one malformed row whose
    # "index" value is not an integer, so its streaming insert is rejected.
    data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
    data.append('this is a bad row')

    events = (p
        | "Create data" >> beam.Create(data)
        | "CSV to dict" >> beam.ParDo(CsvToDictFn())
        | "Write results" >> beam.io.gcp.bigquery.WriteToBigQuery(
            "{0}:dataflow_test.good_lines".format(PROJECT),
            schema=schema,
            method='STREAMING_INSERTS'
        )
    )

    # Dead-letter output: rows rejected by BigQuery are emitted on the
    # FAILED_ROWS tagged output and written to GCS instead of being dropped.
    (events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
        | "Bad lines" >> beam.io.textio.WriteToText("gs://{0}/error_log.txt".format(BUCKET)))

    p.run()


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()
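For quicker iteration, the same pipeline can be exercised locally: the variant below switches to the DirectRunner, drops the Dataflow-specific pipeline options and the explicit streaming-inserts method, and writes the dead-letter output to a local error_log.txt instead of GCS.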
# DirectRunner version: same pipeline run locally, failed rows written to a local file.
import logging

import apache_beam as beam

PROJECT = "PROJECT_ID"

schema = "index:INTEGER,event:STRING"
FIELD_NAMES = ["index", "event"]


class CsvToDictFn(beam.DoFn):
    """Parse a CSV line into a {field: value} dict keyed by FIELD_NAMES."""
    def process(self, element):
        return [dict(zip(FIELD_NAMES, element.split(",")))]


def run():
    argv = [
        "--project={0}".format(PROJECT),
        "--runner=DirectRunner"
    ]
    p = beam.Pipeline(argv=argv)

    data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
    data.append('this is a bad row')  # malformed: "index" is not an integer

    events = (p
        | "Create data" >> beam.Create(data)
        | "CSV to dict" >> beam.ParDo(CsvToDictFn())
        | "Write results" >> beam.io.gcp.bigquery.WriteToBigQuery(
            "{0}:dataflow_test.good_lines".format(PROJECT),
            schema=schema,
        )
    )

    # Dead-letter output goes to a local file when running with the DirectRunner.
    (events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
        | "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))

    p.run()


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()
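As written, the failed-rows collection is dumped verbatim to the error file. A minimal sketch of a friendlier variant, assuming each FAILED_ROWS element is a (destination, row) tuple as emitted by the Beam Python SDK's BigQuery sink of that era; the format_failed_row helper is hypothetical, not part of the original gist:

import json

import apache_beam as beam

def format_failed_row(element):
    # Assumption: each FAILED_ROWS element is a (destination, row) tuple,
    # where destination identifies the target table and row is the rejected dict.
    destination, row = element
    return json.dumps({"table": str(destination), "row": row})

# Would replace the "Bad lines" step above: serialize each rejected row to
# one JSON line so the error log is machine-readable.
(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Format bad lines" >> beam.Map(format_failed_row)
    | "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))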