Skip to content

Instantly share code, notes, and snippets.

@prolativ
Created September 22, 2022 11:04
Show Gist options
  • Save prolativ/28958ae923b087fc50041d2ed9714b1c to your computer and use it in GitHub Desktop.
Save prolativ/28958ae923b087fc50041d2ed9714b1c to your computer and use it in GitHub Desktop.
//> using scala "3.2.0"
//> using lib "org.virtuslab::iskra:0.0.2"
import org.virtuslab.iskra.api.*
import functions.*
import org.virtuslab.iskra.UntypedOps.typed // Improvement: We might make this already available after importing iskra.api.*
import org.apache.spark.sql
/** User-defined wrappers for Spark functionality that has no typed counterpart in scope yet.
  *
  * Missing wrappers can be implemented here once and reused later by importing
  * `IskraExtensions.*`.
  */
object IskraExtensions:

  extension (col: Column[?])
    /** Null check for this column, delegating to Spark's `isnull` on the untyped column
      * and re-wrapping the result as a typed boolean column.
      */
    def isNull: Column[BooleanType] =
      val nullCheck = sql.functions.isnull(col.untyped)
      nullCheck.typed

  extension [Schema](df: DataFrame[Schema])
    /** Calls `distinct()` on the underlying untyped data frame and wraps the result
      * back into a `DataFrame` carrying the same `Schema` type.
      */
    def distinct: DataFrame[Schema] =
      val deduplicated = df.untyped.distinct()
      DataFrame(deduplicated)
/** Example entry point: builds two small typed data frames, joins them on `date`,
  * keeps the rows where `x.a` is null, selects a few renamed/derived columns,
  * applies `distinct` and shows the result.
  *
  * The Spark session created here is stopped in a `finally` block, so it is released
  * whether the job succeeds or throws.
  */
@main def run(): Unit =
  given spark: SparkSession = SparkSession.builder()
    .master("local")
    .appName("test")
    .getOrCreate()

  final case class X(a: Option[String], b: Int, date: String) // Using String for `date` because Date type is not yet supported in Iskra
  final case class Y(p: String, q: Int, date: String)

  import IskraExtensions.*

  /** Extracts the year from a column holding dates written as `yyyy-MM-dd` strings.
    *
    * @param date string column with dates in `yyyy-MM-dd` format
    * @return integer column with the year of each date
    */
  def yearOfDate(date: Column[StringType]): Column[IntegerType] =
    // Parse with `to_date`; `date_format` would format a date into a string rather than parse one.
    val parsedDate = sql.functions.to_date(date.untyped, "yyyy-MM-dd")
    // TODO: Handle failed parsing and e.g. return an optional column (Column[IntegerOptType])
    sql.functions.year(parsedDate).typed

  try
    // NOTE: the val names `x` and `y` are what `$.x` / `$.y` refer to in the query below - do not rename.
    val x = Seq(
      X(Some("abc"), 1, "2022-09-21"),
      X(None, 2, "2022-09-22")
    ).toTypedDF

    val y = Seq(
      Y("xyz", 0, "2022-09-22")
    ).toTypedDF

    val z = x
      .join(y).on($.x.date === $.y.date) // leftJoin is already implemented and will be available in Iskra 0.0.3
      .where($.x.a.isNull)
      .select(
        $.x.a.as("renamed_a"), // $.a.as("renamed_a") would also work, because `a` is unambiguous
        $.x.date.as("date"), // `.as("date")` is redundant here because the name is not changed
        yearOfDate($.y.date).as("year"),
        ($.x.b + $.x.b + $.y.q).as("bbq")
      )
      .distinct

    z.show()
  finally
    // Release the local Spark session even when the job above fails.
    spark.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment