Skip to content

Instantly share code, notes, and snippets.

'''
spark/bin/spark-submit \
--master local --driver-memory 4g \
--num-executors 2 --executor-memory 4g \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
sstreaming-spark-out.py
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import *
'''
spark/bin/spark-submit \
--master local --driver-memory 4g \
--num-executors 2 --executor-memory 4g \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
sstreaming-spark-final.py
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import expr
from scipy.stats import skewnorm
import numpy as np
from pyspark.sql import SparkSession
# Create (or reuse) the application's singleton SparkSession.
spark = SparkSession.builder.getOrCreate()
def generate_cycle_randomly(unit_id, cycle, model_variant, label):
temp = 50+skewnorm.rvs(-8, size=1).item() + np.random.normal(0, 5)
pressure = np.random.uniform(900,1200) + np.random.normal(0, 50)
return (unit_id, cycle, model_variant, round(temp, 2), round(pressure, 2), label)
@oskarryn
oskarryn / spark_sstreaming_util.py
Created March 4, 2019 21:01
Utility for Spark Structured Streaming: extract data from a Kafka message's value and apply a schema.
def parse_data_from_kafka_message(sdf, schema):
"""Extract data from kafka message's value and specify schema
Parameters
----------
sdf : pyspark.sql.DataFrame
DataFrame obtained from Kafka stream, for which df.isStreaming is True
schema : OrderedDict or pyspark.sql.types.StructType
Dictionary that preserves order, containing columns' names as keys and data types as values.
Alternatively, pyspark.sql.types.StructType could define schema.