# Processing streaming micro-batches: an object-oriented example
from abc import ABC, abstractmethod
from pyspark.sql.dataframe import DataFrame as SparkFrame
class Processor(ABC):
    """Abstract interface for a streaming query's micro-batch handler.

    Subclasses must implement :meth:`process_batch`. A bound
    ``process_batch`` method has the ``(DataFrame, epoch_id)`` shape that
    PySpark's ``DataStreamWriter.foreachBatch`` expects for its callback.
    """

    @abstractmethod
    def process_batch(self, df: "SparkFrame", epochID: int) -> None:
        """Process one micro-batch of the stream.

        Args:
            df: The micro-batch Spark DataFrame to process.
            epochID: Identifier of the micro-batch. PySpark's
                ``foreachBatch`` supplies this as an ``int``.

        Raises:
            NotImplementedError: If a subclass delegates here via ``super()``.
        """
        # Kept so that an explicit super() call fails loudly instead of
        # silently doing nothing.
        raise NotImplementedError
class RealTimeInferenceProcessor(Processor):
    """Micro-batch processor that hands each batch to the online-feature step.

    A feature-store handle is created once at construction time and reused
    for every micro-batch this instance processes.
    """

    def __init__(self) -> None:
        """Initialize the processor and its feature-store handle."""
        super().__init__()
        # Created once per processor instance so that each micro-batch reuses
        # the same handle rather than re-initializing it on every call.
        self.feature_store = initialize_feature_store()

    def process_batch(self, df: "SparkFrame", epochID: int) -> None:
        """Concrete implementation of the stream query's micro-batch logic.

        Args:
            df: The micro-batch Spark DataFrame to process.
            epochID: Identifier of the micro-batch (an ``int`` when invoked
                by PySpark's ``foreachBatch``). Not used by this
                implementation.

        Returns:
            None. Any value returned by ``compute_online_features`` is
            discarded.
        """
        # NOTE(review): presumably computes/publishes online features for the
        # batch using the stored feature-store handle — confirm against
        # compute_online_features' definition.
        compute_online_features(df, self.feature_store)
