klesouza / dagster_databricks.py
Last active March 31, 2023 19:15
Example Dagster pipeline running on Azure Databricks
from dagster import pipeline, solid, repository, execute_pipeline
from dagster.core.definitions.mode import ModeDefinition
from dagster_databricks import databricks_pyspark_step_launcher
from pathlib import Path
from dagster_pyspark import pyspark_resource
from dagster_azure.adls2.io_manager import adls2_pickle_io_manager
from dagster_azure.adls2 import adls2_resource
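A minimal sketch (not part of the gist preview) of how these imports typically fit together: the Databricks step launcher, the pyspark resource, and the ADLS2 IO manager are wired into a ModeDefinition, and solids that should execute on Databricks require the step launcher resource. The resource keys, solid body, and Databricks connection config (supplied via run config) are assumptions here.

# Hedged sketch -- resource keys and solid bodies are assumed, not taken from the gist.
mode = ModeDefinition(
    resource_defs={
        "pyspark_step_launcher": databricks_pyspark_step_launcher,
        "pyspark": pyspark_resource,
        "adls2": adls2_resource,
        "io_manager": adls2_pickle_io_manager,
    }
)

@solid(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def make_numbers(context):
    # Runs on the Databricks cluster via the step launcher.
    return context.resources.pyspark.spark_session.range(10)

@pipeline(mode_defs=[mode])
def example_pipeline():
    make_numbers()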
klesouza / docker-compose.yaml
Last active February 15, 2021 15:34
VS Code devcontainer for pyspark
version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    volumes:
      - .:/home/jovyan/code
    ports:
      - "8888:8888"
    environment:
      PYTHONPATH: "/usr/local/spark/python/lib/py4j-0.10.9-src.zip:/usr/local/spark/python:"
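A quick, hedged way to confirm the PYTHONPATH wiring works once the container is up (run inside the devcontainer; nothing below comes from the gist itself):

# Sanity check that the py4j/pyspark entries on PYTHONPATH resolve inside the container.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("devcontainer-check").getOrCreate()
print(spark.range(5).count())  # expect 5
spark.stop()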
klesouza / beam_parquet_dynamicdestination.py
Last active March 23, 2023 11:32
Apache Beam Python SDK - write Parquet files with dynamic destinations
class ParquetSink(beam.io.fileio.FileSink):
    def __init__(self,
                 file_path_prefix,
                 schema,
                 row_group_buffer_size=64 * 1024 * 1024,
                 record_batch_size=1000,
                 codec='none',
                 use_deprecated_int96_timestamps=False,
                 file_name_suffix='',
                 num_shards=0,
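A hedged sketch of how a FileSink like this is typically plugged into beam.io.fileio.WriteToFiles to get one output folder per destination. The schema, field names, and output path below are assumptions, not taken from the gist:

# Hypothetical schema and records -- only the WriteToFiles wiring is the point here.
import apache_beam as beam
import pyarrow

schema = pyarrow.schema([("country", pyarrow.string()), ("value", pyarrow.int64())])

with beam.Pipeline() as p:
    _ = (p
         | beam.Create([{"country": "nl", "value": 1}, {"country": "br", "value": 2}])
         | beam.io.fileio.WriteToFiles(
               path="output/",                                 # assumed output location
               destination=lambda record: record["country"],   # route each record by field value
               sink=lambda dest: ParquetSink("part", schema),
               file_naming=beam.io.fileio.destination_prefix_naming(".parquet")))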
klesouza / parquet_read_write.java
Last active February 8, 2021 10:18
Reading and writing Parquet files using ParquetAvro library
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
// Read Avro GenericRecords from an existing Parquet file
ParquetReader<GenericRecord> r = AvroParquetReader.<GenericRecord>builder(new Path("file.parquet"))
    .withDataModel(GenericData.get())
    .build();
// Write GenericRecords to a new Parquet file, compressed with Snappy
ParquetWriter<GenericRecord> w = AvroParquetWriter.<GenericRecord>builder(new Path("file2.parquet"))
    .withDataModel(GenericData.get())
    .withCompressionCodec(CompressionCodecName.SNAPPY)
klesouza / parquet_column_size.py
Created February 8, 2021 10:12
Calculating total compressed size of a parquet column
import pyarrow.parquet

def calculate_column_size(column, file):
    # Returns (compressed bytes of the column, its fraction of the file's total compressed size).
    m = pyarrow.parquet.read_metadata(file).to_dict()
    sz = sum(c["total_compressed_size"] for rg in m["row_groups"] for c in rg["columns"]
             if c["path_in_schema"].startswith(column))
    totalsize = sum(c["total_compressed_size"] for rg in m["row_groups"] for c in rg["columns"])
    return sz, sz / totalsize
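Usage is a single call with a column prefix and a file path (the names below are hypothetical):

# Hypothetical column and file names.
size_bytes, fraction = calculate_column_size("user_id", "data.parquet")
print(f"user_id: {size_bytes} bytes ({fraction:.1%} of all column data)")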
klesouza / my_git_log.py
Created November 3, 2020 15:14
List git commits from all repositories in a folder
import subprocess
import datetime
import os
import argparse
import re

def get_git_log(root_path: str, month: int, author: str):
    year = datetime.datetime.utcnow().year
    m = datetime.datetime(year, month, 1)          # first day of the requested month
    em = (m + datetime.timedelta(days=32)).month   # month number of the following month
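A hedged sketch of where this is heading: the date window computed above would typically be passed to a per-repository git log call, much like the my_git_log.sh one-liner further down this page. The helper below is illustrative only, not part of the gist:

def _log_for_repo(repo_path: str, after: str, before: str, author: str) -> str:
    # Illustrative helper (assumed), mirroring the shell version of this gist.
    result = subprocess.run(
        ["git", "-C", repo_path, "log", f"--author={author}",
         f"--after={after}", f"--before={before}", "--format=%cs %aE %s"],
        capture_output=True, text=True)
    return result.stdout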
klesouza / ParquetLoader.cs
Created July 31, 2020 12:39
Loading Parquet files using ML.NET's internal ParquetLoader (via reflection)
public static IDataView LoadParquetFile(string path, IHostEnvironment ctx)
{
    // ML.NET's ParquetLoader is internal to Microsoft.ML.Parquet, so instantiate it via reflection.
    var parquetAssembly = Assembly.Load("Microsoft.ML.Parquet");
    var parquetLoaderType = parquetAssembly.GetType("Microsoft.ML.Data.ParquetLoader");
    var parquetArgsType = parquetLoaderType.GetNestedType("Arguments");
    var ctor = parquetLoaderType.GetConstructor(new[] { typeof(IHostEnvironment), parquetArgsType, typeof(string) });
    var loader = ctor.Invoke(new object[] { ctx, parquetArgsType.GetConstructors()[0].Invoke(null), path });
    return (IDataView)loader;
}
klesouza / my_git_log.sh
Created April 15, 2020 09:18
Find all your commits in all git repos
#!/bin/bash
# For each first-level directory (note: "-depth 1" is BSD/macOS find syntax), print its
# commits in the date range and keep only the lines matching your user.
find . -type d -depth 1 -exec sh -c 'git --git-dir={}/.git --work-tree=$PWD/{} log --format="%cs %aE %s" --before 2020-04-1 --after 2020-03-02 | grep [YOUR_USER]' \; -print
klesouza / find_k8s_delete.sh
Created February 3, 2020 14:47
Find all Kubernetes resources and print kubectl delete commands for them
# For every gettable resource kind, list what "kubectl get" returns for it, print a
# "kubectl delete <kind> <name>" line per entry, and filter down to what you are looking for.
kubectl api-resources --verbs="get" --no-headers=true | awk '{ print $1; }' | xargs -I {} sh -c 'for i in `kubectl get $1`; do echo "$1\t$i"; done' sh {} | awk '{ print "kubectl delete " $1 " " $2; }' | grep WHAT_YOU_ARE_LOOKING_FOR
klesouza / bq_tfdv.py
Created January 26, 2020 20:56
Analyse BigQuery data with TFDV (TensorFlow Data Validation)
import apache_beam as beam
import pyarrow
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import statistics_pb2
import numpy as np
pipeline_options = beam.pipeline.PipelineOptions.from_dictionary({
    'project': '[PROJECT_ID]'
})
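A hedged sketch of the overall shape such a pipeline usually takes. The query, output path, and the row-to-example conversion are assumptions; in particular, the element type GenerateStatistics expects differs across TFDV versions (dicts of numpy arrays in older releases, Arrow record batches in newer ones):

with beam.Pipeline(options=pipeline_options) as p:
    _ = (p
         | 'ReadBQ' >> beam.io.ReadFromBigQuery(
               query='SELECT * FROM `my_dataset.my_table`',   # assumed query
               use_standard_sql=True)
         | 'ToExamples' >> beam.Map(lambda row: {k: np.asarray([v]) for k, v in row.items()})
         | 'GenerateStatistics' >> tfdv.GenerateStatistics()
         | 'WriteStats' >> tfdv.WriteStatisticsToTFRecord('stats.tfrecord'))   # assumed output path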