@mneedham
Created November 30, 2014 07:37
build.sbt

name := "playground"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"
ideaExcludeFolders += ".idea"
ideaExcludeFolders += ".idea_modules"
addCommandAlias("make-idea", "update-classifiers; update-sbt-classifiers; gen-idea sbt-classifiers")
addCommandAlias("generate-project",
";update-classifiers;update-sbt-classifiers;gen-idea sbt-classifiers")
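
A note on the spark-core line: the double %% tells sbt to append the project's Scala binary version to the artifact name, so with scalaVersion at 2.10.4 it resolves spark-core_2.10. Written with a plain % and the artifact name spelled out, the same dependency would be:

// Equivalent to the %% form above, with the Scala binary version
// (2.10) made explicit in the artifact name.
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"

This detail comes up again in the comments below, where the project gets imported with Scala 2.11 and sbt looks for a spark-core_2.11 artifact instead.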
WriteToCsv.scala

import java.io.File

import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object WriteToCsv {
  // Merge the part files Spark writes under srcPath into a single file at dstPath.
  def merge(srcPath: String, dstPath: String): Unit = {
    val hadoopConfig = new Configuration()
    val hdfs = FileSystem.get(hadoopConfig)
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
  }

  def main(args: Array[String]): Unit = {
    // https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
    val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"

    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    val crimeData = sc.textFile(crimeFile).cache()
    val withoutHeader: RDD[String] = dropHeader(crimeData)

    // Clear out any output left over from a previous run.
    val file = "/tmp/primaryTypes.csv"
    FileUtil.fullyDelete(new File(file))

    val destinationFile = "/tmp/singlePrimaryTypes.csv"
    FileUtil.fullyDelete(new File(destinationFile))

    // Parse each partition with a single CSVParser instance (rather than one
    // per line) and emit a (primaryType, 1) pair per record.
    val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
      val parser = new CSVParser(',')
      lines.map(line => {
        val columns = parser.parseLine(line)
        (columns(5), 1)
      })
    })

    // Count crimes per primary type, sort descending by count, and render
    // each pair as a CSV row.
    val counts = partitions.
      reduceByKey { case (x, y) => x + y }.
      sortBy { case (key, value) => -value }.
      map { case (key, value) => Array(key, value).mkString(",") }

    counts.saveAsTextFile(file)
    merge(file, destinationFile)
  }

  // Drop the CSV header, which sits in the first line of the first partition.
  // http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAEYYnxYuEaie518ODdn-fR7VvD39d71=CgB_Dxw_4COVXgmYYQ@mail.gmail.com%3E
  def dropHeader(data: RDD[String]): RDD[String] = {
    data.mapPartitionsWithIndex((idx, lines) => {
      if (idx == 0) lines.drop(1) else lines
    })
  }
}
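
As a footnote to the merge step: a minimal alternative sketch, assuming the same counts RDD built above, is to collapse the output to a single partition before writing. This skips the Hadoop-level copyMerge, at the cost of funnelling the final write through one task (the /tmp/primaryTypesSingle path is just illustrative):

// Alternative to merge(): write a single part file directly by reducing
// the RDD to one partition. Slower for large outputs, and it still
// produces a directory containing one part-00000 file rather than a
// bare CSV file.
counts.coalesce(1).saveAsTextFile("/tmp/primaryTypesSingle")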
@hendisantika

Nice example.

But I have an issue with SBT.

Error:Error while importing SBT project:
...

[info] Resolving org.scala-sbt#template-resolver;0.1 ...
[info] Resolving org.scala-tools.sbinary#sbinary_2.10;0.4.2 ...
[info] Resolving org.scala-sbt#api;0.13.13 ...
[info] Resolving org.scala-sbt#incremental-compiler;0.13.13 ...
[info] Resolving org.scala-sbt#apply-macro;0.13.13 ...
[info] Resolving org.spire-math#json4s-support_2.10;0.6.0 ...
[info] Resolving com.thoughtworks.paranamer#paranamer;2.6 ...
[info] Resolving org.scala-sbt#test-agent;0.13.13 ...
[info] Resolving org.scala-sbt#classfile;0.13.13 ...
[info] Resolving org.scala-sbt#completion;0.13.13 ...
[info] Resolving org.scala-sbt#test-interface;1.0 ...
[info] Resolving com.jcraft#jsch;0.1.50 ...
[info] Resolving org.scala-lang#scala-compiler;2.10.6 ...
[info] Resolving org.scala-sbt#interface;0.13.13 ...
[info] Resolving org.scala-sbt#logging;0.13.13 ...
[trace] Stack trace suppressed: run 'last :update' for the full output.
[trace] Stack trace suppressed: run 'last :ssExtractDependencies' for the full output.
[error] (*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-core_2.11;1.1.0: not found
[error] (*:ssExtractDependencies) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-core_2.11;1.1.0: not found
[error] Total time: 173 s, completed Dec 6, 2016 8:21:50 AM

See complete log in /home/hendisantika/.IdeaIC2016.3/system/log/sbt.last.log

===============================================

Could you suggest a solution?

Thanks

@siddharthakille

Hey, did you get this issue resolved? I'm facing a similar one. Let me know if you found a fix. Thanks.
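
For what it's worth, the unresolved artifact name in the log above points at the likely cause: the project was imported with Scala 2.11, and spark-core_2.11 was never published for Spark 1.1.0 (Scala 2.11 builds of Spark only start at 1.2.0). A sketch of two possible build.sbt fixes, assuming nothing else in the build pins the Scala version:

// Fix 1: stay on Scala 2.10, which Spark 1.1.0 was published for.
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"

// Fix 2: keep Scala 2.11 and move to a Spark release that ships
// _2.11 artifacts, e.g. the 1.6.x line.
// scalaVersion := "2.11.8"
// libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.3"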
