@krishnanraman
Last active December 15, 2015 07:39
Pail Example
Pail example:
Write job (writejob): partition the numbers 1 to 100 into two directories, belowfifty (numbers less than 50) and abovefifty (50 and up).
Within each directory, create 7 subdirectories named 0 through 6, keyed by the number mod 7.
For example, 62 ends up in "abovefifty/6", because 62 mod 7 = 6.
Read job (readjob): read only the subdirectories "belowfifty/3" and "abovefifty/0".
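The partition rule described above can be checked without Pail or Hadoop. The following is a minimal plain-Scala sketch, not part of the original job; the object name `PartitionSketch` and the helper `targetDir` are hypothetical names introduced only for illustration. It mirrors the rule (below 50 vs. 50 and up, then mod 7) and lists which numbers the read job should find in the two subdirectories it reads.

```scala
// Hypothetical helper mirroring the partition rule described above.
// `PartitionSketch` and `targetDir` are illustrative names, not Pail or Scalding APIs.
object PartitionSketch {
  // Relative directory a number would be written to.
  def targetDir(n: Int): String = {
    val top = if (n < 50) "belowfifty" else "abovefifty"
    top + "/" + (n % 7)
  }

  def main(args: Array[String]): Unit = {
    // Group 1..100 by the directory each number would land in.
    val byDir = (1 to 100).groupBy(targetDir)

    println(targetDir(62))                        // abovefifty/6
    println(byDir("abovefifty/0").mkString(" "))  // 56 63 70 77 84 91 98
    println(byDir("belowfifty/3").mkString(" "))  // 3 10 17 24 31 38 45
  }
}
```

Note that 50 itself falls under abovefifty, since the rule tests for numbers strictly less than 50.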
RESULTS:
$ tree pailtest
pailtest
├── abovefifty
│   ├── 0
│   │   └── part-0000013.pailfile
│   ├── 1
│   │   └── part-000007.pailfile
│   ├── 2
│   │   └── part-000008.pailfile
│   ├── 3
│   │   └── part-000009.pailfile
│   ├── 4
│   │   └── part-0000010.pailfile
│   ├── 5
│   │   └── part-0000011.pailfile
│   └── 6
│       └── part-0000012.pailfile
├── belowfifty
│   ├── 0
│   │   └── part-000006.pailfile
│   ├── 1
│   │   └── part-000000.pailfile
│   ├── 2
│   │   └── part-000001.pailfile
│   ├── 3
│   │   └── part-000002.pailfile
│   ├── 4
│   │   └── part-000003.pailfile
│   ├── 5
│   │   └── part-000004.pailfile
│   └── 6
│       └── part-000005.pailfile
└── pail.meta
$ tree pailoutput
pailoutput
├── part-00000
└── part-00001
0 directories, 2 files
$ cat pailoutput/*
56
63
70
77
84
91
98
3
10
17
24
31
38
45
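import com.twitter.scalding._
import com.twitter.scalding.commons.source.PailSource

// Pass --io write to build the pail, or --io read to read two of its subdirectories.
class PailTest2(args : Args) extends Job(args) {
  args("io") match {
    case "read" => readjob
    case "write" => writejob
  }

  // Read only "abovefifty/0" and "belowfifty/3" from the pail and write the numbers out as TSV.
  def readjob = {
    val (rootpath, subpath) = ("pailtest", Array(List("abovefifty/0"), List("belowfifty/3")))
    val src = PailSource.source[Int]( rootpath, subpath )
    src.read.write( new Tsv("pailoutput"))
  }

  // Write 1..100 into the pail, choosing each number's subdirectory from its value.
  def writejob = {
    val rootpath = "pailtest"
    // Numbers below 50 go under belowfifty, the rest (including 50) under abovefifty;
    // the second path component is the number mod 7.
    val func = ((obj:Int) => if( obj < 50) List("./belowfifty/" + (obj % 7)) else List("./abovefifty/" + (obj % 7)))
    val pipe = IterableSource((1 to 100), "src").read
    val sink = PailSource.sink[Int]( rootpath, func )
    pipe.write(sink)
  }
}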