import com.twitter.scalding.Job
import com.twitter.scalding.typed.{CoGroupable, Grouped, TypedPipe, UnsortedGrouped}
import scala.reflect.runtime.universe
object ThriftClassFinder {
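  // Scalding container types (as imported above) whose type arguments are inspected for Thrift classes
  private val structures: Set[Class[_]] = Set(
    classOf[TypedPipe[_]],
    classOf[Grouped[_, _]],
    classOf[UnsortedGrouped[_, _]],
    classOf[CoGroupable[_, _]]
  )
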
  /**
   * Reflect over a Scalding job to try to identify the Thrift types it uses so they can be tokenized by Cascading.
   * For some reason Scala reflection is broken with the Hadoop InterfaceAudience annotation (see
   * ...), meaning we can't use scalaType.members, so we instead use Java
   * reflection to iterate over fields to find the ones we care about, and then look those up in Scala reflection to
   * find the full un-erased type signatures, and try to find Thrift types from those.
   * Note: this is certainly not guaranteed to find every used type. It can't find types used in a step that isn't
   * referred to in a field, in addition to whatever bugs may exist.
   */
  def findUsedThriftClasses(jobClazz: Class[_ <: Job]): Seq[Class[_]] = {
    val mirror = universe.runtimeMirror(jobClazz.getClassLoader)
    val scalaType = mirror.classSymbol(jobClazz).toType
    for {
      // Java reflection only sees erased field types, so it is used just to select candidate fields
      field <- jobClazz.getDeclaredFields.toSeq
      if structures.contains(field.getType)
      // Look the field up by name in Scala reflection to recover its type arguments
      scalaSignature = scalaType.member(universe.TermName(field.getName)).typeSignature
      clazz <- getThriftClassesForType(scalaSignature)
    } yield {
      clazz
    }
  }

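  private def getThriftClassesForType(typeSignature: universe.Type): Seq[Class[_]] = {
    typeSignature.resultType.typeArgs.flatMap({ generic =>
      //If the wrapped type is a Tuple2, recurse into its types
      if (generic.typeSymbol.fullName == "scala.Tuple2") {
        getThriftClassesForType(generic)
      } else {
        // Otherwise keep the type only if it resolves to a Thrift class
        getThriftClassOpt(generic.typeSymbol.fullName).toSeq
      }
    })
  }
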
  // Returns the class with the given name if it can be loaded and implements Thrift's TBase
  private def getThriftClassOpt(name: String): Option[Class[_]] = {
    try {
      val clazz: Class[_] = Class.forName(name)
      if (classOf[org.apache.thrift.TBase[_,_]].isAssignableFrom(clazz)) Some(clazz)
      else None
    } catch {
      // Log something?
      case _: ClassNotFoundException => None
    }
  }
}
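
// A minimal sketch of the lookup technique described above, kept free of Scalding and Thrift so it
// can be tried in isolation. `ErasureDemo`, `Holder`, and `typeArgNames` are hypothetical names used
// only for illustration; they are not part of any library. The idea: Java reflection lists the
// declared fields (with erased types), and Scala reflection is then asked, by field name, for the
// type arguments that erasure removed.
object ErasureDemo {
  import scala.reflect.runtime.universe

  // Hypothetical stand-in for a job class with one generically typed field
  class Holder {
    val pairs: List[(String, Int)] = Nil
  }

  // For each declared field of `clazz`, the full names of the type arguments in its Scala signature
  def typeArgNames(clazz: Class[_]): Map[String, List[String]] = {
    val mirror = universe.runtimeMirror(clazz.getClassLoader)
    val scalaType = mirror.classSymbol(clazz).toType
    clazz.getDeclaredFields.toSeq.map { field =>
      val signature = scalaType.member(universe.TermName(field.getName)).typeSignature
      field.getName -> signature.resultType.typeArgs.map(_.typeSymbol.fullName)
    }.toMap
  }
}

// Possible usage: ErasureDemo.typeArgNames(classOf[ErasureDemo.Holder])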