Skip to content

Instantly share code, notes, and snippets.

@gxercavins
gxercavins / refresh.py
Created December 26, 2019 20:42
SO question 55055026
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
def load_my_conversion_data():
@gxercavins
gxercavins / script.py
Last active December 24, 2019 10:19
SO question 59457641
import argparse, json, logging
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class JsonSink(fileio.TextSink):
def write(self, record):
@gxercavins
gxercavins / PubSubGetAttribute.java
Last active December 9, 2019 19:43
SO question 59219937
package com.dataflow.samples;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.joda.time.Duration;
import org.joda.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.sql.Timestamp;
import org.apache.beam.sdk.Pipeline;
@gxercavins
gxercavins / w2bq.py
Created December 6, 2019 18:12
SO question 59217700
import argparse, logging
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def set_last_step(file_list):
dic = {'folder': '1950', 'last_file': 'the_last_one.txt'}
@gxercavins
gxercavins / re-window.py
Created December 5, 2019 17:12
SO question 59198643
import argparse, json, logging, time
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class DebugPrinterFn(beam.DoFn):
@gxercavins
gxercavins / output.py
Last active December 4, 2019 19:47
SO question 59182161
import argparse, logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
class ParseHeadersFn(beam.DoFn):
"""ParDo to output only the headers"""
def process(self, element):
@gxercavins
gxercavins / SODemoQuestion.java
Last active December 4, 2019 12:04
SO question 59061351
package com.dataflow.samples;
import static com.google.common.base.MoreObjects.firstNonNull;
import com.google.common.collect.Iterables;
import com.google.gson.Gson;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
@gxercavins
gxercavins / deadletters_dataflow.py
Created November 29, 2019 23:27
SO question 59102519
import logging
import apache_beam as beam
PROJECT = "PROJECT_ID"
BUCKET = "BUCKET_NAME"
schema = "index:INTEGER,event:STRING"
FIELD_NAMES = ["index","event"]
class CsvToDictFn(beam.DoFn):
@gxercavins
gxercavins / DynamicBigQuerySchema.java
Last active November 16, 2019 13:55
SO question 58794005
package com.dataflow.samples;
import static com.google.common.base.MoreObjects.firstNonNull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@gxercavins
gxercavins / AssignSessions.java
Created November 9, 2019 21:07
SO question 58731608
package com.dataflow.samples;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;