Skip to content

Instantly share code, notes, and snippets.

@gxercavins
gxercavins / deadletters_dataflow.py
Created November 29, 2019 23:27
SO question 59102519
import logging
import apache_beam as beam
PROJECT = "PROJECT_ID"
BUCKET = "BUCKET_NAME"
schema = "index:INTEGER,event:STRING"
FIELD_NAMES = ["index","event"]
class CsvToDictFn(beam.DoFn):
@gxercavins
gxercavins / BigQueryUpsert.java
Created February 8, 2020 12:07
SO question 60070098
package org.apache.beam.examples;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQuery;
@gxercavins
gxercavins / mp3.py
Created January 21, 2020 10:15
SO question 59827321
#!/usr/bin/env python
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
@gxercavins
gxercavins / one_window_one_file.py
Last active December 1, 2020 23:22
Stackoverflow question 56234318
import argparse, json, logging, time
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.io import filesystems
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class AddWindowingInfoFn(beam.DoFn):
@gxercavins
gxercavins / schema-in-side-input.py
Created December 27, 2019 18:55
SO question 59458599
import argparse, json, logging
import apache_beam as beam
import apache_beam.pvalue as pvalue
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class EnrichElementsFn(beam.DoFn):
@gxercavins
gxercavins / zip.py
Created April 10, 2019 14:54
SO question 55485228 approach 2
import argparse, logging, time
import inflect
import apache_beam as beam
import apache_beam.transforms.combiners as combine
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.coders import VarIntCoder
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
@gxercavins
gxercavins / queries.sql
Created February 16, 2020 10:28
SO question 60246807
-- create t1
CREATE TABLE
overwrite.t1 (sales INT64,
sdate DATE)
PARTITION BY
sdate;
-- create t2
CREATE TABLE
overwrite.t2 (sales INT64,
@gxercavins
gxercavins / pd-concat.py
Created February 8, 2020 14:53
SO question 60080589
import argparse, logging
import pandas as pd
from random import choice
import apache_beam as beam
from apache_beam.options.pipeline_options import SetupOptions
import apache_beam.transforms.combiners as combine
import apache_beam.pvalue as pvalue
@gxercavins
gxercavins / AllSideOutputs.java
Created February 1, 2020 21:51
SO question 60011995
package org.apache.beam.examples;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@gxercavins
gxercavins / mp3.py
Created January 21, 2020 10:15
SO question 59827321
#!/usr/bin/env python
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#