Skip to content

Instantly share code, notes, and snippets.

@butlermh
Created June 2, 2011 21:10
Show Gist options
  • Save butlermh/1005347 to your computer and use it in GitHub Desktop.
Save butlermh/1005347 to your computer and use it in GitHub Desktop.
Threaded Implementation of Corpus Generator
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.digitalpebble.behemoth.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import com.digitalpebble.behemoth.BehemothDocument;
import com.digitalpebble.behemoth.CliProcessor;
/**
 * Generates SequenceFiles containing BehemothDocuments from the content of a
 * local file or directory. Each BehemothDocument receives the byte content of
 * one file and that file's URI as its URL (which is also used as the record
 * key). The detection of MIME-type and text extraction can be done later using
 * the TikaProcessor.
 * <p>
 * One producer thread ({@link SearchDirectories}) walks the input and places
 * files on a bounded queue. A pool of worker threads ({@link RetrieveFile}, 20
 * by default) takes files from the queue. Each worker appends them to its own
 * {@code part-NNNNN} SequenceFile under the output directory, so writers, keys
 * and values are never shared between threads.
 */
public class CorpusGeneratorQueue {

    public final static String USAGE = "Generate a Behemoth corpus on HDFS from a local directory";

    /** Number of worker threads (and therefore part files) used by default. */
    private static final int DEFAULT_NUM_WRITERS = 20;

    /** Maximum number of discovered files waiting to be loaded by the workers. */
    private static final int QUEUE_CAPACITY = 1000;

    /**
     * Sentinel put on the queue when the directory walk is over. It is compared
     * by identity, so a real file with the same name is never mistaken for it.
     */
    private static final File FINISHED = new File("*FINISHED_PROCESSING");

    private final BlockingQueue<File> filesToLoad = new ArrayBlockingQueue<File>(QUEUE_CAPACITY);
    private final File root;
    private final boolean recurse;

    /**
     * Command-line entry point: populates SequenceFiles with the content of a
     * local directory.
     *
     * @param argv command-line arguments (-i input, -o output, optional -s recurse)
     */
    public static void main(String[] argv) throws Exception {
        CliProcessor cliProcessor = new CliProcessor(
                CorpusGeneratorQueue.class.getSimpleName(), USAGE);
        String inputOpt = cliProcessor.addRequiredOption("i", "input",
                "Input directory on local file system", true);
        String outputOpt = cliProcessor.addRequiredOption("o", "output",
                "Output directory on HDFS", true);
        String recurseOpt = cliProcessor.addOption("s", "recurse",
                "Recurse through input directories", false);
        try {
            cliProcessor.parse(argv);
        } catch (ParseException me) {
            // NOTE(review): assumes CliProcessor has already reported the
            // problem / printed usage — TODO confirm.
            return;
        }
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        File inputDir = new File(cliProcessor.getOptionValue(inputOpt));
        boolean recurse = cliProcessor.hasOption(recurseOpt);
        // The constructor starts (non-daemon) threads and returns immediately;
        // the JVM keeps running until they have finished.
        new CorpusGeneratorQueue(conf, fs,
                cliProcessor.getOptionValue(outputOpt), inputDir, recurse);
    }

    /**
     * Starts loading {@code file} into {@value #DEFAULT_NUM_WRITERS} part files
     * under {@code output}.
     *
     * @param conf     Hadoop configuration used to create the writers
     * @param fs       file system on which the part files are created
     * @param output   output directory
     * @param file     local file or directory to load
     * @param _recurse whether to descend into sub-directories of {@code file}
     * @throws IOException if a part file cannot be created
     */
    CorpusGeneratorQueue(Configuration conf, FileSystem fs, String output,
            File file, boolean _recurse) throws IOException {
        this(conf, fs, output, file, _recurse, DEFAULT_NUM_WRITERS);
    }

