import json
import logging
import os
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
# Add the packages required to consume data from Kinesis
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    "--packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.0.1,"
    "org.apache.spark:spark-avro_2.12:3.0.1 pyspark-shell"
)

dir_path = f'{os.path.dirname(os.path.realpath(__file__))}/data'  # Path where the data will be stored
def process(time, rdd):
    """Turn each micro-batch of JSON records into a DataFrame and append it as CSV."""
    print(f'Start process: {time}')
    if rdd.isEmpty():  # Kinesis micro-batches can be empty; skip them to avoid schema-inference errors
        return
    rdd = rdd.map(lambda v: json.loads(v))
    df = rdd.toDF()
    df.show()
    df.write.mode("append").format("csv").save(dir_path)
if __name__ == "__main__":
    try:
        spark_batch_interval = 60  # Interval between micro-batches, in seconds
        app_name = 'stream_data'  # Name of your streaming application
        kinesis_stream_name = 'data_streaming'  # Name given to the Kinesis stream
        kinesis_endpoint = 'kinesis.us-east-1.amazonaws.com'  # Kinesis endpoint; this is the default for us-east-1
        aws_region = 'us-east-1'  # AWS region
        kinesis_initial_position = InitialPositionInStream.TRIM_HORIZON  # Where to start reading the stream from
        kinesis_checkpoint_interval = 60  # Interval at which the Kinesis read position is checkpointed

        spark_session = SparkSession.builder \
            .appName(app_name) \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
            .config("spark.sql.hive.convertMetastoreParquet", "false") \
            .getOrCreate()  # Create the Spark session

        spark_streaming_context = StreamingContext(spark_session.sparkContext,
                                                   spark_batch_interval)  # Create the streaming context

        kinesis_streams = KinesisUtils.createStream(
            spark_streaming_context, app_name, kinesis_stream_name, kinesis_endpoint,
            aws_region, kinesis_initial_position,
            kinesis_checkpoint_interval)  # Initialize the stream with KinesisUtils

        kinesis_streams.foreachRDD(process)  # Process the micro-batches

        spark_streaming_context.start()
        spark_streaming_context.awaitTermination()
    except Exception as ex:
        logging.error(ex)
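Because the script sets PYSPARK_SUBMIT_ARGS itself, it can be launched with plain python rather than spark-submit, and the Kinesis and Avro packages are pulled in automatically. To try it end to end you also need some JSON records in the data_streaming stream; below is a minimal, hypothetical producer sketch, assuming boto3 is installed and AWS credentials with Kinesis write access are configured. The payload fields are made up and only need to be valid JSON, since the consumer infers the DataFrame schema from whatever it reads.

# producer_sketch.py -- hypothetical test producer, separate from the consumer above
import json
import time

import boto3  # assumed to be installed; not used by the gist itself

kinesis = boto3.client('kinesis', region_name='us-east-1')

for i in range(10):
    record = {'id': i, 'value': f'event-{i}'}  # example payload; any JSON object works
    kinesis.put_record(
        StreamName='data_streaming',  # must match kinesis_stream_name in the consumer
        Data=json.dumps(record).encode('utf-8'),
        PartitionKey=str(i),
    )
    time.sleep(1)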