Last active
December 15, 2015 05:19
-
-
Save krishnanraman/5207976 to your computer and use it in GitHub Desktop.
Test Pail using Ints: given the numbers 1 to 100, create a nested directory structure in which numbers below 50 go into one tree and numbers 50 and above go into another. Within each tree, subdirectories are created based on the number mod 7, so after running this code you should see 2 + 14 = 16 directories. Together they should partition the input space {1..100} exactly.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.backtype.hadoop.pail.PailStructure | |
import java.util.{ List => JList } | |
import scala.collection.JavaConverters._ | |
import com.twitter.scalding._ | |
import com.twitter.scalding.commons.source.{PailSource,CodecPailStructure} | |
import com.twitter.bijection.{NumericInjections, Injection} | |
/**
 * Scalding job that writes the integers 1 to 100 into a Pail rooted at "pailtest".
 *
 * Records are partitioned into a two-level directory tree:
 *   - level 1: "belowfifty" for values < 50, "abovefifty" otherwise;
 *   - level 2: the value mod 7.
 * This gives 2 top-level directories plus 14 leaf directories.
 *
 * @param args standard Scalding job arguments
 */
class PailTest2(args: Args) extends Job(args) {
  val pipe = IterableSource((1 to 100), "src").read

  // Alternative sink using the hand-written IntStructure, where partitioning and
  // serialization are hard-coded in the structure itself:
  //   val sink = PailSource[Int]("pailtest", new IntStructure)
  //   pipe.write(sink)

  /**
   * Maps a value to its Pail target directories.
   *
   * Each list element is one directory level. Returning two elements yields a
   * nested path such as `belowfifty/3`. A single concatenated string would yield
   * one flat directory named `belowfifty3`.
   */
  val func: Int => List[String] = (obj: Int) =>
    List(if (obj < 50) "belowfifty" else "abovefifty", (obj % 7).toString)

  // Accepts every target path; no validation is performed.
  val validator: List[String] => Boolean = (_: List[String]) => true

  val mytype: Class[Int] = classOf[Int]

  // Bijection's Int <-> big-endian byte-array injection, used as the record codec.
  val myinjection = new NumericInjections {}.int2BigEndian

  val cps = new MyCodecPailStructure[Int]
  cps.setParams(func, validator, mytype, myinjection)

  val sink2 = PailSource[Int]("pailtest", cps)
  pipe.write(sink2)
}
/**
 * Generic PailStructure whose behaviour is supplied at runtime through `setParams`.
 *
 * The configurable pieces are partitioning, target validation, reported type and
 * serialization. The fields are mutable with defaults because the structure is
 * constructed with no arguments and configured afterwards.
 *
 * @tparam T type of the records stored in the pail
 */
class MyCodecPailStructure[T] extends PailStructure[T] {
  /** Maps a record to its target directory components (one element per level). */
  var targetFn: T => List[String] = (_: T) => List("test")

  /** Decides whether a list of directory components is an acceptable target. */
  var validator: List[String] => Boolean = (_: List[String]) => true

  /** Class reported by `getType`; null until `setParams` is called. */
  var mytype: java.lang.Class[T] = null

  /** Codec between `T` and its byte representation; null until `setParams` is called. */
  var injection: Injection[T, Array[Byte]] = null

  /**
   * Installs all runtime parameters of this structure.
   *
   * @param targetFn  maps a record to its target directory components
   * @param validator decides whether directory components form a valid target
   * @param mytype    class returned by `getType`
   * @param injection codec used by `serialize` / `deserialize`
   */
  def setParams(targetFn: T => List[String],
                validator: List[String] => Boolean,
                mytype: java.lang.Class[T],
                injection: Injection[T, Array[Byte]]): Unit = {
    this.targetFn = targetFn
    this.validator = validator
    this.mytype = mytype
    this.injection = injection
  }

  override def isValidTarget(paths: String*): Boolean = validator(paths.toList)

  override def getTarget(obj: T): JList[String] = targetFn(obj).asJava

  override def serialize(obj: T): Array[Byte] = injection(obj)

  // `.get` throws if the bytes cannot be inverted back into a T.
  override def deserialize(bytes: Array[Byte]): T = injection.invert(bytes).get

  // Must be a def, not a val. A val would be evaluated once during construction,
  // while `mytype` is still null, and would never see the value set by `setParams`.
  override def getType: java.lang.Class[T] = mytype
}
/**
 * PailStructure for Int records with hard-coded partitioning and serialization.
 *
 * Partitioning: records go under "belowfifty" (value < 50) or "abovefifty"
 * (otherwise), then into a subdirectory named after the value mod 7.
 *
 * Serialization: each record is stored as 4 bytes in ByteBuffer's default
 * (big-endian) byte order.
 */
class IntStructure extends PailStructure[Int] {
  import java.nio.ByteBuffer

  /**
   * Target directory components for a value; each element is one directory level.
   * This produces nested paths like `belowfifty/3` instead of a flat `belowfifty3`.
   */
  val func: Int => List[String] = (obj: Int) =>
    List(if (obj < 50) "belowfifty" else "abovefifty", (obj % 7).toString)

  // Every target path is accepted; no validation is performed.
  override def isValidTarget(paths: String*): Boolean = true

  override def getTarget(obj: Int): JList[String] = func(obj).asJava

  override def serialize(obj: Int): Array[Byte] =
    ByteBuffer.allocate(4).putInt(obj).array()

  // Reads the Int stored in the first 4 bytes.
  override def deserialize(bytes: Array[Byte]): Int =
    ByteBuffer.wrap(bytes).getInt(0)

  override def getType: java.lang.Class[Int] = classOf[Int]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment