Skip to content

Instantly share code, notes, and snippets.

@udf(returnType=ArrayType(StringType()))
def to_upper_list(s):
    """Upper-case every string in ``s`` and return the results as a list.

    Args:
        s: An iterable of strings, or ``None``. A null column value reaches
            a Python UDF as ``None``, so it is passed through instead of
            raising ``TypeError`` when iterated.

    Returns:
        A list holding the upper-cased elements in their original order
        (``None`` elements are kept as ``None``), or ``None`` when ``s``
        itself is ``None``.
    """
    if s is None:
        return None
    # Null elements inside the array are preserved rather than crashing on .upper().
    return [i.upper() if i is not None else None for i in s]
# Case 1 - UDF annotation
to_upper_list(['potato', 'carrot', 'tomato'])
"""
TypeError: Invalid argument, not a string or column: ['potato', 'carrot', 'tomato'] of type <class 'list'>.
For column literals, use 'lit', 'array', 'struct' or 'create_map' function
from unittest import TestCase
import pytest
from pyspark.sql.types import StringType
@pytest.fixture(scope='function', autouse=True)
def mock_udf_annotation(monkeypatch):
def dummy_udf(f):
return f
from unittest import TestCase
def dummy_udf(f):
    """Identity decorator: hand back ``f`` unchanged."""
    return f


def mock_udf(f=None, returnType=None):
    """Stand-in for ``pyspark.sql.functions.udf`` that leaves functions undecorated.

    Supports both decorator spellings:

    * ``@udf`` / ``udf(func)`` -- ``f`` is the function; it is returned as is.
    * ``@udf(returnType=...)``, ``@udf()`` or ``@udf(<type>)`` -- no function
      has been supplied yet, so an identity decorator is returned.

    Args:
        f: The function being decorated, or a return type passed
            positionally, or ``None``.
        returnType: Accepted for signature compatibility; ignored.

    Returns:
        ``f`` itself when it is callable, otherwise ``dummy_udf``.
    """
    # A truthiness check would treat a positionally-passed return type
    # (e.g. "string" or a DataType instance) as the function to decorate;
    # only a callable can be the decorated function.
    return f if callable(f) else dummy_udf
from mock import patch
patch('pyspark.sql.functions.udf', mock_udf).start()
from importlib import reload
from unittest import TestCase
from mock import patch
from our_package.spark import udfs as UDF
def dummy_udf(f):
    """No-op decorator: give back the received object untouched."""
    return f
import SchemaValidationListener._
import org.everit.json.schema.Schema
import org.everit.json.schema.event.{CombinedSchemaMatchEvent, SchemaReferencedEvent, ValidationListener}
import scala.collection.mutable.ListBuffer
import scala.util.matching.Regex
import scala.collection.JavaConverters._
object SchemaValidationListener {
@afranzi
afranzi / validate.scala
Created March 13, 2019 14:12
JSON Schema Validation
/** Returns the schema reference for `event`.
  *
  * Delegates to `JsonObjectFields.getSchemaRef`, passing `defaultSchema` as the
  * second argument.
  * NOTE(review): presumably `defaultSchema` is the fallback used when the event
  * carries no schema reference of its own -- confirm against `JsonObjectFields`.
  *
  * @param event the JSON event to inspect
  * @return the schema reference produced by `JsonObjectFields.getSchemaRef`
  */
def getSchemaRef(event: JSONObject): String =
  JsonObjectFields.getSchemaRef(event, defaultSchema)
def validateEvent(schema: Schema, event: JSONObject): ValidationResult[JSONObject] = {
val validationListener: SchemaValidationListener = SchemaValidationListener()
val validator: Validator = Validator
.builder
.withListener(validationListener)
.build()
@afranzi
afranzi / buildSchema.scala
Created March 13, 2019 14:13
JSON Schema builder with LoadingCache
/** Cache of schemas keyed by schema reference.
  *
  * Holds at most `MaximumCacheSize` entries; an entry expires `CacheMinutes`
  * minutes after it was last accessed. On a miss the schema is obtained from
  * `loadSchema` and the load is logged at info level.
  */
val schemaCache: LoadingCache[String, Schema] = {
  // Invoked by the cache whenever a schema reference has no cached entry.
  val schemaLoader: CacheLoader[String, Schema] = new CacheLoader[String, Schema] {
    override def load(schemaRef: String): Schema = {
      logger.info(s"Loading schema $schemaRef")
      loadSchema(schemaRef)
    }
  }

  CacheBuilder
    .newBuilder()
    .maximumSize(MaximumCacheSize)
    .expireAfterAccess(CacheMinutes, TimeUnit.MINUTES)
    .build(schemaLoader)
}
@afranzi
afranzi / SchemaValidationListener.scala
Created March 13, 2019 14:15
Schema Validation Listener for JSON Schemas
import SchemaValidationListener._
import org.everit.json.schema.Schema
import org.everit.json.schema.event.{CombinedSchemaMatchEvent, SchemaReferencedEvent, ValidationListener}
import scala.collection.mutable.ListBuffer
import scala.util.matching.Regex
import scala.collection.JavaConverters._
object SchemaValidationListener {
@afranzi
afranzi / device-sensor-event.jslt
Last active March 13, 2019 14:16
JSLT evolution for a device-sensor-event
{
"user": {
"id": .userId
},
"device": {
"id": "undefined",
"platform": if(.p == "a") "Android" else "iPhone"
},
"product": {
"id": "remix",
@afranzi
afranzi / SchemaEvolve.scala
Created March 13, 2019 14:17
Evolve JSON events using JSLT expressions
def evolve(event: ValidationResult[JsonNode]): EvolutionResult[JsonNode] = {
val schemasReferenced: Seq[SchemaReferenced] = event.schemasReferenced
val json = event.event
val schemasToEvolve = schemasReferenced
.filter { case SchemaReferenced(_, schemaRef) => hasEvolution(schemaRef) }
val eventEvolved = schemasToEvolve
.foldLeft(json) {
case (jsonEvent: JsonNode, SchemaReferenced(location, schemaRef)) =>