Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Process MicroBatch OOP Example
from abc import ABC, abstractmethod
from pyspark.sql.dataframe import DataFrame as SparkFrame
class Processor(ABC):
    """Abstract interface for micro-batch processors.

    Subclasses implement :meth:`process_batch` with the per-batch logic
    for a streaming query's micro-batch processing.
    """

    @abstractmethod
    def process_batch(self, df: SparkFrame, epochID: str) -> None:
        """Process one micro-batch.

        Args:
            df (SparkFrame): The micro-batch Spark DataFrame to process.
            epochID (str): An identifier for the batch.

        Raises:
            NotImplementedError: Always, when invoked on the base class
                (e.g. via ``super().process_batch(...)``).
        """
        raise NotImplementedError
class RealTimeInferenceProcessor(Processor):
    """Processor that computes online features for each micro-batch and
    forwards the batch to a downstream job queue.

    The feature store handle is created once at construction time and
    reused across batches.
    """

    def __init__(self) -> None:
        # Cooperative init keeps the MRO chain intact if mixins are
        # ever added alongside Processor.
        super().__init__()
        # Feature-store handle reused for every batch;
        # initialize_feature_store() is defined elsewhere in the project.
        self.feature_store = initialize_feature_store()

    def process_batch(self, df: SparkFrame, epochID: str) -> None:
        """
        Concrete implementation of the stream query's micro batch processing logic.

        Args:
            df (SparkFrame): The micro-batch Spark DataFrame to process.
            epochID (str): An identifier for the batch. Unused here, but
                required by the ``Processor`` interface.
        """
        # Order matters: features must be materialized in the store
        # before downstream jobs consuming the batch can read them.
        compute_online_features(df, self.feature_store)
        forward_micro_batch_to_job_queue(df)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment