@mikebern
Created June 19, 2014
*** distro/shark-0.9.1/project/SharkBuild.scala 2014-04-09 01:57:26.000000000 -0700
--- shark-0.9.1/project/SharkBuild.scala 2014-06-16 19:16:24.640170412 -0700
***************
*** 201,206 ****
--- 201,207 ----
// See https://code.google.com/p/guava-libraries/issues/detail?id=1095
"com.google.code.findbugs" % "jsr305" % "1.3.+",
+ "co.paralleluniverse" % "quasar-core" % "0.5.0",
// Hive unit test requirements. These are used by Hadoop to run the tests, but not necessary
// in usual Shark runs.
"commons-io" % "commons-io" % "2.1",
*** distro/shark-0.9.1/src/main/scala/shark/execution/HadoopTableReader.scala 2014-04-09 01:57:26.000000000 -0700
--- shark-0.9.1/src/main/scala/shark/execution/HadoopTableReader.scala 2014-06-17 17:46:00.781515266 -0700
***************
*** 41,46 ****
--- 41,48 ----
import shark.{SharkEnv, Utils}
import shark.execution.TableReader.PruningFunctionType
+ import com.twitter.elephantbird.hive.serde.ProtobufDeserializer
+
/**
* Helper class for scanning tables stored in Hadoop - e.g., to read Hive tables that reside in the
* data warehouse directory.
***************
*** 203,209 ****
// If conversion was performed, convertedRow will be a standard Object, but if
// conversion wasn't necessary, it will still be lazy. We can't have both across
// partitions, so we serialize and deserialize again to make it lazy.
! if (tableSerDe.isInstanceOf[OrcSerde]) {
convertedRow
} else {
convertedRow match {
--- 205,211 ----
// If conversion was performed, convertedRow will be a standard Object, but if
// conversion wasn't necessary, it will still be lazy. We can't have both across
// partitions, so we serialize and deserialize again to make it lazy.
! if (tableSerDe.isInstanceOf[OrcSerde] || tableSerDe.isInstanceOf[ProtobufDeserializer[_]]) {
convertedRow
} else {
convertedRow match {
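
Rows produced by OrcSerde and by elephant-bird's ProtobufDeserializer come back as fully materialized, standard objects, so the extra serialize/deserialize pass that makes rows lazy is skipped for both. A hypothetical refactor of the widened check into a named predicate, purely illustrative and not part of the patch:

    // Hypothetical helper: serdes whose deserialized rows are already
    // materialized and need no lazy round-trip across partitions.
    private def producesMaterializedRows(serDe: org.apache.hadoop.hive.serde2.Deserializer): Boolean =
      serDe.isInstanceOf[OrcSerde] || serDe.isInstanceOf[ProtobufDeserializer[_]]
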
*** distro/shark-0.9.1/src/main/scala/shark/KryoRegistrator.scala 2014-04-09 01:57:26.000000000 -0700
--- shark-0.9.1/src/main/scala/shark/KryoRegistrator.scala 2014-06-16 19:33:18.155428800 -0700
***************
*** 17,23 ****
package shark
! import java.io.{DataInputStream, DataOutputStream}
import java.util.Arrays
import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
--- 17,23 ----
package shark
! import java.io.{DataInputStream, DataOutputStream, Externalizable, ObjectInput, ObjectOutput}
import java.util.Arrays
import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
***************
*** 28,33 ****
--- 28,34 ----
import org.apache.spark.serializer.{KryoRegistrator => SparkKryoRegistrator}
import shark.execution.serialization.SerializableWritable
+ import co.paralleluniverse.io.serialization.kryo._
class KryoRegistrator extends SparkKryoRegistrator {
def registerClasses(kryo: Kryo) {
***************
*** 41,47 ****
--- 42,62 ----
kryo.register(classOf[MapJoinObjectValue], new KryoJavaSerializer)
kryo.register(classOf[SerializableWritable[_]], new KryoSWSerializer)
+ // Earlier attempts registered the protobuf Descriptor classes and the
+ // elephant-bird object inspectors with KryoJavaSerializer, and tried a default
+ // serializer for all Externalizable classes; those were abandoned in favor of
+ // serializing the inspectors through their Externalizable hooks (see
+ // ExternalizableKryoSerializer at the end of this file).
+ kryo.register(classOf[com.twitter.elephantbird.hive.serde.ProtobufStructObjectInspector], new ExternalizableKryoSerializer[com.twitter.elephantbird.hive.serde.ProtobufStructObjectInspector])
+ kryo.register(classOf[com.twitter.elephantbird.hive.serde.ProtobufStructObjectInspector.ProtobufStructField], new ExternalizableKryoSerializer[com.twitter.elephantbird.hive.serde.ProtobufStructObjectInspector.ProtobufStructField])
+
// As far as I (rxin) know, among all Hadoop writables only TimestampWritable
// cannot be serialized by Kryo out of the box.
kryo.register(classOf[org.apache.hadoop.hive.serde2.io.TimestampWritable],
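
Both elephant-bird inspector classes are registered with the Externalizable-based serializer defined at the end of this file; its type bound (T <: Externalizable) implies the inspectors implement Externalizable, so they serialize their own state (including protobuf Descriptor references) rather than relying on Kryo's field-by-field default. To exercise this registrator, a Spark/Shark job of that era would typically set the standard Spark 0.9 properties (a sketch):

    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "shark.KryoRegistrator")
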
***************
*** 74,76 ****
--- 89,112 ----
writable
}
}
+
+ // Serializes an Externalizable object through its own writeExternal/readExternal
+ // hooks, using Quasar's KryoUtil adapters to bridge Kryo streams to
+ // ObjectOutput/ObjectInput. The target class needs a no-arg constructor for
+ // kryo.newInstance.
+ class ExternalizableKryoSerializer[T <: Externalizable] extends com.esotericsoftware.kryo.Serializer[T] {
+
+ override def write(kryo: Kryo, output: KryoOutput, obj: T) {
+ // Delegate to the object's own writeExternal, bridging Kryo's Output to an ObjectOutput.
+ obj.writeExternal(KryoUtil.asObjectOutput(output, kryo))
+ }
+
+ override def read(kryo: Kryo, input: KryoInput, `type`: Class[T]): T = {
+ // Instantiate via Kryo (requires a no-arg constructor), then restore state via readExternal.
+ val obj = kryo.newInstance(`type`)
+ obj.readExternal(KryoUtil.asObjectInput(input, kryo))
+ obj
+ }
+ }
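
A minimal round-trip smoke test of the serializer above, assuming shark.KryoRegistrator's ExternalizableKryoSerializer and quasar-core are on the classpath; the Point class is hypothetical and exists only for this sketch:

    import java.io.{ByteArrayOutputStream, Externalizable, ObjectInput, ObjectOutput}
    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
    import shark.ExternalizableKryoSerializer

    // Hypothetical Externalizable payload; the no-arg constructor is required
    // because ExternalizableKryoSerializer instantiates via kryo.newInstance.
    class Point(var x: Int, var y: Int) extends Externalizable {
      def this() = this(0, 0)
      override def writeExternal(out: ObjectOutput) { out.writeInt(x); out.writeInt(y) }
      override def readExternal(in: ObjectInput) { x = in.readInt(); y = in.readInt() }
    }

    object SerializerSmokeTest extends App {
      val kryo = new Kryo()
      kryo.register(classOf[Point], new ExternalizableKryoSerializer[Point])

      val bytes = new ByteArrayOutputStream()
      val out = new KryoOutput(bytes)
      kryo.writeObject(out, new Point(3, 4))       // goes through writeExternal
      out.close()

      val in = new KryoInput(bytes.toByteArray)
      val p = kryo.readObject(in, classOf[Point])  // goes through readExternal
      in.close()
      assert(p.x == 3 && p.y == 4)
    }
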