Skip to content

Instantly share code, notes, and snippets.

@krishnanraman
Last active December 15, 2015 05:19
Show Gist options
  • Save krishnanraman/5207976 to your computer and use it in GitHub Desktop.
Save krishnanraman/5207976 to your computer and use it in GitHub Desktop.
Test Pail using Ints: given the numbers 1 to 100, create a nested directory structure in which numbers below 50 go into one tree and numbers from 50 upward go into another. Within each tree, subdirectories are created based on the number mod 7, so you should see 2 + 14 = 16 directories after running this code. Together they should partition the input space {1..100} exactly.
import com.backtype.hadoop.pail.PailStructure
import java.util.{ List => JList }
import scala.collection.JavaConverters._
import com.twitter.scalding._
import com.twitter.scalding.commons.source.{PailSource,CodecPailStructure}
import com.twitter.bijection.{NumericInjections, Injection}
/**
 * Scalding job that writes the integers 1..100 into the "pailtest" pail through a
 * [[MyCodecPailStructure]] whose routing, validation, type and codec are configured at runtime.
 */
class PailTest2(args : Args) extends Job(args) {
  // Input: the integers 1..100 as a single-field ("src") pipe.
  val pipe = IterableSource((1 to 100), "src").read

  // Earlier check against the hand-written IntStructure (reported as working); kept for reference:
  //val sink = PailSource[Int]("pailtest", new IntStructure )
  //pipe.write(sink)

  // Routing: exactly one target per value -- "./belowfifty<n % 7>" when n < 50,
  // "./abovefifty<n % 7>" otherwise.
  val func: Int => List[String] = { obj =>
    val tree = if (obj >= 50) "./abovefifty" else "./belowfifty"
    List(tree + (obj % 7))
  }

  // Accept every target.
  val validator: List[String] => Boolean = _ => true

  val mytype = classOf[Int]

  // Int <-> Array[Byte] codec from bijection (big-endian, going by its name -- not verified here).
  val myinjection = new NumericInjections{}.int2BigEndian

  // Configure the runtime-parameterised structure, then write the pipe through it.
  val cps = new MyCodecPailStructure[Int]
  cps.setParams(func, validator, mytype, myinjection)

  val sink2 = PailSource[Int]( "pailtest", cps)
  pipe.write(sink2)
}
/**
 * A PailStructure configured after construction via [[setParams]]. `setParams` supplies:
 *   - the routing function,
 *   - the target validator,
 *   - the record class reported by `getType`,
 *   - the `T <-> Array[Byte]` injection used for (de)serialization.
 *
 * Defaults before `setParams` is called:
 *   - every record is routed to "test";
 *   - every target is valid;
 *   - `getType` returns null;
 *   - `serialize`/`deserialize` throw IllegalStateException.
 *
 * @tparam T the record type stored in the pail
 */
class MyCodecPailStructure[T] extends PailStructure[T] {
  var targetFn: T => List[String] = ((x: T) => List("test"))
  var validator: List[String] => Boolean = ((x: List[String]) => true)
  var mytype: java.lang.Class[T] = null
  var injection: Injection[T, Array[Byte]] = null

  /**
   * Installs the structure's configuration, replacing any previous one.
   *
   * @param targetFn  maps a record to the directory components it is written under
   * @param validator decides whether a list of directory components is an acceptable target
   * @param mytype    class reported by `getType`
   * @param injection codec used by `serialize` / `deserialize`
   */
  def setParams( targetFn: T => List[String],
                 validator: List[String] => Boolean,
                 mytype: java.lang.Class[T],
                 injection: Injection[T, Array[Byte]]): Unit = {
    this.targetFn = targetFn
    this.validator = validator
    this.mytype = mytype
    this.injection = injection
  }

  override def isValidTarget(paths: String*): Boolean = validator(paths.toList)

  override def getTarget(obj: T): JList[String] = targetFn(obj).asJava

  override def serialize(obj: T): Array[Byte] = configuredInjection.apply(obj)

  // `invert` reports undecodable input as a failed result; surface that with a descriptive
  // exception instead of the bare error thrown by `.get`.
  override def deserialize(bytes: Array[Byte]): T =
    configuredInjection.invert(bytes).getOrElse(
      throw new IllegalArgumentException(
        "MyCodecPailStructure: cannot decode " + bytes.length + " bytes with the configured injection"))

  // Must be a def, not a val. A val would be evaluated once at construction time, while
  // `mytype` is still null, and would never see the class installed later by setParams.
  override def getType: java.lang.Class[T] = mytype

  // Returns the configured injection, failing fast with a clear message if setParams was never called.
  private def configuredInjection: Injection[T, Array[Byte]] = {
    if (injection == null)
      throw new IllegalStateException(
        "MyCodecPailStructure: setParams must be called before serializing or deserializing")
    injection
  }
}
/**
 * Fixed PailStructure for Ints.
 *
 * Each value is routed to a single directory chosen by whether it is below 50 and by its
 * residue mod 7. It is stored as the 4 bytes written by `ByteBuffer.putInt`, which is
 * big-endian by default.
 */
class IntStructure extends PailStructure[Int] {
  import java.nio.ByteBuffer

  // "./belowfifty<n % 7>" when n < 50, "./abovefifty<n % 7>" otherwise.
  val func: Int => List[String] = { obj =>
    val tree = if (obj >= 50) "./abovefifty" else "./belowfifty"
    List(tree + (obj % 7))
  }

  // Every target is accepted.
  override def isValidTarget(paths: String*): Boolean = true

  override def getTarget(obj: Int): JList[String] = func(obj).asJava

  // Fixed-width 4-byte encoding in ByteBuffer's default (big-endian) byte order.
  override def serialize(obj: Int): Array[Byte] =
    ByteBuffer.allocate(4).putInt(obj).array()

  // Reads the Int from the first 4 bytes, using the same default byte order as serialize.
  override def deserialize(bytes: Array[Byte]): Int =
    ByteBuffer.wrap(bytes).getInt(0)

  override val getType: Class[Int] = classOf[Int]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment