@kmoulart
Created March 18, 2014 08:52
Making a sequence file from a CSV, sequentially or via MapReduce
package myCompany.bigdata.hadoop_project.args;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
/**
* Represents a help command.
*
*/
@Parameters(commandDescription = "Displays the help about a specific command or about the software.")
public class HelpCommand implements UserCommand {
/**
* The command for which help is requested. If this argument is not specified, generic help is displayed.
*/
@Parameter(names = "-command", description = "The command for which the help is requested.")
private String commandName = null;
/**
* Returns the command name.
*
* @return The command name.
*/
public String getCommandName() {
return commandName;
}
/**
* Processes a help command. A help message is displayed to the user.
*/
public void process(final JCommander jCommander) {
if (commandName == null) {
jCommander.usage();
} else {
jCommander.usage(commandName);
}
}
}
package myCompany.bigdata.hadoop_project;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import myCompany.bigdata.hadoop_project.args.HelpCommand;
import myCompany.bigdata.hadoop_project.args.UserCommand;
import myCompany.bigdata.hadoop_project.args.makeSequenceCommand.MakeSequenceFileCommand;
import myCompany.bigdata.hadoop_project.args.makeSequenceCommand.MakeSequenceFileCommandMR;
import org.apache.mahout.classifier.df.data.DescriptorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
public class Main {
private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
/**
* Reads the arguments, then executes the user request.
*
* @param args
* The arguments.
*/
public static void main(final String args[]) {
final JCommander jCommander = new JCommander();
final Map<String, UserCommand> commands = new HashMap<String, UserCommand>();
initCommands(jCommander, commands);
try {
// First, we parse the arguments
jCommander.parse(args);
final String commandName = jCommander.getParsedCommand();
// Then, we execute the user request.
if (commandName == null) {
jCommander.usage();
} else {
commands.get(commandName).process(jCommander);
}
} catch (ParameterException e) {
jCommander.usage();
LOGGER.error(e.getMessage());
} catch (IOException e) {
System.err.println("One or multiple file path doesn't exist");
LOGGER.error(e.getMessage());
} catch (InterruptedException e) {
System.err.println("The job got interrupted");
LOGGER.error(e.getMessage());
} catch (ClassNotFoundException e) {
System.err.println("Class not found");
LOGGER.error(e.getMessage());
} catch (DescriptorException e) {
System.err.println("Wrong descriptor format");
LOGGER.error(e.getMessage());
}
}
/**
* Initializes JCommander and a UserCommand map.
*
* The map will contain couples of (command name, command).
*
* @param jCommander
* The JCommander to initialize.
* @param commands
* The map to fill with the existing commands.
*/
private static void initCommands(final JCommander jCommander, final Map<String, UserCommand> commands) {
// The possible commands
final HelpCommand helpCommand = new HelpCommand();
final MakeSequenceFileCommand makeSequenceFileCommand = new MakeSequenceFileCommand();
final MakeSequenceFileCommandMR makeSequenceFileCommandMR = new MakeSequenceFileCommandMR();
// The help command will use the keyword "help"
commands.put("help", helpCommand);
// The MakeSequenceFileCommand with its keyword makesf
commands.put("makesf", makeSequenceFileCommand);
// The MakeSequenceFileCommandMR with its keyword makesfmr
commands.put("makesfmr", makeSequenceFileCommandMR);
for (Entry<String, UserCommand> commandEntry : commands.entrySet()) {
jCommander.addCommand(commandEntry.getKey(), commandEntry.getValue());
}
}
}
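As a usage illustration (not part of the original gist), here is a minimal sketch of how the command dispatch above can be exercised; the file paths are placeholders and the jar name assumes the assembly configuration in the pom.xml further down.
package myCompany.bigdata.hadoop_project;
public class MainUsageSketch {
    public static void main(String[] args) {
        // Equivalent shell invocation (hypothetical paths):
        //   hadoop jar Hadoop_project-jar-with-dependencies.jar makesf -i data.csv -o /user/me/data.seq -ow
        Main.main(new String[] {"makesf", "-i", "data.csv", "-o", "/user/me/data.seq", "-ow"});
    }
}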
package myCompany.bigdata.hadoop_project.args.makeSequenceCommand;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import myCompany.bigdata.hadoop_project.args.UserCommand;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
/**
* Sequentially transforms the given CSV into a Sequence File Text/VectorWritable with
* \"key/key\" as key and the features as vector.
*
*/
@Parameters(commandDescription = "Sequentially transforms the given CSV into a Sequence File Text/VectorWritable with \"key/key\" as key and the features as vector.")
public class MakeSequenceFileCommand implements UserCommand {
private static final Logger LOGGER = LoggerFactory.getLogger(MakeSequenceFileCommand.class);
/**
* The CSV file to transform (in local file system).
*/
@Parameter(names = {"-i", "--input"}, description = "The CSV file to transform (in local file system).", required = true)
private String input = null;
/**
* The file into which the result shall be written (in HDFS).
*/
@Parameter(names = {"-o", "--output"}, description = "The file into which the result shall be written (in HDFS).", required = true)
private String output = null;
/**
* The separator used to separate the fields.
*/
@Parameter(names = {"-s", "--separator"}, description = "The separator used to separate the fields.", required = false)
private String separator = ",";
/**
* If set, the output file will be overwritten.
*/
@Parameter(names = {"-ow", "--overwrite"}, description = "If set, the output file will be overwritten.", required = false)
private boolean overwrite = false;
@SuppressWarnings("deprecation")
public void process(final JCommander jCommander) throws IOException {
Configuration conf = new Configuration(true);
FileSystem fs = FileSystem.get(conf);
Path filePath = new Path(output);
// Delete previous file if exists
if (fs.exists(filePath)) {
if (overwrite) {
fs.delete(filePath, true);
} else {
LOGGER.error("The ouput file already exists");
return;
}
}
// The input file is read from the local file system, not HDFS
BufferedReader reader = new BufferedReader(new FileReader(input));
SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
filePath, Text.class, VectorWritable.class);
// Run through the input file
String line;
while ((line = reader.readLine()) != null) {
// Wrap in try/catch so that lines that cannot be parsed
// (e.g. a header row) are simply skipped
try {
// Split with the given separator
String[] c = line.split(separator);
if (c.length > 1) {
double[] d = new double[c.length];
// Get the feature set (index 0 holds the label and is left at 0.0)
for (int i = 1; i < c.length; i++)
d[i] = Double.parseDouble(c[i]);
// Put it in a vector
Vector vec = new RandomAccessSparseVector(c.length);
vec.assign(d);
VectorWritable writable = new VectorWritable();
writable.set(vec);
// Build the key "label/label" from the class label (first field)
String label = c[0] + "/" + c[0];
// Write the key and vector to the sequence file
writer.append(new Text(label), writable);
}
} catch (NumberFormatException e) {
continue;
}
}
writer.close();
reader.close();
}
}
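To sanity-check the result, here is a minimal sketch (not part of the original gist) that reads the generated SequenceFile back and prints each key and vector. The path is a placeholder (for the MapReduce variant below, point it at a part file inside the output directory, e.g. part-m-00000), and the deprecated Reader constructor is used to match the Hadoop version targeted by the pom.xml.
package myCompany.bigdata.hadoop_project.args.makeSequenceCommand;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.math.VectorWritable;
public class SequenceFileDumpSketch {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        FileSystem fs = FileSystem.get(conf);
        // Placeholder path: point this at the file written by makesf or makesfmr
        Path path = new Path("/user/me/data.seq");
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        Text key = new Text();
        VectorWritable value = new VectorWritable();
        // Each record is the "label/label" key and the corresponding feature vector
        while (reader.next(key, value)) {
            System.out.println(key + " -> " + value.get());
        }
        reader.close();
    }
}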
package myCompany.bigdata.hadoop_project.args.makeSequenceCommand;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import myCompany.bigdata.hadoop_project.args.UserCommand;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
/**
* Transforms, using a MapReduce Job, the given CSV into a Sequence File Text/VectorWritable with
* \"key/key\" as key and the features as vector.
*
*/
@Parameters(commandDescription = "Transforms, using a MapReduce Job, the given CSV into a Sequence File Text/VectorWritable with \"key/key\" as key and the features as vector.")
public class MakeSequenceFileCommandMR implements UserCommand {
private static final Logger LOGGER = LoggerFactory.getLogger(MakeSequenceFileCommandMR.class);
/**
* The CSV file to transform (in HDFS).
*/
@Parameter(names = {"-i", "--input"}, description = "The CSV file to transform (in HDFS).", required = true)
private String input = null;
/**
* The file into which the result shall be written (in HDFS).
*/
@Parameter(names = {"-o", "--output"}, description = "The file into which the result shall be written (in HDFS).", required = true)
private String output = null;
/**
* The separator used to separate the fields.
*/
@Parameter(names = {"-s", "--separator"}, description = "The separator used to separate the fields.", required = false)
private String separator = ",";
/**
* If set, the output file will be overwritten.
*/
@Parameter(names = {"-ow", "--overwrite"}, description = "If set, the output file will be overwritten.", required = false)
private boolean overwrite = false;
@SuppressWarnings("deprecation")
public void process(final JCommander jCommander) throws IOException, InterruptedException, ClassNotFoundException {
Path inputPath = new Path(input);
Path outputDir = new Path(output);
// Create configuration
Configuration conf = new Configuration(true);
conf.set("separator", separator);
// Create job
Job job = new Job(conf, "ToSequenceFile");
job.setJarByClass(MakeSequenceFileMapper.class);
// Setup MapReduce
job.setMapperClass(MakeSequenceFileMapper.class);
job.setNumReduceTasks(0);
// Specify key / value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(VectorWritable.class);
// Input
FileInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(TextInputFormat.class);
// Output
FileOutputFormat.setOutputPath(job, outputDir);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// Delete output if exists
FileSystem hdfs = FileSystem.get(conf);
if (hdfs.exists(outputDir)) {
if (overwrite) {
hdfs.delete(outputDir, true);
} else {
LOGGER.error("The ouput file already exists");
return;
}
}
// Execute job
int code = job.waitForCompletion(true) ? 0 : 1;
System.exit(code);
}
}
package myCompany.bigdata.hadoop_project.args.makeSequenceCommand;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
public class MakeSequenceFileMapper extends
Mapper<Object, Text, Text, VectorWritable> {
@Override
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
try {
// Split with the given separator
String[] c = value.toString().split(context.getConfiguration().get("separator", ","));
if (c.length > 1) {
double[] d = new double[c.length];
// Get the feature set (index 0 holds the label and is left at 0.0)
for (int i = 1; i < c.length; i++)
d[i] = Double.parseDouble(c[i]);
// Put it in a vector
Vector vec = new RandomAccessSparseVector(c.length);
vec.assign(d);
VectorWritable writable = new VectorWritable();
writable.set(vec);
// Build the key "label/label" from the class label (first field)
String label = c[0] + "/" + c[0];
// Write the key and vector to the map output
context.write(new Text(label), writable);
}
} catch (NumberFormatException e) {
// Skip lines that cannot be parsed (e.g. a header row)
}
}
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>myCompany.bigdata</groupId>
<artifactId>hadoop_project</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>hadoop_project</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<repositories>
<repository>
<!-- Central Repository -->
<id>central</id>
<url>http://repo1.maven.org/maven2/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.0.0-cdh4.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.0.0-cdh4.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.0.0-cdh4.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>2.0.0-mr1-cdh4.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.0.0-cdh4.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-math</artifactId>
<version>0.7-cdh4.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-core</artifactId>
<version>0.7-cdh4.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit-dep</artifactId>
<version>4.8.2</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<version>1.30</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>jfreechart</groupId>
<artifactId>jfreechart</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-math</artifactId>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-core</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>Hadoop_project</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<downloadSources>true</downloadSources>
<downloadJavadocs>true</downloadJavadocs>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>myCompany.bigdata.hadoop_project.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package myCompany.bigdata.hadoop_project.args;
import java.io.IOException;
import org.apache.mahout.classifier.df.data.DescriptorException;
import com.beust.jcommander.JCommander;
/**
* Represents a user command that can be processed once the arguments have been parsed.
*
*/
public interface UserCommand {
/**
* Processes the command.
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws DescriptorException
*/
public void process(final JCommander jCommander) throws IOException, InterruptedException, ClassNotFoundException, DescriptorException;
}