Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession.Builder
import org.apache.spark.SparkContext
import org.apache.log4j.{Level, Logger}
// A sparkSession é provida pelo proprio Spark Shell
// O nivel de log também já é configurado pela Spark Shell
def boolean_udf_wrapper(a:String, b:String, t:Any): Boolean = { true }
def string_udf_wrapper(a:String, b:String, t:Any): String = { "••••" }
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.catalyst.dsl.plans.table
import org.apache.spark.sql.catalyst.dsl.expressions.{sum,max,min,first,last,count,avg}
//
// O código acima é constante. A parte mutável aparece abaixo.
//
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, LongType, StructType}
class MyCountUDAF extends UserDefinedAggregateFunction {
// UserDefinedAggregateFunction is the contract to define
// user-defined aggregate functions (UDAFs).
// Este método abaixo define pode ser invocado apenas assim: inputSchema(0)
// Isto é feito via inversão de dependência pelo Spark
// o retorno é um objeto StructField assim:
// StructField("id", LongType, true, {})
// o objeto StructField é do pacote org.apache.spark.sql.types
override def inputSchema: StructType = {
new StructType().add("id", LongType, nullable = true)
}
// O buffer para resultado temporário possui um único atributo
// no caso da funcionalidade de contagem.
// Este método abaixo define pode ser invocado apenas assim: bufferSchema(0)
// Isto é feito via inversão de dependência pelo Spark
// o retorno é um objeto StructField assim:
// StructField("count", LongType, true, {})
override def bufferSchema: StructType = {
new StructType().add("count", LongType, nullable = true)
}
// O método abaixo deve ser invocado sem parênteses em Scala.
// refere-se ao tipo do atributo de saida
override def dataType: DataType = LongType
override def deterministic: Boolean = true
// O método abaixo inicializa o buffer.
// Isto é feito via inversão de dependência pelo Spark
// Observe que a única coisa a ser feita é inicializar o contador com Zero.
override def initialize(buffer: MutableAggregationBuffer): Unit = {
println(s">>> initialize (buffer: $buffer)")
// NOTE: Scala's update used under the covers
buffer(0) = 0L
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
println(s">>> update (buffer: $buffer -> input: $input)")
buffer(0) = buffer.getLong(0) + 1
}
override def merge(buffer: MutableAggregationBuffer, row: Row): Unit = {
println(s">>> merge (buffer: $buffer -> row: $row)")
buffer(0) = buffer.getLong(0) + row.getLong(0)
}
override def evaluate(buffer: Row): Any = {
println(s">>> evaluate (buffer: $buffer)")
buffer.getLong(0)
}
}
// Criando o objeto MyCountUDAF para ser usada com a API de Dataset/DataFrame
val myCountUDAF = new MyCountUDAF
//
case class R7_Tuple(deptId:Long,deptName:String){}
val R7_Dataset = spark.read.json("DATA/depto.json").as[R7_Tuple]
case class R6_Tuple(deptId: Long, deptName: String){}
val R6_Dataset = R7_Dataset.filter(t => boolean_udf_wrapper("scala", "oldestDeptos", t)).as[R6_Tuple]
case class R8_Tuple(deptId:Long){}
val R8_Dataset = spark.read.json("DATA/depto_ids.json").as[R8_Tuple]
case class R2_Tuple( deptId:Long, deptName:String){}
val R2_Dataset = R8_Dataset.join(R6_Dataset, "deptId").as[R2_Tuple]
case class R0_Tuple(deptId:Long,name:String,salary:Double){}
val R0_Dataset = spark.read.json("DATA/employees.json").as[R0_Tuple]
case class R1_Tuple( deptId: Long, name: String, salary: Double, nameSmartCased: String ){}
val R1_Dataset = R0_Dataset.map(t => R1_Tuple(t.deptId, t.name, t.salary, string_udf_wrapper("scala", "smartTextCase", t) ))
case class R3_Tuple( deptId:Long, name:String, salary:Double, nameSmartCased:String, deptName: String){}
val R3_Dataset = R1_Dataset.join(R2_Dataset, "deptId").as[R3_Tuple]
case class R4_Tuple(deptId: Long, name: String, salary: Double, nameSmartCased: String, deptName: String){}
val R4_Dataset = R3_Dataset.filter(t => boolean_udf_wrapper("scala", "happyEmployees", t)).as[R4_Tuple]
case class R5_Tuple( deptId: Long, sum_salary: Double ){}
val R5_Dataset = R4_Dataset.groupBy("deptId").
agg(expr("sum(salary)").alias("sum_salary")).as[R5_Tuple]
//
val agregated_1 = R4_Dataset.groupBy('deptId).agg(myCountUDAF('name) as "count")
agregated_1.show(10)
val agregated_2 = R4_Dataset.groupBy('deptId).agg(myCountUDAF.distinct('salary) as "count")
agregated_2.show(10)
R5_Dataset.show(10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment