from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Create a Spark session
spark = SparkSession.builder.appName("Spark UDF Example").getOrCreate()
# Create a sample DataFrame
data = [("joao",), ("maria",), ("carlos",)]
df = spark.createDataFrame(data, ["nome"])
+--------+------------+
|    nome|tamanho_nome|
+--------+------------+
|     Ana|           3|
|   Bruno|           5|
|Carolina|           8|
+--------+------------+

from pyspark.sql.types import IntegerType

# UDF that counts the number of characters in a name
def name_length(name):
    return len(name) if name else 0

# Register the UDF with Spark
spark.udf.register("name_length_udf", name_length, IntegerType())

# Create a sample DataFrame
data = [("Ana",), ("Bruno",), ("Carolina",)]
df = spark.createDataFrame(data, ["nome"])
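
A minimal sketch of applying the registered UDF to produce the tamanho_nome output shown above; it assumes the spark session and the df DataFrame from this snippet are still in scope.

# Apply the registered UDF through a SQL expression (sketch; assumes
# `spark` and `df` from the snippet above are in scope)
df.selectExpr("nome", "name_length_udf(nome) AS tamanho_nome").show()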
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}
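
A minimal sketch of wiring these imports into a DAG that waits for a task in another DAG; the DAG ids, task ids, schedule, and timeouts below are assumptions, not part of the original snippet.

with DAG(
    dag_id='consumer_dag',            # hypothetical DAG id
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # Wait until task 'export_data' in DAG 'producer_dag' succeeds
    # for the same logical date (both names are placeholders)
    wait_for_producer = ExternalTaskSensor(
        task_id='wait_for_producer',
        external_dag_id='producer_dag',
        external_task_id='export_data',
        poke_interval=60,
        timeout=3600,
        mode='reschedule',
    )

    done = DummyOperator(task_id='done')

    wait_for_producer >> done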
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}
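
A minimal sketch of a DAG built on these imports; the connection id, endpoint, and DAG id are placeholder assumptions.

with DAG(
    dag_id='http_sensor_dag',         # hypothetical DAG id
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # Poll the endpoint until it answers with a successful response
    # ('my_api' and 'health' are placeholders)
    wait_for_api = HttpSensor(
        task_id='wait_for_api',
        http_conn_id='my_api',
        endpoint='health',
        poke_interval=60,
        timeout=600,
    )

    process = DummyOperator(task_id='process')

    wait_for_api >> process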
from airflow.providers.postgres.sensors.postgres import PostgresSensor

wait_for_data = PostgresSensor(
    task_id='wait_for_data',
    postgres_conn_id='meu_postgres',
    sql="""
        SELECT COUNT(*) FROM pedidos WHERE status='pendente'
    """,
    mode='poke',
    poke_interval=120,
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_s3_file = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_name='meu-bucket',
    bucket_key='dados/input.csv',
    aws_conn_id='aws_default',
    poke_interval=60,
    timeout=3600,
    mode='reschedule',
)
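
A minimal sketch of attaching this sensor to a DAG; the DAG id, schedule, and downstream task are assumptions. mode='reschedule' releases the worker slot between pokes, which suits the one-hour timeout.

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

with DAG(
    dag_id='s3_file_dag',             # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # Same sensor as above, now defined inside the DAG context
    wait_for_s3_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='meu-bucket',
        bucket_key='dados/input.csv',
        aws_conn_id='aws_default',
        poke_interval=60,
        timeout=3600,
        mode='reschedule',
    )

    # Placeholder task that runs once the key is found
    process_file = DummyOperator(task_id='process_file')

    wait_for_s3_file >> process_file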
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'poke_interval': 30,  # check every 30 seconds
}
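
A minimal sketch of a DAG using FileSensor with these default_args; the DAG id, connection id, and file path are placeholder assumptions.

with DAG(
    dag_id='file_sensor_dag',         # hypothetical DAG id
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # Waits for the file to appear; poke_interval=30 comes from default_args
    # ('fs_default' and the path are placeholders)
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        fs_conn_id='fs_default',
        filepath='/data/input.csv',
        timeout=600,
    )

    process = DummyOperator(task_id='process')

    wait_for_file >> process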
WITH media_vendas AS (
    SELECT cliente_id, AVG(valor) AS media_gasto
    FROM vendas
    GROUP BY cliente_id
)
SELECT * FROM media_vendas WHERE media_gasto > 1000;
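
Since the section also covers PySpark, the same CTE can be run through spark.sql; this sketch assumes a SparkSession named spark and a table or temp view called vendas already exist.

# Sketch: run the CTE above with Spark SQL (assumes `spark` and a
# registered `vendas` table/view)
result = spark.sql("""
    WITH media_vendas AS (
        SELECT cliente_id, AVG(valor) AS media_gasto
        FROM vendas
        GROUP BY cliente_id
    )
    SELECT * FROM media_vendas WHERE media_gasto > 1000
""")
result.show()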