build.sbt:

name := "playground"
version := "1.0"
scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"

ideaExcludeFolders += ".idea"
ideaExcludeFolders += ".idea_modules"

addCommandAlias("make-idea", "update-classifiers; update-sbt-classifiers; gen-idea sbt-classifiers")
addCommandAlias("generate-project", ";update-classifiers;update-sbt-classifiers;gen-idea sbt-classifiers")
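
The gen-idea command used by the two aliases above (and the ideaExcludeFolders keys) come from the sbt-idea plugin, which is not declared in this file. A minimal sketch of the project/plugins.sbt entry it would typically need — the version number here is an assumption:

// project/plugins.sbt — assumed entry; gen-idea and ideaExcludeFolders are provided by sbt-idea
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")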
MyFileUtil.java:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

import java.io.IOException;

public class MyFileUtil {

    // Variant of Hadoop's FileUtil.copyMerge that writes the given header line
    // before concatenating the part files under srcDir into a single dstFile.
    public static boolean copyMergeWithHeader(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile,
                                              boolean deleteSource, Configuration conf, String header) throws IOException {
        dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
        if (!srcFS.getFileStatus(srcDir).isDir()) {
            return false;
        }
        FSDataOutputStream out = dstFS.create(dstFile);
        if (header != null) {
            out.write((header + "\n").getBytes("UTF-8"));
        }
        try {
            FileStatus[] contents = srcFS.listStatus(srcDir);
            for (int i = 0; i < contents.length; ++i) {
                if (!contents[i].isDir()) {
                    FSDataInputStream in = srcFS.open(contents[i].getPath());
                    try {
                        IOUtils.copyBytes(in, out, conf, false);
                    } finally {
                        in.close();
                    }
                }
            }
        } finally {
            out.close();
        }
        return deleteSource ? srcFS.delete(srcDir, true) : true;
    }

    // Resolves the destination path: if dst is an existing directory, the merged
    // file is placed inside it under srcName; refuses to clobber an existing file.
    private static Path checkDest(String srcName, FileSystem dstFS, Path dst, boolean overwrite) throws IOException {
        if (dstFS.exists(dst)) {
            FileStatus sdst = dstFS.getFileStatus(dst);
            if (sdst.isDir()) {
                if (null == srcName) {
                    throw new IOException("Target " + dst + " is a directory");
                }
                return checkDest(null, dstFS, new Path(dst, srcName), overwrite);
            }
            if (!overwrite) {
                throw new IOException("Target " + dst + " already exists");
            }
        }
        return dst;
    }
}
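
The helper can be exercised on its own before wiring it into a Spark job. A minimal sketch against the local filesystem — the object name and the paths are hypothetical, assuming /tmp/parts holds part-NNNNN files:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object MergeSmokeTest {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Local filesystem for a quick check; FileSystem.get(conf) would target HDFS instead
    val fs = FileSystem.getLocal(conf)
    // Hypothetical paths: /tmp/parts contains part-00000, part-00001, ...
    MyFileUtil.copyMergeWithHeader(fs, new Path("/tmp/parts"), fs, new Path("/tmp/merged.csv"), false, conf, "type,count")
  }
}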
WriteToCsvWithHeader.scala:

import java.io.File

import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WriteToCsvWithHeader {

  // Merge the part files under srcPath into a single file at dstPath,
  // prepending the given header line.
  def merge(srcPath: String, dstPath: String, header: String): Unit = {
    val hadoopConfig = new Configuration()
    val hdfs = FileSystem.get(hadoopConfig)
    MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, header)
  }

  def main(args: Array[String]) {
    // https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
    val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    val crimeData = sc.textFile(crimeFile).cache()
    val withoutHeader: RDD[String] = dropHeader(crimeData)

    val file = "/tmp/primaryTypes.csv"
    FileUtil.fullyDelete(new File(file))

    val destinationFile = "/tmp/singlePrimaryTypes.csv"
    FileUtil.fullyDelete(new File(destinationFile))

    // Parse each partition with its own CSVParser instance (the parser is not serializable)
    val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
      val parser = new CSVParser(',')
      lines.map(line => {
        val columns = parser.parseLine(line)
        (columns(5), 1) // column 5 is the crime's primary type
      })
    })

    // Count crimes per primary type, most frequent first
    val counts = partitions.
      reduceByKey { case (x, y) => x + y }.
      sortBy { case (key, value) => -value }.
      map { case (key, value) => Array(key, value).mkString(",") }

    counts.saveAsTextFile(file)
    merge(file, destinationFile, "type,count")
  }

  // http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAEYYnxYuEaie518ODdn-fR7VvD39d71=CgB_Dxw_4COVXgmYYQ@mail.gmail.com%3E
  def dropHeader(data: RDD[String]): RDD[String] = {
    data.mapPartitionsWithIndex((idx, lines) => {
      if (idx == 0) lines.drop(1) else lines
    })
  }
}
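
Note that dropHeader only skips the first line of partition 0, which matches how sc.textFile lays out a single input file: the header is always at the start of the first partition. A minimal sketch to sanity-check it with a local SparkContext — the object name and master setting are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object DropHeaderCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DropHeaderCheck").setMaster("local"))
    // Single partition, so the header sits at the start of partition 0
    val data = sc.parallelize(Seq("type,count", "THEFT,1", "BATTERY,2"), 1)
    WriteToCsvWithHeader.dropHeader(data).collect().foreach(println) // prints THEFT,1 then BATTERY,2
    sc.stop()
  }
}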