*** distro/shark-0.9.1/project/SharkBuild.scala 2014-04-09 01:57:26.000000000 -0700
--- shark-0.9.1/project/SharkBuild.scala 2014-06-16 19:16:24.640170412 -0700
***************
*** 201,206 ****
--- 201,207 ----
      // See https://code.google.com/p/guava-libraries/issues/detail?id=1095
      "com.google.code.findbugs" % "jsr305" % "1.3.+",
+     "co.paralleluniverse" % "quasar-core" % "0.5.0",
      // Hive unit test requirements. These are used by Hadoop to run the tests, but not necessary
      // in usual Shark runs.
      "commons-io" % "commons-io" % "2.1",
*** distro/shark-0.9.1/src/main/scala/shark/execution/HadoopTableReader.scala 2014-04-09 01:57:26.000000000 -0700
--- shark-0.9.1/src/main/scala/shark/execution/HadoopTableReader.scala 2014-06-17 17:46:00.781515266 -0700
***************
*** 41,46 ****
--- 41,48 ----
  import shark.{SharkEnv, Utils}
  import shark.execution.TableReader.PruningFunctionType
+ import com.twitter.elephantbird.hive.serde.ProtobufDeserializer
+
  /**
   * Helper class for scanning tables stored in Hadoop - e.g., to read Hive tables that reside in the
   * data warehouse directory.
***************
*** 203,209 ****
    // If conversion was performed, convertedRow will be a standard Object, but if
    // conversion wasn't necessary, it will still be lazy. We can't have both across
    // partitions, so we serialize and deserialize again to make it lazy.
!   if (tableSerDe.isInstanceOf[OrcSerde]) {
      convertedRow
    } else {
      convertedRow match {
--- 205,211 ----
    // If conversion was performed, convertedRow will be a standard Object, but if
    // conversion wasn't necessary, it will still be lazy. We can't have both across
    // partitions, so we serialize and deserialize again to make it lazy.
!   if (tableSerDe.isInstanceOf[OrcSerde] || tableSerDe.isInstanceOf[ProtobufDeserializer[_]]) {
      convertedRow
    } else {
      convertedRow match {
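
For context, the branch this hunk widens decides whether a deserialized row can be passed through unchanged. Rows from OrcSerde (and, with this patch, elephant-bird's ProtobufDeserializer) are already standard Objects, while rows from other SerDes get serialized and deserialized once more so that the row representation is uniformly lazy across partitions. A self-contained toy model of that decision (every type below is a stub, not the real Hive or elephant-bird class):

    // Stubs standing in for Hive/elephant-bird SerDe types.
    trait SerDe
    class OrcSerde extends SerDe
    class ProtobufDeserializer[T] extends SerDe
    class LazySimpleSerDe extends SerDe

    object RowPassThroughSketch extends App {
      // Mirrors the patched condition: these SerDes produce standard Objects,
      // so their rows skip the extra serialize/deserialize round trip.
      def needsLazyRoundTrip(serDe: SerDe): Boolean = serDe match {
        case _: OrcSerde | _: ProtobufDeserializer[_] => false
        case _ => true
      }

      assert(!needsLazyRoundTrip(new ProtobufDeserializer[AnyRef]))
      assert(needsLazyRoundTrip(new LazySimpleSerDe))
    }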
*** distro/shark-0.9.1/src/main/scala/shark/KryoRegistrator.scala 2014-04-09 01:57:26.000000000 -0700
--- shark-0.9.1/src/main/scala/shark/KryoRegistrator.scala 2014-06-16 19:33:18.155428800 -0700
***************
*** 17,23 ****
  package shark
! import java.io.{DataInputStream, DataOutputStream}
  import java.util.Arrays
  import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
  import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
--- 17,23 ----
  package shark
! import java.io.{DataInputStream, DataOutputStream, Externalizable, ObjectInput, ObjectOutput}
  import java.util.Arrays
  import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
  import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
***************
*** 28,33 ****
--- 28,34 ----
  import org.apache.spark.serializer.{KryoRegistrator => SparkKryoRegistrator}
  import shark.execution.serialization.SerializableWritable
+ import co.paralleluniverse.io.serialization.kryo._

  class KryoRegistrator extends SparkKryoRegistrator {
    def registerClasses(kryo: Kryo) {
***************
*** 41,47 ****
--- 42,62 ----
      kryo.register(classOf[MapJoinObjectValue], new KryoJavaSerializer)
      kryo.register(classOf[SerializableWritable[_]], new KryoSWSerializer)
+     //
+     //kryo.register(classOf[com.google.protobuf.DescriptorProtos.DescriptorProto], new KryoJavaSerializer)
+     //kryo.register(classOf[com.google.protobuf.Descriptors], new KryoJavaSerializer)
+
+     //kryo.register(classOf[com.google.protobuf.Descriptors.Descriptor], new KryoJavaSerializer)
+     //kryo.register(classOf[com.google.protobuf.Descriptors.FileDescriptor], new KryoJavaSerializer)
+     //kryo.register(classOf[com.google.protobuf.Descriptors.FieldDescriptor], new KryoJavaSerializer)
+
+     //kryo.register(classOf[com.twitter.elephantbird.hive.serde.ProtobufStructObjectInspector], new KryoJavaSerializer)
+     //kryo.register(classOf[com.twitter.elephantbird.hive.serde.ProtobufStructObjectInspector.ProtobufStructField], new KryoJavaSerializer)
+     ////kryo.addDefaultSerializer(classOf[Externalizable], new ExternalizableKryoSerializer)
+     kryo.register(classOf[com.twitter.elephantbird.hive.serde.ProtobufStructObjectInspector], new ExternalizableKryoSerializer[com.twitter.elephantbird.hive.serde.ProtobufStructObjectInspector])
+     kryo.register(classOf[com.twitter.elephantbird.hive.serde.ProtobufStructObjectInspector.ProtobufStructField], new ExternalizableKryoSerializer[com.twitter.elephantbird.hive.serde.ProtobufStructObjectInspector.ProtobufStructField])
+
      // As far as I (rxin) know, among all Hadoop writables only TimestampWritable
      // cannot be serialized by Kryo out of the box.
      kryo.register(classOf[org.apache.hadoop.hive.serde2.io.TimestampWritable],
***************
*** 74,76 ****
--- 89,112 ----
      writable
    }
  }
+
+
+ object ExternalizableKryoSerializer {
+
+   private val ks = new KryoSerializer()
+ }
+
+ class ExternalizableKryoSerializer[T <: Externalizable] extends com.esotericsoftware.kryo.Serializer[T] {
+
+   override def write(kryo: Kryo, output: KryoOutput, obj: T) {
+     obj.writeExternal(KryoUtil.asObjectOutput(output, kryo))
+   }
+
+   override def read(kryo: Kryo, input: KryoInput, `type`: Class[T]): T = {
+     val obj = kryo.newInstance(`type`)
+     obj.readExternal(KryoUtil.asObjectInput(input, kryo))
+     obj
+   }
+ }