@tristanreid
Last active October 2, 2015 05:30
Scalding ExecutionApp for KMeans example
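If numpy is not at hand, the sample data described by the Python script in the header comment can also be produced from Scala. The sketch below is a hedged alternative, not part of the original gist. The object name `GenKMeansData`, the fixed seed 42 and the use of `scala.util.Random.nextGaussian` in place of numpy are all assumptions. Because the numpy script uses the identity covariance `[(1, 0), (0, 1)]`, drawing x and y independently with unit variance gives the same distribution.

```scala
import java.io.PrintWriter
import scala.util.Random

// Hypothetical helper (not in the original gist): writes three 2-D Gaussian
// blobs of 100 points each, centered at (0,0), (5,0) and (0,5).
object GenKMeansData {
  // n points around (meanX, meanY); each coordinate is mean + N(0, 1)
  def blob(rng: Random, meanX: Double, meanY: Double, n: Int): Seq[(Double, Double)] =
    Seq.fill(n)((meanX + rng.nextGaussian(), meanY + rng.nextGaussian()))

  // One "x<TAB>y" line per point: the two-column layout KMeans2Caller reads
  def toTsv(points: Seq[(Double, Double)]): String =
    points.map { case (x, y) => s"$x\t$y" }.mkString("", "\n", "\n")

  def main(args: Array[String]): Unit = {
    val rng = new Random(42) // assumed fixed seed so reruns produce the same file
    val points = blob(rng, 0, 0, 100) ++ blob(rng, 5, 0, 100) ++ blob(rng, 0, 5, 100)
    val out = new PrintWriter("kmeansData.tsv")
    try out.write(toTsv(points)) finally out.close()
  }
}
```

Ending the file with a newline keeps the last row well-formed if more rows are appended later.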
/*
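Runs the Scalding KMeans example at
https://github.com/twitter/scalding/blob/master/scalding-core/src/main/scala/com/twitter/scalding/examples/KMeans.scala
and writes the results to files: --output receives the final centroids and
--output2 receives every input point labeled with its cluster id.
Execute locally like this: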
scala -classpath target/project-0.0.1-jar-with-dependencies.jar com.mycompany.project.KMeans2Caller \
--local \
--clusters <num clusters> \
--input ../work/kmeansData.tsv \
--output kout.tsv \
--output2 kout2.tsv
Or on Hadoop like this (first copy the data to HDFS with `hadoop fs -copyFromLocal kmeansData.tsv /`):
hadoop jar target/project-0.0.1-jar-with-dependencies.jar com.mycompany.project.KMeans2Caller \
--hdfs \
--clusters 3 \
--input hdfs:///kmeansData.tsv \
--output hdfs:///kout.tsv \
--output2 hdfs:///kout2.tsv
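Python script to generate some sample data (three 2-D Gaussian clusters of 100 points each):
import numpy as np

def data(meanXY, n):
    # n points from a 2-D Gaussian with unit variance around meanXY
    return np.random.multivariate_normal(meanXY, [(1, 0), (0, 1)], n)

data1 = data([0, 0], 100)
data2 = data([5, 0], 100)
data3 = data([0, 5], 100)

def format(a):
    # one "x<TAB>y" line per row
    return "\n".join(["\t".join([str(col) for col in row]) for row in a])

# You can also write the name of each point's parent cluster as a third column,
# if you want to keep track of that (then update the TypedTsv type KMeans2Caller reads)
with open('kmeansData.tsv', 'w') as f:
    # join the blocks with newlines so rows from different clusters don't run together
    f.write("\n".join(format(d) for d in (data1, data2, data3)) + "\n")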
*/
package com.mycompany.project

import com.twitter.scalding._
import com.twitter.scalding.examples.KMeans.LabeledVector
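object KMeans2Caller extends ExecutionApp {
  override def job: Execution[Unit] = Execution.getArgs.flatMap { args =>
    val numClusters = args("clusters").toInt
    val inputFile = args("input")
    val outputFile = args("output")   // final centroids
    val outputFile2 = args("output2") // input points labeled with their cluster id

    // Read the two tab-separated columns (x, y) as 2-D vectors
    val initialPoints: TypedPipe[Vector[Double]] =
      TypedPipe.from(TypedTsv[(Double, Double)](inputFile))
        .map { case (x, y) => Vector[Double](x, y) }

    // KMeans2 is assumed to be a local copy of the Scalding example with the same
    // apply(k, points) signature; to run the upstream example directly, call
    // com.twitter.scalding.examples.KMeans(numClusters, initialPoints) instead.
    KMeans2(numClusters, initialPoints)
      .flatMap { case (steps, clusters, labeled) =>
        // In the upstream example the first element is the number of iterations run
        println(s"************ SUCCESS, STEPS: $steps ***************")
        // clusters is a ValuePipe holding a single List of centroids;
        // flatten it so each centroid is written on its own line
        val e1 = clusters.toTypedPipe
          .flatMap(centroids => centroids)
          .writeExecution(TypedTsv[LabeledVector](outputFile))
        val e2 = labeled.writeExecution(TypedTsv[LabeledVector](outputFile2))
        // Run both writes; the result completes when both have finished
        e1.zip(e2)
      }
      .unit
  }
}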
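To sanity-check the Hadoop output on a small sample without Scalding, a plain in-memory version of the same loop can help. The sketch below is Lloyd's algorithm written for illustration; it is not the Scalding example's code. The object name `LocalKMeans`, the `maxIter` cap, and seeding the centroids with the first k points are assumptions (the Scalding example seeds its clusters randomly). Like the example, it uses Euclidean distance and stops once no point changes cluster.

```scala
// Hypothetical in-memory k-means (Lloyd's algorithm) for small local checks.
object LocalKMeans {
  type Point = Vector[Double]

  // Euclidean distance between two points of equal dimension
  def distance(a: Point, b: Point): Double =
    math.sqrt(a.zip(b).map { case (l, r) => (l - r) * (l - r) }.sum)

  // Pointwise mean of a non-empty group of points
  def centroid(ps: Seq[Point]): Point =
    ps.reduce((l, r) => l.zip(r).map { case (x, y) => x + y }).map(_ / ps.size)

  // Index of the nearest centroid (first one wins on ties)
  def closest(p: Point, cs: IndexedSeq[Point]): Int =
    cs.indices.minBy(i => distance(p, cs(i)))

  // Returns (iterations, centroids, cluster index per point). Stops when no label
  // changes, or after maxIter rounds (an assumed safety cap).
  def run(points: IndexedSeq[Point], k: Int, maxIter: Int = 100): (Int, IndexedSeq[Point], IndexedSeq[Int]) = {
    require(k > 0 && k <= points.size, "need 0 < k <= number of points")
    var centroids: IndexedSeq[Point] = points.take(k) // assumed seeding: first k points
    var labels: IndexedSeq[Int] = points.map(closest(_, centroids))
    var iter = 0
    var changed = true
    while (changed && iter < maxIter) {
      // Update step: move each centroid to the mean of its members;
      // a cluster that lost all its points keeps its old centroid
      centroids = centroids.indices.map { i =>
        val members = points.indices.filter(labels(_) == i).map(points(_))
        if (members.isEmpty) centroids(i) else centroid(members)
      }
      // Assignment step: relabel every point with its nearest centroid
      val next = points.map(closest(_, centroids))
      changed = next != labels
      labels = next
      iter += 1
    }
    (iter, centroids, labels)
  }
}
```

For example, `LocalKMeans.run(Vector(Vector(0.0, 0.0), Vector(0.0, 1.0), Vector(10.0, 10.0), Vector(10.0, 11.0)), 2)` labels the points `0, 0, 1, 1` with centroids `(0.0, 0.5)` and `(10.0, 10.5)`. Results from the distributed job can differ on real data because the initial clusters are chosen differently.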