Created
April 23, 2025 03:53
-
-
Save Lucasnobrepro/2e376de1579a477419ee95adac3875b4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# full_load_dynamodb.py
import json

import boto3
from pyspark.sql import SparkSession
# Configuration: source DynamoDB table and destination Delta location.
TABLE_NAME = "sua_tabela_dynamodb"
DELTA_PATH = "/caminho/para/delta/full_load_output"  # or s3://meu-bucket/path

# Spark session with the Delta Lake SQL extension and catalog enabled.
spark = (
    SparkSession.builder
    .appName("Full Load DynamoDB to Delta")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

# DynamoDB client created with boto3's defaults (no explicit region or
# credentials are passed here).
dynamodb = boto3.client("dynamodb")
def dynamodb_items_to_dict(items):
    """Deserialize DynamoDB-typed items into plain Python dicts.

    Each element of ``items`` is a mapping of attribute name to a
    DynamoDB-typed attribute value; every value is passed through boto3's
    ``TypeDeserializer``.

    Returns:
        A list with one ``{attribute_name: python_value}`` dict per input
        item, in the same order as ``items``.
    """
    from boto3.dynamodb.types import TypeDeserializer

    to_python = TypeDeserializer().deserialize
    plain_items = []
    for raw_item in items:
        plain_items.append(
            {name: to_python(attr) for name, attr in raw_item.items()}
        )
    return plain_items
def _sets_to_lists(value):
    """Return ``value`` with every set, at any nesting depth, turned into a list."""
    if isinstance(value, (set, frozenset)):
        return list(value)
    if isinstance(value, list):
        return [_sets_to_lists(element) for element in value]
    if isinstance(value, dict):
        return {key: _sets_to_lists(element) for key, element in value.items()}
    return value


def full_load_batch(table_name=None, delta_path=None):
    """Scan an entire DynamoDB table and append it, page by page, to Delta.

    Every scan page is deserialized, converted to a Spark DataFrame and
    appended to the Delta table as its own write. Progress is printed after
    each page and a final total is printed at the end.

    Args:
        table_name: DynamoDB table to scan. Defaults to the module-level
            ``TABLE_NAME`` (resolved at call time).
        delta_path: Destination Delta location. Defaults to the module-level
            ``DELTA_PATH`` (resolved at call time).

    Notes:
        * Writes use ``mode("append")``: running the load twice against the
          same path duplicates the rows.
        * ``mergeSchema`` lets later pages add attributes that earlier pages
          did not have. A *type* conflict for the same attribute between
          pages still makes the write fail.
    """
    if table_name is None:
        table_name = TABLE_NAME
    if delta_path is None:
        delta_path = DELTA_PATH

    print(f"Fazendo full load da tabela {table_name}...")

    pages = dynamodb.get_paginator('scan').paginate(TableName=table_name)

    total_count = 0
    for page in pages:
        raw_items = page['Items']
        if not raw_items:
            continue

        # DynamoDB set attributes (SS/NS/BS) deserialize to Python sets, for
        # which Spark cannot infer a column type; store them as arrays.
        records = [
            _sets_to_lists(record)
            for record in dynamodb_items_to_dict(raw_items)
        ]

        df_spark = spark.createDataFrame(records)

        # DynamoDB items are schemaless, so a page may introduce attributes
        # the Delta table does not have yet. Without mergeSchema the append
        # is rejected by schema enforcement and the load stops half-way.
        (
            df_spark.write
            .format("delta")
            .mode("append")
            .option("mergeSchema", "true")
            .save(delta_path)
        )

        total_count += len(records)
        print(f"{total_count} registros escritos até agora...")

    print(f"Full load completo. Total de registros: {total_count}")
# Run the full load only when this file is executed as a script.
if __name__ == "__main__":
    full_load_batch()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment