Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
SchemaRDD to row JSON conversion
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gdg.dpp.spark
import java.io.StringWriter
import com.fasterxml.jackson.core.JsonFactory
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.types._
object JsonSchemaRDD {
implicit class AsJson(srdd: SchemaRDD) extends Serializable {
private def rowToJson(jf: JsonFactory, rowType: DataType)(row: Row): String = {
val sw = new StringWriter()
val g = jf.createGenerator(sw)
def go: (DataType, Any) => Unit = {
case (_, null) | (NullType, _) => g.writeNull()
case (StringType, value: String) => g.writeString(value)
case (BinaryType, value: Array[Byte]) => g.writeBinary(value)
case (ByteType, value: Byte) => g.writeNumber(value.toInt)
case (ShortType, value: Short) => g.writeNumber(value)
case (IntegerType, value: Int) => g.writeNumber(value)
case (LongType, value: Long) => g.writeNumber(value)
case (DecimalType(), value: java.math.BigDecimal) => g.writeNumber(value)
case (TimestampType, value: java.sql.Timestamp) => g.writeString(value.toString)
case (FloatType, value: Float) => g.writeNumber(value)
case (DoubleType, value: Double) => g.writeNumber(value)
case (BooleanType, value: Boolean) => g.writeBoolean(value)
case (ArrayType(ty, _), values: Seq[_]) =>
g.writeStartArray()
values.foreach(go(ty, _))
g.writeEndArray()
case (StructType(ty), values: Seq[_]) =>
g.writeStartObject()
ty.zip(values).foreach {
case (_, null) =>
case (field, value) =>
g.writeFieldName(field.name)
go(field.dataType, value)
}
g.writeEndObject()
}
go(rowType, row)
g.close()
sw.toString
}
def asJson: RDD[String] = {
val schema = srdd.schema
srdd.mapPartitions { iter =>
val jf = new JsonFactory()
iter.map(rowToJson(jf, schema))
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.