Skip to content

Instantly share code, notes, and snippets.

@afranzi
Created May 30, 2019 15:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save afranzi/b914a357b8c90d43581e0d4eb24be088 to your computer and use it in GitHub Desktop.
Save afranzi/b914a357b8c90d43581e0d4eb24be088 to your computer and use it in GitHub Desktop.
object Filters {

  /** Applies each predicate in `filters` to `df`, left to right, through successive
    * `DataFrame.filter` calls.
    *
    * The second parameter list is separate so that `Filters.filter(preds)` can be passed
    * directly as a `DataFrame => DataFrame` (for example to `DataFrame.transform`).
    * When `filters` is empty, `df` is returned unchanged.
    *
    * @param filters column predicates to apply, in order
    * @param df      the DataFrame to filter
    * @return the DataFrame produced after every predicate has been applied
    */
  def filter(filters: Seq[Column])(df: DataFrame): DataFrame =
    filters.foldLeft(df) { (narrowed, predicate) =>
      narrowed.filter(predicate)
    }
}
trait SparkReader {

  /** Source-specific loading step.
    *
    * `read` calls this with a `DataFrameReader` obtained from the implicit session.
    * If a schema was supplied to `read`, it has already been applied to the reader.
    * Implementations configure the data source on `reader` and return the loaded DataFrame.
    *
    * @param reader the (possibly schema-configured) reader to load from
    * @return the loaded DataFrame, before any filters are applied
    */
  protected def execute(reader: DataFrameReader): DataFrame

  /** Loads a DataFrame through [[execute]] and then applies `filters`.
    *
    * @param schema       optional explicit schema; when `None`, the reader is passed on untouched
    * @param filters      column predicates applied in order after loading (see `Filters.filter`)
    * @param sparkSession session whose `read` supplies the base `DataFrameReader`
    * @return the loaded and filtered DataFrame
    */
  def read(schema: Option[StructType] = None, filters: Seq[Column] = Seq.empty)(
      implicit sparkSession: SparkSession): DataFrame = {
    val baseReader = sparkSession.read
    // An explicit lambda pins the `schema(StructType)` overload of DataFrameReader.schema.
    val configuredReader = schema.map(s => baseReader.schema(s)).getOrElse(baseReader)
    val loaded = execute(configuredReader)
    Filters.filter(filters)(loaded)
  }
}
case class SparkMongoReader(mongoUri: String) extends SparkReader {

  /** Loads data through the MongoDB Spark connector's `DefaultSource`, passing `mongoUri`
    * as the connector's `"uri"` option.
    *
    * NOTE(review): which database/collection is read presumably comes from `mongoUri`
    * itself — confirm against the connector version in use.
    *
    * @param reader reader handed over by `SparkReader.read`
    * @return the DataFrame returned by `load()`
    */
  protected def execute(reader: DataFrameReader): DataFrame = {
    val mongoSource = "com.mongodb.spark.sql.DefaultSource"
    val configured = reader.format(mongoSource).option("uri", mongoUri)
    configured.load()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment