Forked from srinivasanHadoop/TikaFileInputFormat.java
Created
January 11, 2014 11:59
-
-
Save arnobroekhof/8370114 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.srini.tikacustom; | |
import java.io.IOException; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.InputSplit; | |
import org.apache.hadoop.mapreduce.RecordReader; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
public class TikaFileInputFormat extends FileInputFormat<Text, Text> { | |
@Override | |
public RecordReader<Text, Text> createRecordReader(InputSplit split, | |
TaskAttemptContext context) throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
return new TikaRecordReader(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.srini.tikacustom; | |
import java.io.IOException; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
public class TikaMapreduce extends Configured implements Tool { | |
public static class TikaMapper extends Mapper<Text, Text, Text, Text> { | |
public void map(Text key, Text value, Context context) | |
throws IOException, InterruptedException { | |
context.write(key, value); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
int exit = ToolRunner.run(new Configuration(), new TikaMapreduce(), | |
args); | |
System.exit(exit); | |
} | |
@Override | |
public int run(String[] args) throws Exception { | |
// TODO Auto-generated method stub | |
if (args.length != 2) { | |
System.out.println("set the input path and output path"); | |
return 2; | |
} | |
Configuration conf = new Configuration(); | |
Job job = new Job(conf, "TikaMapreduce"); | |
job.setJarByClass(getClass()); | |
job.setJobName("TikRead"); | |
job.setInputFormatClass(TikaFileInputFormat.class); | |
FileInputFormat.addInputPath(job, new Path(args[0])); | |
job.setMapperClass(TikaMapper.class); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(Text.class); | |
job.setOutputFormatClass(TikaOutPutFormt.class); | |
FileOutputFormat.setOutputPath(job, new Path(args[1] | |
+ System.currentTimeMillis())); | |
return job.waitForCompletion(true) ? 0 : 1; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.srini.tikacustom; | |
import java.io.IOException; | |
import org.apache.hadoop.fs.FSDataOutputStream; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.RecordWriter; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
public class TikaOutPutFormt extends FileOutputFormat<Text, Text> { | |
@Override | |
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) | |
throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
Path path=FileOutputFormat.getOutputPath(context); | |
Path fullapth=new Path(path,"Srini.txt"); | |
FileSystem fs=path.getFileSystem(context.getConfiguration()); | |
FSDataOutputStream output=fs.create(fullapth,context); | |
return new TikaRecordWrite(output); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.srini.tikacustom; | |
import java.io.IOException; | |
import java.net.URL; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.FSDataInputStream; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.InputSplit; | |
import org.apache.hadoop.mapreduce.RecordReader; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
import org.apache.hadoop.mapreduce.lib.input.FileSplit; | |
import org.apache.tika.Tika; | |
import org.apache.tika.exception.TikaException; | |
public class TikaRecordReader extends RecordReader<Text, Text> { | |
private Text key = new Text(); | |
private Text value = new Text(); | |
private FileSplit fileSplit; | |
private Configuration conf; | |
private boolean processed = false; | |
@Override | |
public void close() throws IOException { | |
// TODO Auto-generated method stub | |
} | |
@Override | |
public Text getCurrentKey() throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
return key; | |
} | |
@Override | |
public Text getCurrentValue() throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
return value; | |
} | |
@Override | |
public float getProgress() throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
return processed ? 1.0f : 0.0f; | |
} | |
@Override | |
public void initialize(InputSplit split, TaskAttemptContext context) | |
throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
this.fileSplit = (FileSplit) split; | |
this.conf = context.getConfiguration(); | |
} | |
@Override | |
public boolean nextKeyValue() throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
if (!processed) { | |
Path path = fileSplit.getPath(); | |
key.set(path.toString()); | |
@SuppressWarnings("unused") | |
FileSystem fs = path.getFileSystem(conf); | |
@SuppressWarnings("unused") | |
FSDataInputStream fin = null; | |
try { | |
String con = new Tika().parseToString(new URL(path.toString())); | |
String string = con.replaceAll("[$%&+,:;=?#|']", " "); | |
String string2 = string.replaceAll("\\s+", " "); | |
String lo = string2.toLowerCase(); | |
value.set(lo); | |
} catch (TikaException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
processed = true; | |
return true; | |
} else { | |
return false; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.srini.tikacustom; | |
import java.io.DataOutputStream; | |
import java.io.IOException; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.RecordWriter; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
public class TikaRecordWrite extends RecordWriter<Text, Text> { | |
private DataOutputStream out; | |
public TikaRecordWrite(DataOutputStream output) { | |
// TODO Auto-generated constructor stub | |
out=output; | |
try { | |
out.writeBytes("result:\r\n"); | |
} catch (IOException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
} | |
@Override | |
public void close(TaskAttemptContext context) throws IOException, | |
InterruptedException { | |
// TODO Auto-generated method stub | |
out.close(); | |
} | |
@Override | |
public void write(Text key, Text value) throws IOException, | |
InterruptedException { | |
// TODO Auto-generated method stub | |
out.writeBytes(key.toString()); | |
out.writeBytes(","); | |
out.writeBytes(value.toString()); | |
out.writeBytes("\r\n"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment