@srinivasanHadoop
Created October 31, 2013 06:28
I integrated Apache Tika with Hadoop MapReduce. The code below consists of five classes: a custom input format (TikaFileInputFormat) and record reader (TikaRecordReader) that parse each input file with Tika, the job driver and mapper (TikaMapreduce), and a custom output format (TikaOutPutFormt) with its record writer (TikaRecordWrite).
// TikaFileInputFormat.java
package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Input format that hands every split to the Tika-based record reader.
public class TikaFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new TikaRecordReader();
    }
}
// TikaMapreduce.java
package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Driver for the Tika extraction job.
public class TikaMapreduce extends Configured implements Tool {

    // Identity mapper: the record reader already emits (file path, extracted text).
    public static class TikaMapper extends Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        int exit = ToolRunner.run(new Configuration(), new TikaMapreduce(), args);
        System.exit(exit);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: TikaMapreduce <input path> <output path>");
            return 2;
        }
        // Use the configuration supplied by ToolRunner instead of creating a new one.
        Job job = new Job(getConf(), "TikaMapreduce");
        job.setJarByClass(getClass());
        job.setJobName("TikaRead");
        job.setInputFormatClass(TikaFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(TikaMapper.class);
        // No reducer is set, so the default identity reducer passes the map output through.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TikaOutPutFormt.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]
                + System.currentTimeMillis()));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
// TikaOutPutFormt.java
package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Output format that writes all key/value pairs into a single file named Srini.txt.
public class TikaOutPutFormt extends FileOutputFormat<Text, Text> {

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Path path = FileOutputFormat.getOutputPath(context);
        Path fullPath = new Path(path, "Srini.txt");
        FileSystem fs = path.getFileSystem(context.getConfiguration());
        FSDataOutputStream output = fs.create(fullPath, context);
        return new TikaRecordWrite(output);
    }
}
// TikaRecordReader.java
package com.srini.tikacustom;

import java.io.IOException;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.tika.Tika;
import org.apache.tika.exception.TikaException;

// Record reader that emits a single record per split:
// key = the file path, value = the text Tika extracts from the file.
public class TikaRecordReader extends RecordReader<Text, Text> {

    private Text key = new Text();
    private Text value = new Text();
    private FileSplit fileSplit;
    private Configuration conf;
    private boolean processed = false;

    @Override
    public void close() throws IOException {
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            Path path = fileSplit.getPath();
            key.set(path.toString());
            @SuppressWarnings("unused")
            FileSystem fs = path.getFileSystem(conf);
            @SuppressWarnings("unused")
            FSDataInputStream fin = null;
            try {
                // Parse the whole file with Tika, then normalize the extracted text.
                String con = new Tika().parseToString(new URL(path.toString()));
                String string = con.replaceAll("[$%&+,:;=?#|']", " ");
                String string2 = string.replaceAll("\\s+", " ");
                String lo = string2.toLowerCase();
                value.set(lo);
            } catch (TikaException e) {
                e.printStackTrace();
            }
            processed = true;
            return true;
        } else {
            return false;
        }
    }
}
// TikaRecordWrite.java
package com.srini.tikacustom;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Record writer that writes "key,value" lines to the shared output stream.
public class TikaRecordWrite extends RecordWriter<Text, Text> {

    private DataOutputStream out;

    public TikaRecordWrite(DataOutputStream output) {
        out = output;
        try {
            out.writeBytes("result:\r\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException,
            InterruptedException {
        out.close();
    }

    @Override
    public void write(Text key, Text value) throws IOException,
            InterruptedException {
        out.writeBytes(key.toString());
        out.writeBytes(",");
        out.writeBytes(value.toString());
        out.writeBytes("\r\n");
    }
}
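
For anyone trying this out, a minimal run command might look like the following. The jar name and HDFS paths here are only placeholders, and the Tika jars need to be on the job classpath (for example bundled into the job jar, or passed with -libjars, which works because the driver uses ToolRunner). Note that run() appends a timestamp to whatever output path you pass in.

hadoop jar tika-mapreduce.jar com.srini.tikacustom.TikaMapreduce /user/srini/input /user/srini/output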
@srinivasanHadoop (Author)

Hi guys,
please send me your feedback.

cgizadi commented Jan 13, 2014

This is very awesome!
One thing I don't understand: where is the reducer? I only see the mapper.

b1tfury commented Jun 12, 2014

Hi Sri,
I used your code to parse a PDF file, but it is giving me an error:
Error: java.net.MalformedURLException: unknown protocol: hdfs
at java.net.URL.<init>(URL.java:593)
at java.net.URL.<init>(URL.java:483)
at java.net.URL.<init>(URL.java:432)
at com.serendio.icvs.hbase.test.TikaRecordReader.nextKeyValue(TikaRecordReader.java:68)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:532)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

arnab-s commented Jun 15, 2015

Hey! I am also getting the same error mentioned above by sahil28. Did anyone get it to work?
Please reply!

arnab-s commented Jun 17, 2015

Don't bother telling me! I got it running myself, and now I won't tell you either!

@chenchubabuch

I am also facing the same error: java.net.MalformedURLException: unknown protocol: hdfs. Why am I getting this error? Can anyone please help me?

mungeol commented Aug 19, 2015

There are two ways to solve the error.

Add 'URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());' in TikaRecordReader.java:

...
FSDataInputStream fin = null;
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());

try {
    String con = new Tika().parseToString(new URL(path.toString()));
...

Or, change this code in TikaRecordReader.java:

...
FSDataInputStream fin = null;

try {
    String con = new Tika().parseToString(new URL(path.toString()));
...

to

...
FSDataInputStream fin = fs.open(path);

try {
    String con = new Tika().parseToString(fin);
...

Hope it helps.
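
Putting the second suggestion together, nextKeyValue() in TikaRecordReader would look roughly like the sketch below. This is an untested assumption of how the pieces fit, not the original gist code: it opens the file through the Hadoop FileSystem API and hands the stream to Tika, so no hdfs:// URL handler is needed. If you prefer the first approach instead, FsUrlStreamHandlerFactory comes from org.apache.hadoop.fs.FsUrlStreamHandlerFactory, and URL.setURLStreamHandlerFactory may only be called once per JVM, so it is usually placed in a static initializer rather than inside nextKeyValue().

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (processed) {
        return false;
    }
    Path path = fileSplit.getPath();
    key.set(path.toString());
    // Open the file directly from HDFS instead of going through java.net.URL.
    FileSystem fs = path.getFileSystem(conf);
    FSDataInputStream fin = fs.open(path);
    try {
        // parseToString(InputStream) closes the stream when it is done.
        String con = new Tika().parseToString(fin);
        String cleaned = con.replaceAll("[$%&+,:;=?#|']", " ")
                .replaceAll("\\s+", " ")
                .toLowerCase();
        value.set(cleaned);
    } catch (TikaException e) {
        e.printStackTrace();
    }
    processed = true;
    return true;
}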

@sravanthi517

Hi, I am getting an error. Which jar files do I need to add for Tika? Can you share them if you have any?

@ravikanth180

Hi, I am trying to use your MapReduce program to parse TIF images. Do I need to change the input or output format?
