Skip to content

Instantly share code, notes, and snippets.

@MattiaCostamagna
Created April 3, 2023 14:16
Show Gist options
  • Save MattiaCostamagna/c65129d4de6680acbdfd24aa063ae919 to your computer and use it in GitHub Desktop.
Save MattiaCostamagna/c65129d4de6680acbdfd24aa063ae919 to your computer and use it in GitHub Desktop.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.gluetypes import Field
from awsglue import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql.functions import from_json, col, lit, current_timestamp
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType, FloatType, DecimalType, \
DateType, TimestampType, BooleanType, LongType
from pyspark.sql import DataFrame
args = getResolvedOptions(sys.argv, ["JOB_NAME", "kinesis_stream_arn"])
job_run_id = args['JOB_RUN_ID']
sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = Job(glue_context)
def process_batch(data_frame: DataFrame, _batch_id: int):
job.init(args["JOB_NAME"], args)
if data_frame.count() <= 0:
job.commit()
return
data_frame.printSchema()
data_frame.show()
job.commit()
amazon_kinesis_dataframe = glue_context.create_data_frame.from_options(
connection_type="kinesis",
connection_options={
"typeOfData": "kinesis",
"streamARN": args["kinesis_stream_arn"],
"classification": "json",
"startingPosition": "TRIM_HORIZON",
"inferSchema": "false",
"avoidEmptyBatches": "true",
"schema": "`data` string, `metadata` STRUCT<`timestamp`: TIMESTAMP, `record-type`: STRING, `operation`: STRING, `partition-key-type`: STRING, `schema-name`: STRING, `table-name`: STRING, `transaction-id`: BIGINT> NOT NULL"
},
transformation_ctx="amazon_kinesis_dataframe",
)
glue_context.forEachBatch(
frame=amazon_kinesis_dataframe,
batch_function=process_batch,
options={
"windowSize": "100 seconds",
"checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
},
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment