import json
import logging
import os
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
# Add the packages required to consume the stream data
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    "--packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.0.1,"
    "org.apache.spark:spark-avro_2.12:3.0.1 pyspark-shell"
)
dir_path = f'{os.path.dirname(os.path.realpath(__file__))}/data'  # path where the output data is stored

def process(time, rdd):
    """Parse each micro-batch of JSON records into a DataFrame and append it as CSV."""
    print(f'Start process: {time}')
    if rdd.isEmpty():  # skip empty micro-batches; toDF() fails on an empty RDD
        return
    rdd = rdd.map(lambda v: json.loads(v))
    df = rdd.toDF()
    df.show()
    df.write.mode("append").format("csv").save(dir_path)

if __name__ == "__main__":
    try:
        spark_batch_interval = 60  # interval (in seconds) between micro-batches
        app_name = 'stream_data'  # name of your streaming application
        kinesis_stream_name = 'data_streaming'  # name given to the Kinesis stream
        kinesis_endpoint = 'kinesis.us-east-1.amazonaws.com'  # Kinesis endpoint; this is the default for the us-east-1 region
        aws_region = 'us-east-1'  # AWS region
        kinesis_initial_position = InitialPositionInStream.TRIM_HORIZON  # where to start reading the stream
        kinesis_checkpoint_interval = 60  # interval (in seconds) at which the Kinesis read checkpoint is saved

        # Create the Spark session
        spark_session = SparkSession.builder \
            .appName(app_name) \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
            .config("spark.sql.hive.convertMetastoreParquet", "false") \
            .getOrCreate()

        # Create the streaming context
        spark_streaming_context = StreamingContext(spark_session.sparkContext,
                                                   spark_batch_interval)

        # Initialize the stream with KinesisUtils
        kinesis_streams = KinesisUtils.createStream(
            spark_streaming_context, app_name, kinesis_stream_name, kinesis_endpoint,
            aws_region, kinesis_initial_position,
            kinesis_checkpoint_interval)

        kinesis_streams.foreachRDD(process)  # process each micro-batch
        spark_streaming_context.start()
        spark_streaming_context.awaitTermination()
    except Exception as ex:
        logging.error(ex)
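
To exercise this consumer, test records can be written to the stream with boto3. The following is a minimal producer sketch, assuming boto3 is installed and AWS credentials are configured in the environment; the payload fields ('id', 'value') are hypothetical, invented for illustration.

import json

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')  # same region as the consumer

for i in range(10):
    record = {'id': i, 'value': f'event-{i}'}  # hypothetical payload
    kinesis.put_record(
        StreamName='data_streaming',  # must match kinesis_stream_name above
        Data=json.dumps(record).encode('utf-8'),
        PartitionKey=str(i))  # key used to route the record to a shard

Since PYSPARK_SUBMIT_ARGS already declares the required packages, the consumer script can be launched with a plain python invocation; records sent by the producer should then show up in df.show() and as CSV files under the data/ directory.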