Created
March 18, 2014 08:52
-
-
Save kmoulart/9616125 to your computer and use it in GitHub Desktop.
Making a sequence file from a csv, sequentially or via MapReduce
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package myCompany.bigdata.hadoop_project.args; | |
import com.beust.jcommander.JCommander; | |
import com.beust.jcommander.Parameter; | |
import com.beust.jcommander.Parameters; | |
/** | |
* Represents an help command. | |
* | |
*/ | |
@Parameters(commandDescription = "Displays the help about a specific command or about the software.") | |
public class HelpCommand implements UserCommand { | |
/** | |
* The command for which the help is requested. If this argument is not specified, a generic help should be displayed. | |
*/ | |
@Parameter(names = "-command", description = "The command for which the help is requested.") | |
private String commandName = null; | |
/** | |
* Returns the command name. | |
* | |
* @return The command name. | |
*/ | |
public String getCommandName() { | |
return commandName; | |
} | |
/** | |
* Processes an help command. An helpful message will be displayed to the user. | |
*/ | |
public void process(final JCommander jCommander) { | |
if (commandName == null) { | |
jCommander.usage(); | |
} else { | |
jCommander.usage(commandName); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package myCompany.bigdata.hadoop_project; | |
import java.io.IOException; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import myCompany.bigdata.hadoop_project.args.HelpCommand; | |
import myCompany.bigdata.hadoop_project.args.UserCommand; | |
import myCompany.bigdata.hadoop_project.args.makeSequenceCommand.MakeSequenceFileCommand; | |
import myCompany.bigdata.hadoop_project.args.makeSequenceCommand.MakeSequenceFileCommandMR; | |
import org.apache.mahout.classifier.df.data.DescriptorException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.beust.jcommander.JCommander; | |
import com.beust.jcommander.ParameterException; | |
public class Main { | |
private static final Logger LOGGER = LoggerFactory.getLogger(Main.class); | |
/** | |
* Reads the arguments, then executes the user request. | |
* | |
* @param args | |
* The arguments. | |
*/ | |
public static void main(final String args[]) { | |
final JCommander jCommander = new JCommander(); | |
final Map<String, UserCommand> commands = new HashMap<String, UserCommand>(); | |
initCommands(jCommander, commands); | |
try { | |
// First, we parse the arguments | |
jCommander.parse(args); | |
final String commandName = jCommander.getParsedCommand(); | |
// Then, we execute the user request. | |
if (commandName == null) { | |
jCommander.usage(); | |
} else { | |
commands.get(commandName).process(jCommander); | |
} | |
} catch (ParameterException e) { | |
jCommander.usage(); | |
LOGGER.error(e.getMessage()); | |
} catch (IOException e) { | |
System.err.println("One or multiple file path doesn't exist"); | |
LOGGER.error(e.getMessage()); | |
} catch (InterruptedException e) { | |
System.err.println("The job got interrupted"); | |
LOGGER.error(e.getMessage()); | |
} catch (ClassNotFoundException e) { | |
System.err.println("Class not found"); | |
LOGGER.error(e.getMessage()); | |
} catch (DescriptorException e) { | |
System.err.println("Wrong descriptor format"); | |
LOGGER.error(e.getMessage()); | |
} | |
} | |
/** | |
* Initializes JCommander and a UserCommand map. | |
* | |
* The map will contain couples of (command name, command). | |
* | |
* @param jCommander | |
* The JCommander to initialize. | |
* @param commands | |
* The map to fill with the existing commands. | |
*/ | |
private static void initCommands(final JCommander jCommander, final Map<String, UserCommand> commands) { | |
// The possible commands | |
final HelpCommand helpCommand = new HelpCommand(); | |
final MakeSequenceFileCommand makeSequenceFileCommand = new MakeSequenceFileCommand(); | |
final MakeSequenceFileCommandMR makeSequenceFileCommandMR = new MakeSequenceFileCommandMR(); | |
// The help command will use the keyword "help" | |
commands.put("help", helpCommand); | |
// The MakeSequenceFileCommand with its keyword makesf | |
commands.put("makesf", makeSequenceFileCommand); | |
// The MakeSequenceFileCommandMR with its keyword makesfmr | |
commands.put("makesfmr", makeSequenceFileCommandMR); | |
for (Entry<String, UserCommand> commandEntry : commands.entrySet()) { | |
jCommander.addCommand(commandEntry.getKey(), commandEntry.getValue()); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mycompany.bigdata.hadoop_project.args.makeSequenceCommand; | |
import java.io.BufferedReader; | |
import java.io.FileReader; | |
import java.io.IOException; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.SequenceFile; | |
import org.apache.hadoop.io.Text; | |
import org.apache.mahout.math.RandomAccessSparseVector; | |
import org.apache.mahout.math.Vector; | |
import org.apache.mahout.math.VectorWritable; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import myCompany.bigdata.hadoop_project.args.UserCommand; | |
import com.beust.jcommander.JCommander; | |
import com.beust.jcommander.Parameter; | |
import com.beust.jcommander.Parameters; | |
/** | |
* Sequentially transforms the given CSV into a Sequence File Text/VectorWritable with | |
* \"key/key\" as key and the features as vector. | |
* | |
*/ | |
@Parameters(commandDescription = "Sequentially transforms the given CSV into a Sequence File Text/VectorWritable with \"key/key\" as key and the features as vector.") | |
public class MakeSequenceFileCommand implements UserCommand { | |
private static final Logger LOGGER = LoggerFactory.getLogger(MakeSequenceFileCommand.class); | |
/** | |
* The CSV file to tranform (in local file system). | |
*/ | |
@Parameter(names = {"-i", "--input"}, description = "The CSV file to transform (in local file system).", required = true) | |
private String input = null; | |
/** | |
* The file into which the result shall be written (in HDFS). | |
*/ | |
@Parameter(names = {"-o", "--output"}, description = "The file into which the result shall be written (in HDFS).", required = true) | |
private String output = null; | |
/** | |
* The separator used to separate the fields. | |
*/ | |
@Parameter(names = {"-s", "--separator"}, description = "The separator used to separate the fields.", required = false) | |
private String separator = ","; | |
/** | |
* If set, the output file will be overwritten. | |
*/ | |
@Parameter(names = {"-ow", "--overwrite"}, description = "If set, the output file will be overwritten.", required = false) | |
private boolean overwrite = false; | |
@SuppressWarnings("deprecation") | |
public void process(final JCommander jCommander) throws IOException { | |
Configuration conf = new Configuration(true); | |
FileSystem fs = FileSystem.get(conf); | |
Path filePath = new Path(output); | |
// Delete previous file if exists | |
if (fs.exists(filePath)) { | |
if (overwrite) { | |
fs.delete(filePath, true); | |
} else { | |
LOGGER.error("The ouput file already exists"); | |
return; | |
} | |
} | |
// The input file is not in hdfs | |
BufferedReader reader = new BufferedReader(new FileReader(input)); | |
SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, | |
filePath, Text.class, VectorWritable.class); | |
// Run through the input file | |
String line; | |
while ((line = reader.readLine()) != null) { | |
// We surround with try catch to get rid of the exception when | |
// header is included in file | |
try { | |
// Split with the given separator | |
String[] c = line.split(separator); | |
if (c.length > 1) { | |
double[] d = new double[c.length]; | |
// Get the feature setl | |
for (int i = 1; i < c.length; i++) | |
d[i] = Double.parseDouble(c[i]); | |
// Put it in a vector | |
Vector vec = new RandomAccessSparseVector(c.length); | |
vec.assign(d); | |
VectorWritable writable = new VectorWritable(); | |
writable.set(vec); | |
// Create a label with a / and the class label | |
String label = c[0] + "/" + c[0]; | |
// Write all in the seqfile | |
writer.append(new Text(label), writable); | |
} | |
} catch (NumberFormatException e) { | |
continue; | |
} | |
} | |
writer.close(); | |
reader.close(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package myCompany.bigdata.hadoop_project.args.makeSequenceCommand; | |
import java.io.IOException; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; | |
import org.apache.mahout.math.VectorWritable; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import myCompany.bigdata.hadoop_project.args.UserCommand; | |
import com.beust.jcommander.JCommander; | |
import com.beust.jcommander.Parameter; | |
import com.beust.jcommander.Parameters; | |
/** | |
* Transforms, using a MapReduce Job, the given CSV into a Sequence File Text/VectorWritable with | |
* \"key/key\" as key and the features as vector. | |
* | |
*/ | |
@Parameters(commandDescription = "Transforms, using a MapReduce Job, the given CSV into a Sequence File Text/VectorWritable with \"key/key\" as key and the features as vector.") | |
public class MakeSequenceFileCommandMR implements UserCommand { | |
private static final Logger LOGGER = LoggerFactory.getLogger(MakeSequenceFileCommandMR.class); | |
/** | |
* The CSV file to tranform (in HDFS). | |
*/ | |
@Parameter(names = {"-i", "--input"}, description = "The CSV file to transform (in HDFS).", required = true) | |
private String input = null; | |
/** | |
* The file into which the result shall be written (in HDFS). | |
*/ | |
@Parameter(names = {"-o", "--output"}, description = "The file into which the result shall be written (in HDFS).", required = true) | |
private String output = null; | |
/** | |
* The separator used to separate the fields. | |
*/ | |
@Parameter(names = {"-s", "--separator"}, description = "The separator used to separate the fields.", required = false) | |
private String separator = ","; | |
/** | |
* If set, the output file will be overwritten. | |
*/ | |
@Parameter(names = {"-ow", "--overwrite"}, description = "If set, the output file will be overwritten.", required = false) | |
private boolean overwrite = false; | |
@SuppressWarnings("deprecation") | |
public void process(final JCommander jCommander) throws IOException, InterruptedException, ClassNotFoundException { | |
Path inputPath = new Path(input); | |
Path outputDir = new Path(output); | |
// Create configuration | |
Configuration conf = new Configuration(true); | |
conf.set("separator", separator); | |
// Create job | |
Job job = new Job(conf, "ToSequenceFile"); | |
job.setJarByClass(MakeSequenceFileMapper.class); | |
// Setup MapReduce | |
job.setMapperClass(MakeSequenceFileMapper.class); | |
job.setNumReduceTasks(0); | |
// Specify key / value | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(VectorWritable.class); | |
// Input | |
FileInputFormat.addInputPath(job, inputPath); | |
job.setInputFormatClass(TextInputFormat.class); | |
// Output | |
FileOutputFormat.setOutputPath(job, outputDir); | |
job.setOutputFormatClass(SequenceFileOutputFormat.class); | |
// Delete output if exists | |
FileSystem hdfs = FileSystem.get(conf); | |
if (hdfs.exists(outputDir)) { | |
if (overwrite) { | |
hdfs.delete(outputDir, true); | |
} else { | |
LOGGER.error("The ouput file already exists"); | |
return; | |
} | |
} | |
// Execute job | |
int code = job.waitForCompletion(true) ? 0 : 1; | |
System.exit(code); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package myCompany.bigdata.hadoop_project.args.makeSequenceCommand; | |
import java.io.IOException; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.mahout.math.RandomAccessSparseVector; | |
import org.apache.mahout.math.Vector; | |
import org.apache.mahout.math.VectorWritable; | |
public class MakeSequenceFileMapper extends | |
Mapper<Object, Text, Text, VectorWritable> { | |
public void map(Object key, Text value, Context context) | |
throws IOException, InterruptedException { | |
try { | |
// Split with the given separator | |
String[] c = value.toString().split(context.getConfiguration().get("separator", ",")); | |
if (c.length > 1) { | |
double[] d = new double[c.length]; | |
// Get the feature set | |
for (int i = 1; i < c.length; i++) | |
d[i] = Double.parseDouble(c[i]); | |
// Put it in a vector | |
Vector vec = new RandomAccessSparseVector(c.length); | |
vec.assign(d); | |
VectorWritable writable = new VectorWritable(); | |
writable.set(vec); | |
// Create a label with a / and the class label | |
String label = c[0] + "/" + c[0]; | |
// Write all in the seqfile | |
context.write(new Text(label), writable); | |
} | |
} catch (NumberFormatException e) { | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>myCompany.bigdata</groupId>
	<artifactId>hadoop_project</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>hadoop_project</name>
	<url>http://maven.apache.org</url>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	<repositories>
		<repository>
			<!-- Central Repository: must be HTTPS — Maven Central rejects plain
			     HTTP requests since January 2020, so the http:// URL fails. -->
			<id>central</id>
			<url>https://repo1.maven.org/maven2/</url>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
		<repository>
			<!-- Cloudera repository for the CDH builds of Hadoop/Mahout below. -->
			<id>cloudera</id>
			<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
		</repository>
	</repositories>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>jdk.tools</groupId>
				<artifactId>jdk.tools</artifactId>
				<version>1.6</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-hdfs</artifactId>
				<version>2.0.0-cdh4.6.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-auth</artifactId>
				<version>2.0.0-cdh4.6.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-common</artifactId>
				<version>2.0.0-cdh4.6.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-core</artifactId>
				<version>2.0.0-mr1-cdh4.6.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-client</artifactId>
				<version>2.0.0-cdh4.6.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.mahout</groupId>
				<artifactId>mahout-math</artifactId>
				<version>0.7-cdh4.6.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.mahout</groupId>
				<artifactId>mahout-core</artifactId>
				<version>0.7-cdh4.6.0</version>
			</dependency>
			<!-- NOTE(review): junit-dep is managed here but the dependencies
			     section below uses artifact "junit" with its own version, so this
			     managed entry is never applied — confirm which one is intended. -->
			<dependency>
				<groupId>junit</groupId>
				<artifactId>junit-dep</artifactId>
				<version>4.8.2</version>
			</dependency>
			<dependency>
				<groupId>com.beust</groupId>
				<artifactId>jcommander</artifactId>
				<version>1.30</version>
			</dependency>
			<dependency>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
				<version>1.7.5</version>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>jfreechart</groupId>
			<artifactId>jfreechart</artifactId>
			<version>1.0.0</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
		</dependency>
		<dependency>
			<groupId>com.beust</groupId>
			<artifactId>jcommander</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-auth</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-core</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.mahout</groupId>
			<artifactId>mahout-math</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.mahout</groupId>
			<artifactId>mahout-core</artifactId>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<finalName>Hadoop_project</finalName>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.1</version>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
					<downloadSources>true</downloadSources>
					<downloadJavadocs>true</downloadJavadocs>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<archive>
						<manifest>
							<mainClass>myCompany.bigdata.hadoop_project.Main</mainClass>
						</manifest>
					</archive>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id> <!-- this is used for inheritance merges -->
						<phase>package</phase> <!-- bind to the packaging phase -->
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package myCompany.bigdata.hadoop_project.args; | |
import java.io.IOException; | |
import org.apache.mahout.classifier.df.data.DescriptorException; | |
import com.beust.jcommander.JCommander; | |
/** | |
* Processes a valid command. | |
* | |
*/ | |
public interface UserCommand { | |
/** | |
* Processes the command. | |
* @throws IOException | |
* @throws ClassNotFoundException | |
* @throws InterruptedException | |
* @throws DescriptorException | |
*/ | |
public void process(final JCommander jCommander) throws IOException, InterruptedException, ClassNotFoundException, DescriptorException; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment