Skip to content

Instantly share code, notes, and snippets.

@oskarryn
oskarryn / spark_sstreaming_util.py
Created March 4, 2019 21:01
Utility for Spark Structured Streaming: extracts data from a Kafka message's value and applies the given schema.
def parse_data_from_kafka_message(sdf, schema):
"""Extract data from kafka message's value and specify schema
Parameters
----------
sdf : pyspark.sql.DataFrame
DataFrame obtained from Kafka stream, for which df.isStreaming is True
schema : OrderedDict or pyspark.sql.types.StructType
Dictionary that preserves order, containing columns' names as keys and data types as values.
Alternatively, pyspark.sql.types.StructType could define schema.
from scipy.stats import skewnorm
import numpy as np
from pyspark.sql import SparkSession
# Reuse the active SparkSession if one exists; otherwise create one with default config.
spark = SparkSession.builder.getOrCreate()
def generate_cycle_randomly(unit_id, cycle, model_variant, label):
temp = 50+skewnorm.rvs(-8, size=1).item() + np.random.normal(0, 5)
pressure = np.random.uniform(900,1200) + np.random.normal(0, 50)
return (unit_id, cycle, model_variant, round(temp, 2), round(pressure, 2), label)
'''
spark/bin/spark-submit \
--master local --driver-memory 4g \
--num-executors 2 --executor-memory 4g \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
sstreaming-spark-final.py
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import expr
'''
spark/bin/spark-submit \
--master local --driver-memory 4g \
--num-executors 2 --executor-memory 4g \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
sstreaming-spark-out.py
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import *