path = "/mnt/delta/your_table_or_path"  # table path, or use the "tableName" option below
checkpoint = "/mnt/checkpoints/nb_view_your_table"

# If this is the FIRST time and you want to skip old data, capture the current version:
first_run_skip_history = False  # set True only on the very first start

if first_run_skip_history:
    current_ver = (spark.sql(f"DESCRIBE HISTORY delta.`{path}`")
                   .selectExpr("max(version) as v").first().v)
    starting_opts = {"startingVersion": str(current_ver)}
else:
    starting_opts = {}  # assumed continuation: no extra options, resume from the checkpoint
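A minimal sketch of how starting_opts and the checkpoint might be wired into an actual stream; the output path "/mnt/delta/your_output_table" is a placeholder of mine, not from the gist.

# Sketch (assumes a live SparkSession and the variables defined above)
stream_df = (spark.readStream
             .format("delta")
             .options(**starting_opts)   # {} on normal runs, {"startingVersion": ...} on the first run
             .load(path))

(stream_df.writeStream
 .format("delta")
 .option("checkpointLocation", checkpoint)
 .start("/mnt/delta/your_output_table"))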
# workspace profile (host = https://adb-<id>.<region>.azuredatabricks.net)
WS_PROFILE=ws

# 1) Resolve IDs
ADMINS_ID=$(databricks groups list --filter "displayName eq 'admins'" \
  -o json -p "$WS_PROFILE" | jq -r '.Resources[0].id')
USER_ID=$(databricks users list --filter "userName eq 'alice@example.com'" \
  -o json -p "$WS_PROFILE" | jq -r '.Resources[0].id')

# 2) Patch the group membership (SCIM PatchOp)
# (assumed continuation, using the CLI's generic `databricks api` command)
databricks api patch "/api/2.0/preview/scim/v2/Groups/${ADMINS_ID}" -p "$WS_PROFILE" --json '{
  "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
  "Operations": [{"op": "add", "value": {"members": [{"value": "'"${USER_ID}"'"}]}}]
}'
stacklikemind / metrics_streaming.py
Created June 19, 2025 20:31
creating metrics from delta lake tables
from pyspark.sql import Row
from pyspark.sql.functions import current_timestamp
from pyspark.sql.streaming import DataStreamWriter

# 1) Path to your Delta table
DELTA_PATH = "/delta/your_table"

# 2) This function runs every micro-batch
def computeTableMetrics(batchDF, batchId):
    # a) count rows
    row_count = batchDF.count()
    # b) emit a one-row metrics record for this micro-batch (assumed continuation of the preview)
    metrics = spark.createDataFrame([Row(batch_id=batchId, row_count=row_count)]) \
                   .withColumn("captured_at", current_timestamp())
    metrics.show(truncate=False)
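How the function above would typically be attached to a stream; a sketch in which the checkpoint path "/delta/checkpoints/metrics" is a placeholder of mine, not from the gist.

# Sketch: run computeTableMetrics on every micro-batch of a Delta stream
query = (spark.readStream
         .format("delta")
         .load(DELTA_PATH)
         .writeStream
         .foreachBatch(computeTableMetrics)
         .option("checkpointLocation", "/delta/checkpoints/metrics")
         .start())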
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['your-topic'])
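The usual consume loop for the consumer above; a sketch using the standard confluent-kafka poll/close pattern, printing each message value.

# Sketch: standard poll loop for the consumer configured above
try:
    while True:
        msg = consumer.poll(1.0)          # wait up to 1 s for a message
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(f"{msg.topic()}[{msg.partition()}]@{msg.offset()}: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()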
import base64
from azure.identity import DefaultAzureCredential
from azure.keyvault.keys import KeyClient
from azure.keyvault.keys.crypto import CryptographyClient, KeyWrapAlgorithm
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
# ---- CONFIGURATION ----
vault_url = "https://<your-key-vault-name>.vault.azure.net/"
kek_name = "<your-wrapping-key-name>"
aes_gcm_iv_len = 12 # 96 bits is standard for AES-GCM
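A sketch of the envelope-encryption flow these imports set up: generate a data key, encrypt locally with AES-GCM, and wrap the data key with the Key Vault KEK. The helper name encrypt_with_envelope and the RSA-OAEP-256 wrap algorithm are assumptions of mine, not necessarily what the full gist uses.

import os

def encrypt_with_envelope(plaintext: bytes) -> dict:
    # Data-encryption key (DEK), used locally with AES-256-GCM
    dek = AESGCM.generate_key(bit_length=256)
    iv = os.urandom(aes_gcm_iv_len)
    ciphertext = AESGCM(dek).encrypt(iv, plaintext, None)

    # Wrap (encrypt) the DEK with the key-encryption key (KEK) held in Key Vault
    credential = DefaultAzureCredential()
    kek = KeyClient(vault_url=vault_url, credential=credential).get_key(kek_name)
    crypto = CryptographyClient(kek, credential)
    wrapped = crypto.wrap_key(KeyWrapAlgorithm.rsa_oaep_256, dek)

    return {
        "ciphertext": base64.b64encode(ciphertext).decode(),
        "iv": base64.b64encode(iv).decode(),
        "wrapped_dek": base64.b64encode(wrapped.encrypted_key).decode(),
    }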
from pyspark.sql.functions import col
from confluent_kafka import Consumer, KafkaException

def process_batch(df, epoch_id):
    # This is a workaround using mapPartitions to access raw Kafka message headers.
    def fetch_headers(iterator):
        for row in iterator:
            # Access Kafka metadata
            topic = row['topic']
            partition = row['partition']
            # assumed continuation: headers are available when the Kafka source
            # is read with includeHeaders=true (see the sketch below)
            yield (topic, partition, row['headers'])

    for topic, partition, headers in df.rdd.mapPartitions(fetch_headers).collect():
        print(epoch_id, topic, partition, headers)
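How process_batch might be hooked up; a sketch assuming Spark's Kafka source with includeHeaders enabled (broker address and topic are placeholders).

# Sketch: feed process_batch from Spark's Kafka source with headers included
kafka_stream = (spark.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "your-topic")
                .option("includeHeaders", "true")   # exposes the 'headers' column used above
                .load())

(kafka_stream.writeStream
 .foreachBatch(process_batch)
 .start())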
import duckdb
import pandas as pd

# Define database and connection
db_path = 'your_database.duckdb'
conn = duckdb.connect(db_path)

# Create table if it does not exist
# (placeholder columns -- the real schema is cut off in the preview)
conn.execute('''
    CREATE TABLE IF NOT EXISTS your_table (
        id INTEGER,
        name VARCHAR,
        created_at TIMESTAMP
    )
''')
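Since pandas is imported, a sketch of the usual DuckDB/pandas round trip against the placeholder table above; the DataFrame contents are made up.

# Sketch: insert a pandas DataFrame and read it back (columns match the placeholder schema)
df = pd.DataFrame({
    "id": [1, 2],
    "name": ["a", "b"],
    "created_at": pd.to_datetime(["2025-01-01", "2025-01-02"]),
})
conn.register("df_view", df)   # expose the DataFrame to DuckDB as a view
conn.execute("INSERT INTO your_table SELECT * FROM df_view")
print(conn.execute("SELECT count(*) AS n FROM your_table").fetchdf())
conn.close()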
import sys
from azure.identity import ManagedIdentityCredential
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceNotFoundError, ClientAuthenticationError, HttpResponseError

# Hardcoded storage account name
STORAGE_ACCOUNT_NAME = "yourstorageaccountname"

def download_blob(container_name, blob_path, download_file_path):
    try:
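        # Assumed continuation of the truncated preview: authenticate with the
        # managed identity, fetch the blob, and map the imported exceptions.
        credential = ManagedIdentityCredential()
        account_url = f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net"
        service = BlobServiceClient(account_url, credential=credential)
        blob_client = service.get_blob_client(container=container_name, blob=blob_path)
        with open(download_file_path, "wb") as f:
            f.write(blob_client.download_blob().readall())
        print(f"Downloaded {container_name}/{blob_path} to {download_file_path}")
    except ResourceNotFoundError:
        print(f"Blob not found: {container_name}/{blob_path}")
        sys.exit(1)
    except (ClientAuthenticationError, HttpResponseError) as e:
        print(f"Download failed: {e}")
        sys.exit(1)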
import os
from datetime import datetime, timedelta
import pytz
# Define the file path
timestamp_file = "timestamp.txt"
# Define the CET timezone
cet = pytz.timezone("CET")
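A sketch of one common use for a timestamp file like this: persisting the last run time in CET and checking how long ago it was. The 24-hour threshold and file format are assumptions of mine.

# Sketch: record the last-run time in CET and check whether 24 h have passed
now_cet = datetime.now(cet)

if os.path.exists(timestamp_file):
    with open(timestamp_file) as f:
        last_run = cet.localize(datetime.strptime(f.read().strip(), "%Y-%m-%d %H:%M:%S"))
    if now_cet - last_run > timedelta(hours=24):
        print("More than 24 hours since the last run")
else:
    print("No previous timestamp recorded")

with open(timestamp_file, "w") as f:
    f.write(now_cet.strftime("%Y-%m-%d %H:%M:%S"))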
# Reformat a certificate blob into PEM: replace the first and last lines with the
# BEGIN/END markers and fold the lines in between at 64 characters
sed '1s/.*/-----BEGIN CERTIFICATE-----/; 2,$s/\(.\{64\}\)/\1\n/g; $s/.*/-----END CERTIFICATE-----/'