Skip to content

Instantly share code, notes, and snippets.

@Lucasnobrepro
Created April 23, 2025 03:53
Show Gist options
  • Save Lucasnobrepro/2e376de1579a477419ee95adac3875b4 to your computer and use it in GitHub Desktop.
# full_load_dynamodb.py
import boto3
from pyspark.sql import SparkSession
import json
# Configuration: source DynamoDB table and Delta Lake output location.
TABLE_NAME = 'sua_tabela_dynamodb'
DELTA_PATH = '/caminho/para/delta/full_load_output' # or an s3://bucket/path URI
# Initialize Spark with the Delta Lake SQL extension and catalog enabled.
spark = SparkSession.builder \
.appName("Full Load DynamoDB to Delta") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Low-level DynamoDB client; region/credentials come from the environment.
dynamodb = boto3.client('dynamodb')
def dynamodb_items_to_dict(items):
    """Convert items from the DynamoDB wire format to plain Python dicts.

    Each input item maps attribute names to DynamoDB type descriptors
    (e.g. ``{'S': 'value'}``, ``{'N': '3'}``); ``TypeDeserializer``
    unwraps those descriptors into native Python values.

    NOTE(review): TypeDeserializer returns numeric attributes as
    ``decimal.Decimal`` — confirm that downstream Spark schema inference
    handles that as intended.

    Args:
        items: list of items as returned by a DynamoDB Scan/Query page.

    Returns:
        list[dict]: one plain dict per input item.
    """
    # Imported locally, as in the original script, so module import does
    # not require the deserializer until a page is actually converted.
    from boto3.dynamodb.types import TypeDeserializer

    deserializer = TypeDeserializer()
    return [
        {key: deserializer.deserialize(value) for key, value in item.items()}
        for item in items
    ]
def full_load_batch():
    """Full-load every item of TABLE_NAME into the Delta table at DELTA_PATH.

    Paginates a DynamoDB Scan, converts each page from the DynamoDB wire
    format to plain dicts, and appends each page as a Spark DataFrame to
    the Delta path. Progress is printed after every page.

    NOTE(review): schema inference runs independently per page; pages with
    heterogeneous item attributes may infer different schemas and make the
    Delta append fail mid-load. Consider passing an explicit schema to
    ``spark.createDataFrame`` for production use.
    """
    print(f"Fazendo full load da tabela {TABLE_NAME}...")

    paginator = dynamodb.get_paginator('scan')
    page_iterator = paginator.paginate(TableName=TABLE_NAME)

    total_count = 0
    for page in page_iterator:
        # Empty pages (or a missing 'Items' key) produce nothing to write.
        raw_items = page.get('Items', [])
        if not raw_items:
            continue

        # Unwrap DynamoDB type descriptors into plain dicts.
        records = dynamodb_items_to_dict(raw_items)

        # One append per Scan page; each append is a separate Delta commit.
        df_spark = spark.createDataFrame(records)
        df_spark.write.format("delta").mode("append").save(DELTA_PATH)

        total_count += len(records)
        print(f"{total_count} registros escritos até agora...")

    print(f"Full load completo. Total de registros: {total_count}")
# Entry point: run the full load only when executed as a script,
# not when this module is imported.
if __name__ == '__main__':
    full_load_batch()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment