@ychennay
Created August 5, 2021 19:53
Process MicroBatch OOP Example
from abc import ABC, abstractmethod

from pyspark.sql.dataframe import DataFrame as SparkFrame


class Processor(ABC):
    @abstractmethod
    def process_batch(self, df: SparkFrame, epochID: str) -> None:
        raise NotImplementedError


class RealTimeInferenceProcessor(Processor):
    def __init__(self):
        # Helper assumed to be defined elsewhere; returns a handle to the online feature store.
        self.feature_store = initialize_feature_store()

    def process_batch(self, df: SparkFrame, epochID: str) -> None:
        """
        Concrete implementation of the stream query’s micro-batch processing logic.

        Args:
            df (SparkFrame): The micro-batch Spark DataFrame to process.
            epochID (str): An identifier for the batch.
        """
        # Both helpers are assumed to be defined elsewhere alongside this snippet.
        compute_online_features(df, self.feature_store)
        forward_micro_batch_to_job_queue(df)
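For context, a processor like this is typically wired into a Spark Structured Streaming query via foreachBatch, which invokes the callback once per micro-batch. Below is a minimal sketch, not part of the original gist: the Kafka broker, topic name, and checkpoint path are placeholders, and a running SparkSession is assumed.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("real-time-inference").getOrCreate()

# Placeholder streaming source; any streaming DataFrame works here.
events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
    .option("subscribe", "events")  # placeholder topic
    .load()
)

processor = RealTimeInferenceProcessor()

# foreachBatch calls processor.process_batch(df, epoch_id) for every micro-batch.
query = (
    events.writeStream
    .foreachBatch(processor.process_batch)
    .option("checkpointLocation", "/tmp/checkpoints/real-time-inference")  # placeholder path
    .start()
)

query.awaitTermination()

Keeping the per-batch logic behind the Processor abstraction means the streaming wiring above stays the same while different concrete processors can be swapped in.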