-
-
Save srinivasanHadoop/7245106 to your computer and use it in GitHub Desktop.
package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class TikaFileInputFormat extends FileInputFormat<Text, Text> { | |
@Override | |
public RecordReader<Text, Text> createRecordReader(InputSplit split, | |
TaskAttemptContext context) throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
return new TikaRecordReader(); | |
} | |
} |
package com.srini.tikacustom; | |
import java.io.IOException; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
public class TikaMapreduce extends Configured implements Tool { | |
public static class TikaMapper extends Mapper<Text, Text, Text, Text> { | |
public void map(Text key, Text value, Context context) | |
throws IOException, InterruptedException { | |
context.write(key, value); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
int exit = ToolRunner.run(new Configuration(), new TikaMapreduce(), | |
args); | |
System.exit(exit); | |
} | |
@Override | |
public int run(String[] args) throws Exception { | |
// TODO Auto-generated method stub | |
if (args.length != 2) { | |
System.out.println("set the input path and output path"); | |
return 2; | |
} | |
Configuration conf = new Configuration(); | |
Job job = new Job(conf, "TikaMapreduce"); | |
job.setJarByClass(getClass()); | |
job.setJobName("TikRead"); | |
job.setInputFormatClass(TikaFileInputFormat.class); | |
FileInputFormat.addInputPath(job, new Path(args[0])); | |
job.setMapperClass(TikaMapper.class); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(Text.class); | |
job.setOutputFormatClass(TikaOutPutFormt.class); | |
FileOutputFormat.setOutputPath(job, new Path(args[1] | |
+ System.currentTimeMillis())); | |
return job.waitForCompletion(true) ? 0 : 1; | |
} | |
} |
package com.srini.tikacustom; | |
import java.io.IOException; | |
import org.apache.hadoop.fs.FSDataOutputStream; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.RecordWriter; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
public class TikaOutPutFormt extends FileOutputFormat<Text, Text> { | |
@Override | |
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) | |
throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
Path path=FileOutputFormat.getOutputPath(context); | |
Path fullapth=new Path(path,"Srini.txt"); | |
FileSystem fs=path.getFileSystem(context.getConfiguration()); | |
FSDataOutputStream output=fs.create(fullapth,context); | |
return new TikaRecordWrite(output); | |
} | |
} |
package com.srini.tikacustom; | |
import java.io.IOException; | |
import java.net.URL; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.FSDataInputStream; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.InputSplit; | |
import org.apache.hadoop.mapreduce.RecordReader; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
import org.apache.hadoop.mapreduce.lib.input.FileSplit; | |
import org.apache.tika.Tika; | |
import org.apache.tika.exception.TikaException; | |
public class TikaRecordReader extends RecordReader<Text, Text> { | |
private Text key = new Text(); | |
private Text value = new Text(); | |
private FileSplit fileSplit; | |
private Configuration conf; | |
private boolean processed = false; | |
@Override | |
public void close() throws IOException { | |
// TODO Auto-generated method stub | |
} | |
@Override | |
public Text getCurrentKey() throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
return key; | |
} | |
@Override | |
public Text getCurrentValue() throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
return value; | |
} | |
@Override | |
public float getProgress() throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
return processed ? 1.0f : 0.0f; | |
} | |
@Override | |
public void initialize(InputSplit split, TaskAttemptContext context) | |
throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
this.fileSplit = (FileSplit) split; | |
this.conf = context.getConfiguration(); | |
} | |
@Override | |
public boolean nextKeyValue() throws IOException, InterruptedException { | |
// TODO Auto-generated method stub | |
if (!processed) { | |
Path path = fileSplit.getPath(); | |
key.set(path.toString()); | |
@SuppressWarnings("unused") | |
FileSystem fs = path.getFileSystem(conf); | |
@SuppressWarnings("unused") | |
FSDataInputStream fin = null; | |
try { | |
String con = new Tika().parseToString(new URL(path.toString())); | |
String string = con.replaceAll("[$%&+,:;=?#|']", " "); | |
String string2 = string.replaceAll("\\s+", " "); | |
String lo = string2.toLowerCase(); | |
value.set(lo); | |
} catch (TikaException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
processed = true; | |
return true; | |
} else { | |
return false; | |
} | |
} | |
} |
package com.srini.tikacustom; | |
import java.io.DataOutputStream; | |
import java.io.IOException; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.RecordWriter; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
public class TikaRecordWrite extends RecordWriter<Text, Text> { | |
private DataOutputStream out; | |
public TikaRecordWrite(DataOutputStream output) { | |
// TODO Auto-generated constructor stub | |
out=output; | |
try { | |
out.writeBytes("result:\r\n"); | |
} catch (IOException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
} | |
@Override | |
public void close(TaskAttemptContext context) throws IOException, | |
InterruptedException { | |
// TODO Auto-generated method stub | |
out.close(); | |
} | |
@Override | |
public void write(Text key, Text value) throws IOException, | |
InterruptedException { | |
// TODO Auto-generated method stub | |
out.writeBytes(key.toString()); | |
out.writeBytes(","); | |
out.writeBytes(value.toString()); | |
out.writeBytes("\r\n"); | |
} | |
} |
na batao !!!! maine chala liya hai!!! tumhe bhi nahi bataaunga par ab!!!
I am also facing the same error: java.net.MalformedURLException: unknown protocol: hdfs. Why am I getting this error? Please, can anyone help me?
There are two ways to solve the error.
add 'URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());' at TikaRecordReader.java
...
...
...
FSDataInputStream fin = null;
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
try {
String con = new Tika().parseToString(new URL(path.toString()));
...
...
...
Or, change the code below, which is in TikaRecordReader.java:
...
...
...
FSDataInputStream fin = null;
try {
String con = new Tika().parseToString(new URL(path.toString()));
...
...
...
to
...
...
...
FSDataInputStream fin = fs.open(path);
try {
String con = new Tika().parseToString(fin);
...
...
...
hope it helps
Hi, I am getting an error. What jar files do I need to add for Tika? Can you share them if you have any?
Hi, I am trying to use your MapReduce program to parse TIF images. Do I need to change any input or output format?
Hey! I am also getting the same error as mentioned above by sahil28. Did anyone get it to work?
Please reply!