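Reading files with Apache Tika from a Hadoop MapReduce job. A custom FileInputFormat/RecordReader pair turns every input file into a single (path, extracted text) record, an identity mapper passes the records through, and a custom FileOutputFormat/RecordWriter pair writes them as comma-separated lines to one output file. The five classes are shown below, one per file.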
package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// TikaFileInputFormat.java
// Emits one (file path, extracted text) pair per input file.
public class TikaFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new TikaRecordReader();
    }

    // TikaRecordReader consumes a whole file at once, so files must
    // never be split across mappers.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}
package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// TikaMapreduce.java
// Job driver: wires the Tika input format, an identity mapper and the
// custom output format together.
public class TikaMapreduce extends Configured implements Tool {

    public static class TikaMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Identity mapper: the record reader has already extracted the text.
            context.write(key, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: TikaMapreduce <input path> <output path>");
            return 2;
        }
        Job job = new Job(getConf(), "TikaMapreduce");
        job.setJarByClass(getClass());
        job.setInputFormatClass(TikaFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(TikaMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TikaOutputFormat.class);
        // Suffix the output path with a timestamp so reruns do not fail with
        // an "output directory already exists" error.
        FileOutputFormat.setOutputPath(job, new Path(args[1]
                + System.currentTimeMillis()));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exit = ToolRunner.run(new Configuration(), new TikaMapreduce(), args);
        System.exit(exit);
    }
}
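Assuming the classes are packaged into a job jar (the jar name below is hypothetical), the driver is launched through ToolRunner in the usual way:

hadoop jar tika-mapreduce.jar com.srini.tikacustom.TikaMapreduce <input dir> <output path prefix>

Because run() appends System.currentTimeMillis() to the second argument, each invocation writes to a fresh output directory.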
package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// TikaOutputFormat.java
// Collects all records into a single hard-coded file inside the job's
// output directory.
public class TikaOutputFormat extends FileOutputFormat<Text, Text> {

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Path outputDir = FileOutputFormat.getOutputPath(context);
        // Note: every task writes to the same file name, so this only works
        // while a single task produces output (here: the default lone reducer).
        Path fullPath = new Path(outputDir, "Srini.txt");
        FileSystem fs = outputDir.getFileSystem(context.getConfiguration());
        FSDataOutputStream output = fs.create(fullPath, context);
        return new TikaRecordWriter(output);
    }
}
package com.srini.tikacustom;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.tika.Tika;
import org.apache.tika.exception.TikaException;

// TikaRecordReader.java
// Reads an entire file as a single record: the key is the file path and the
// value is the plain text that Tika extracts from it.
public class TikaRecordReader extends RecordReader<Text, Text> {

    private final Text key = new Text();
    private final Text value = new Text();
    private FileSplit fileSplit;
    private Configuration conf;
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (processed) {
            return false;
        }
        Path path = fileSplit.getPath();
        key.set(path.toString());

        // Open the file through the Hadoop FileSystem API (works for HDFS as
        // well as local paths) and let Tika detect the format and extract text.
        FileSystem fs = path.getFileSystem(conf);
        InputStream in = fs.open(path);
        try {
            String text = new Tika().parseToString(in);
            // Strip punctuation, collapse runs of whitespace and lowercase
            // the result before handing it to the mapper.
            text = text.replaceAll("[$%&+,:;=?#|']", " ")
                       .replaceAll("\\s+", " ")
                       .toLowerCase();
            value.set(text);
        } catch (TikaException e) {
            throw new IOException("Tika failed to parse " + path, e);
        } finally {
            in.close();
        }
        processed = true;
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Nothing to clean up: the input stream is closed in nextKeyValue().
    }
}
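The heavy lifting in nextKeyValue() is the single Tika.parseToString() call, which auto-detects the file type and returns the extracted plain text. A minimal standalone sketch of the same call outside Hadoop (the file name is hypothetical):

import java.io.File;
import java.io.IOException;

import org.apache.tika.Tika;
import org.apache.tika.exception.TikaException;

public class TikaParseDemo {
    public static void main(String[] args) throws IOException, TikaException {
        // Tika detects the format (PDF, DOCX, HTML, ...) and extracts text.
        String text = new Tika().parseToString(new File("sample.pdf"));
        System.out.println(text);
    }
}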
package com.srini.tikacustom;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// TikaRecordWriter.java
// Writes each record as a "key,value" line, preceded by a one-line header.
public class TikaRecordWriter extends RecordWriter<Text, Text> {

    private final DataOutputStream out;

    public TikaRecordWriter(DataOutputStream out) throws IOException {
        this.out = out;
        out.writeBytes("result:\r\n");
    }

    @Override
    public void write(Text key, Text value) throws IOException,
            InterruptedException {
        out.writeBytes(key.toString());
        out.writeBytes(",");
        out.writeBytes(value.toString());
        out.writeBytes("\r\n");
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException,
            InterruptedException {
        out.close();
    }
}
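Building and running this requires the Hadoop client libraries plus Apache Tika on the classpath of both the client and the tasks. On the Tika side that means tika-core together with the tika-parsers bundle, since tika-core alone only detects types and cannot extract text from formats like PDF or DOCX; the exact artifact versions depend on the Hadoop and Tika releases in use.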