name := "playground"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"
ideaExcludeFolders += ".idea"
ideaExcludeFolders += ".idea_modules"
addCommandAlias("make-idea", "update-classifiers; update-sbt-classifiers; gen-idea sbt-classifiers")
addCommandAlias("generate-project",
  ";update-classifiers;update-sbt-classifiers;gen-idea sbt-classifiers")
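A note on the `%%` operator used above: it appends the project's Scala binary version to the artifact name, so with `scalaVersion := "2.10.4"` the Spark dependency resolves to `spark-core_2.10`. A sketch of the two equivalent forms:

```scala
// With scalaVersion set to a 2.10.x release, these two lines request the same artifact:
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"
```

This matters because Spark 1.1.0 was only published for Scala 2.10; if the build's Scala version drifts to 2.11, `%%` will request an artifact that does not exist.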
import java.io.File
import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
object WriteToCsv {

  // Merge the part files Spark writes under srcPath into a single file at dstPath.
  def merge(srcPath: String, dstPath: String): Unit = {
    val hadoopConfig = new Configuration()
    val hdfs = FileSystem.get(hadoopConfig)
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
  }

  def main(args: Array[String]) {
    // https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
    val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"

    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    val crimeData = sc.textFile(crimeFile).cache()
    val withoutHeader: RDD[String] = dropHeader(crimeData)

    val file = "/tmp/primaryTypes.csv"
    FileUtil.fullyDelete(new File(file))

    val destinationFile = "/tmp/singlePrimaryTypes.csv"
    FileUtil.fullyDelete(new File(destinationFile))

    // Parse each partition with a single CSVParser instance, emitting a
    // (primaryType, 1) pair per row (column 5 holds the crime's primary type).
    val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
      val parser = new CSVParser(',')
      lines.map(line => {
        val columns = parser.parseLine(line)
        (columns(5), 1)
      })
    })

    val counts = partitions.
      reduceByKey { case (x, y) => x + y }.
      sortBy { case (key, value) => -value }.
      map { case (key, value) => Array(key, value).mkString(",") }

    counts.saveAsTextFile(file)
    merge(file, destinationFile)
  }

  // http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAEYYnxYuEaie518ODdn-fR7VvD39d71=CgB_Dxw_4COVXgmYYQ@mail.gmail.com%3E
  def dropHeader(data: RDD[String]): RDD[String] = {
    data.mapPartitionsWithIndex((idx, lines) => {
      // Only the first partition contains the CSV header.
      if (idx == 0) lines.drop(1) else lines
    })
  }
}
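The `dropHeader` trick above only removes the first line of partition 0, since `sc.textFile` puts the CSV header at the start of the first partition. A minimal, Spark-free sketch of the same per-partition logic on plain Scala collections (the `DropHeaderSketch` name and the sample data are illustrative, not from the original):

```scala
object DropHeaderSketch {
  // Simulates dropHeader: partition 0 loses its first line, other partitions are untouched.
  def dropHeader(partitions: Seq[Seq[String]]): Seq[Seq[String]] =
    partitions.zipWithIndex.map { case (lines, idx) =>
      if (idx == 0) lines.drop(1) else lines
    }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("header", "row1"), Seq("row2", "row3"))
    println(dropHeader(parts)) // List(List(row1), List(row2, row3))
  }
}
```

In the real job the partitions are iterators rather than sequences, but the index test is identical: every partition except the first passes through unchanged.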
@hendisantika

hendisantika commented Dec 6, 2016

Nice example.

But I have an issue with SBT.

Error:Error while importing SBT project:
...

[info] Resolving org.scala-sbt#template-resolver;0.1 ...
[info] Resolving org.scala-tools.sbinary#sbinary_2.10;0.4.2 ...
[info] Resolving org.scala-sbt#api;0.13.13 ...
[info] Resolving org.scala-sbt#incremental-compiler;0.13.13 ...
[info] Resolving org.scala-sbt#apply-macro;0.13.13 ...
[info] Resolving org.spire-math#json4s-support_2.10;0.6.0 ...
[info] Resolving com.thoughtworks.paranamer#paranamer;2.6 ...
[info] Resolving org.scala-sbt#test-agent;0.13.13 ...
[info] Resolving org.scala-sbt#classfile;0.13.13 ...
[info] Resolving org.scala-sbt#completion;0.13.13 ...
[info] Resolving org.scala-sbt#test-interface;1.0 ...
[info] Resolving com.jcraft#jsch;0.1.50 ...
[info] Resolving org.scala-lang#scala-compiler;2.10.6 ...
[info] Resolving org.scala-sbt#interface;0.13.13 ...
[info] Resolving org.scala-sbt#logging;0.13.13 ...
[trace] Stack trace suppressed: run 'last *:update' for the full output.
[trace] Stack trace suppressed: run 'last *:ssExtractDependencies' for the full output.
[error] (*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-core_2.11;1.1.0: not found
[error] (*:ssExtractDependencies) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-core_2.11;1.1.0: not found
[error] Total time: 173 s, completed Dec 6, 2016 8:21:50 AM

See complete log in /home/hendisantika/.IdeaIC2016.3/system/log/sbt.last.log

===============================================

Would you give me a solution?

Thanks
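The log above requests `spark-core_2.11`, even though the build file pins `scalaVersion := "2.10.4"`, which points at the IDE's sbt import using a Scala 2.11 toolchain. Spark 1.1.0 was only published for Scala 2.10, so the artifact genuinely does not exist. A hedged build.sbt sketch of the two likely fixes (the 1.6.3 version below is one example of a Spark release that was published for Scala 2.11):

```scala
// Option 1: keep Spark 1.1.0 and make sure the resolved Scala version is 2.10.x,
// so "%%" expands to spark-core_2.10:
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"

// Option 2: move to Scala 2.11 together with a Spark release built for it, e.g.:
// scalaVersion := "2.11.8"
// libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.3"
```

Either way, the Scala version and the `_2.xx` suffix sbt derives from it must match a combination that was actually published to Maven Central.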

@Siddhukille

Siddhukille commented Jun 4, 2018

Hey, did you get this issue resolved? I am facing a similar issue. Let me know if you resolved it. Thanks.