    /**
     * Starts loading {@code file} into {@code numWriters} part files under
     * {@code output}, one worker thread per part file.
     *
     * @param conf       Hadoop configuration used to create the writers
     * @param fs         file system on which the part files are created
     * @param output     output directory
     * @param file       local file or directory to load
     * @param recurse    whether to descend into sub-directories of {@code file}
     * @param numWriters number of worker threads / part files, at least 1
     * @throws IOException if a part file cannot be created; any part files
     *                     already opened are closed and no thread is started
     * @throws IllegalArgumentException if {@code numWriters < 1}
     */
    CorpusGeneratorQueue(Configuration conf, FileSystem fs, String output,
            File file, boolean recurse, int numWriters) throws IOException {
        if (numWriters < 1) {
            throw new IllegalArgumentException(
                    "numWriters must be at least 1: " + numWriters);
        }
        this.root = file;
        this.recurse = recurse;
        List<Thread> threads = new ArrayList<Thread>(numWriters + 1);
        threads.add(new SearchDirectories());
        List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(numWriters);
        boolean allOpened = false;
        try {
            for (int i = 0; i < numWriters; i++) {
                BehemothDocument value = new BehemothDocument();
                Text key = new Text();
                Path partPath = new Path(output + "/" + partName(i));
                SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
                        partPath, key.getClass(), value.getClass());
                writers.add(writer);
                threads.add(new RetrieveFile(writer, key, value));
            }
            allOpened = true;
        } finally {
            if (!allOpened) {
                // No thread has been started yet, so nobody else will close these.
                for (SequenceFile.Writer writer : writers) {
                    IOUtils.closeStream(writer);
                }
            }
        }
        for (Thread thread : threads) {
            thread.start();
        }
    }

    /**
     * Returns the part file name for worker {@code index}, zero-padded to five
     * digits (e.g. {@code part-00007}). Padding is done by hand to stay
     * independent of the default locale's digits.
     */
    private static String partName(int index) {
        String digits = Integer.toString(index);
        StringBuilder name = new StringBuilder("part-");
        for (int i = digits.length(); i < 5; i++) {
            name.append('0');
        }
        return name.append(digits).toString();
    }

    /**
     * Reads the whole content of {@code file} into memory.
     *
     * @return the bytes actually read. This is shorter than the file's reported
     *         length if the file shrank while being read.
     * @throws IOException if the file cannot be opened or read, or is larger
     *                     than a Java array can hold
     */
    private static byte[] readContent(File file) throws IOException {
        long length = file.length();
        if (length > Integer.MAX_VALUE) {
            throw new IOException("File too large to load into memory: " + file);
        }
        byte[] content = new byte[(int) length];
        FileInputStream fis = new FileInputStream(file);
        try {
            FileChannel fc = fis.getChannel();
            ByteBuffer buffer = ByteBuffer.wrap(content);
            // A single read() may return fewer bytes than requested: loop
            // until the buffer is full or end-of-file is reached.
            while (buffer.hasRemaining()) {
                if (fc.read(buffer) < 0) {
                    break;
                }
            }
            if (buffer.hasRemaining()) {
                byte[] trimmed = new byte[buffer.position()];
                System.arraycopy(content, 0, trimmed, 0, trimmed.length);
                return trimmed;
            }
            return content;
        } finally {
            fis.close();
        }
    }

    /**
     * Puts {@code file} on the queue even if the current thread is (or gets)
     * interrupted, then restores the interrupt status.
     */
    private void putUninterruptibly(File file) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    filesToLoad.put(file);
                    return;
                } catch (InterruptedException ie) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Producer thread. It walks the input and puts every non-hidden,
     * non-directory file on the queue, followed by the {@link #FINISHED}
     * sentinel.
     */
    public class SearchDirectories extends Thread {
        @Override
        public void run() {
            try {
                process(root, true);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Unexpected interruption", ie);
            } finally {
                // Always publish the sentinel, even if the walk failed part-way.
                // Otherwise the workers block on take() forever and never
                // close (flush) their writers.
                putUninterruptibly(FINISHED);
            }
        }

        /**
         * Enqueues {@code file}, or its children if it is a directory and
         * {@code r} is true. Children are visited with the instance-level
         * {@code recurse} flag, so the root directory is always listed.
         * Sub-directories are skipped in non-recursive mode.
         */
        private void process(File file, boolean r) throws InterruptedException {
            if (file.isDirectory()) {
                if (!r) {
                    // Non-recursive: neither descend nor hand the directory to
                    // a worker, which could not read it as a file.
                    return;
                }
                File[] children = file.listFiles();
                if (children == null) {
                    // listFiles() returns null on an I/O error (e.g. permissions).
                    System.err.println("Cannot list directory " + file);
                    return;
                }
                for (File child : children) {
                    process(child, recurse);
                }
            } else if (!file.getName().startsWith(".")) {
                filesToLoad.put(file);
            }
        }
    }

    /**
     * Worker thread. It takes files from the queue and appends each one as a
     * BehemothDocument to its own SequenceFile until it sees the
     * {@link #FINISHED} sentinel. The writer is closed when the thread exits.
     */
    public class RetrieveFile extends Thread {
        private final SequenceFile.Writer writer;
        private final Text key;
        private final BehemothDocument value;

        public RetrieveFile(SequenceFile.Writer writer, Text key,
                BehemothDocument value) {
            this.writer = writer;
            this.key = key;
            this.value = value;
        }

        @Override
        public void run() {
            try {
                File file;
                while ((file = filesToLoad.take()) != FINISHED) {
                    byte[] content;
                    try {
                        content = readContent(file);
                    } catch (IOException e) {
                        // Skip the file instead of letting this worker die: if
                        // every worker died, the producer would block forever
                        // on the full queue.
                        System.err.println("Skipping unreadable file " + file + ": " + e);
                        continue;
                    }
                    value.setUrl(file.toURI().toString());
                    value.setContent(content);
                    key.set(value.getUrl());
                    try {
                        writer.append(key, value);
                    } catch (IOException e) {
                        // NOTE(review): a write failure still ends this worker;
                        // if all workers fail this way the producer can block
                        // on a full queue.
                        throw new RuntimeException("Failed to append " + file, e);
                    }
                }
                // Put the FINISHED sentinel back on the queue so that the
                // other workers see it too and terminate.
                filesToLoad.put(FINISHED);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Unexpected interruption", ie);
            } finally {
                IOUtils.closeStream(writer);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment