Skip to content

Instantly share code, notes, and snippets.

@pmackles
Created October 5, 2012 18:14
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save pmackles/3841437 to your computer and use it in GitHub Desktop.
Save pmackles/3841437 to your computer and use it in GitHub Desktop.
Sample HBase bulk delete: a MapReduce job that writes DeleteFamily markers into HFiles
package example.hbase.tools;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
/**
 * Sample MapReduce job that bulk-deletes rows of an HBase table by writing
 * {@code DeleteFamily} markers into HFiles (via {@link HFileOutputFormat})
 * instead of issuing individual deletes.
 *
 * <p>Usage: {@code BulkDeleteTest [generic options] [tableName [outputDir]]}.
 * The table name defaults to {@value #DEFAULT_TABLE} and the output directory
 * to {@value #DEFAULT_OUTPUT_DIR}, matching the original hard-coded values.
 *
 * <p>This class only writes HFiles to the output directory; nothing here loads
 * them into the table. Presumably that is done separately afterwards (e.g. with
 * HBase's {@code completebulkload} / {@code LoadIncrementalHFiles}).
 *
 * <p>NOTE(review): the scan has no filter, so every row of the table is treated
 * as a deletion candidate. Add a filter to the scan (or a check in the mapper)
 * to restrict which rows are deleted.
 */
public class BulkDeleteTest {

    /** Job configuration key carrying the comma-delimited column family names. */
    static final String COLUMN_FAMILIES_KEY = "delete.columnFamilies";

    /** Table processed when no table name is given on the command line. */
    static final String DEFAULT_TABLE = "users";

    /** HFile output directory used when none is given on the command line. */
    static final String DEFAULT_OUTPUT_DIR = "/tmp/bulkdelete";

    /**
     * For each scanned row, emits one {@code DeleteFamily} marker per configured
     * column family that has data in that row.
     */
    public static class BulkDeleteTestMapper extends TableMapper<ImmutableBytesWritable, KeyValue> {

        /** Column families of the target table, read from the job configuration in setup. */
        private List<byte[]> columnFamilies;

        /**
         * Reads the column family list from {@link #COLUMN_FAMILIES_KEY}.
         *
         * @throws IOException if the property is missing from the job configuration
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
            String families = conf.get(COLUMN_FAMILIES_KEY);
            if (families == null) {
                // Fail with a clear message rather than an NPE inside Splitter.
                throw new IOException("Missing required configuration property " + COLUMN_FAMILIES_KEY);
            }
            columnFamilies = new ArrayList<byte[]>();
            for (String cf : Splitter.on(",").omitEmptyStrings().split(families)) {
                columnFamilies.add(Bytes.toBytes(cf));
            }
        }

        /**
         * Writes a {@code DeleteFamily} marker for every configured family present in
         * {@code values}. Rows with no cells produce no output.
         */
        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            // An empty Result has nothing to delete (and getMap() may return null for it).
            if (values.isEmpty()) {
                return;
            }
            NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> valuesMap = values.getMap();
            if (valuesMap == null) {
                return;
            }
            // Take the key from the Result: row.get() returns the writable's whole
            // backing array and ignores its offset/length.
            byte[] rowKey = values.getRow();
            for (byte[] cf : columnFamilies) {
                if (valuesMap.containsKey(cf)) {
                    // NOTE(review): LATEST_TIMESTAMP is a placeholder; confirm how
                    // HFileOutputFormat stamps it for the HBase version in use.
                    context.write(row, new KeyValue(rowKey, cf, null,
                            HConstants.LATEST_TIMESTAMP, KeyValue.Type.DeleteFamily));
                }
            }
        }
    }

    /**
     * Configures and runs the bulk-delete job, exiting with status 0 on success
     * and 1 on failure.
     *
     * @param args generic Hadoop options, optionally followed by the table name
     *             and the HFile output directory
     * @throws Exception if the table cannot be read or the job cannot be run
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("mapred.map.tasks.speculative.execution", "false");
        conf.set("mapred.reduce.tasks.speculative.execution", "false");
        String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();
        String tableName = remaining.length > 0 ? remaining[0] : DEFAULT_TABLE;
        String outputDir = remaining.length > 1 ? remaining[1] : DEFAULT_OUTPUT_DIR;

        boolean success;
        // Open the table with the parsed conf so command-line -D/-conf options apply.
        HTable table = new HTable(conf, tableName);
        try {
            // Pass every column family to the mapper. Individual rows may lack data
            // for some families, so the list is taken from the table descriptor.
            conf.set(COLUMN_FAMILIES_KEY, Joiner.on(",").join(getColumnFamilyNames(table)));
            Job job = new Job(conf, "BulkDeleteTest");
            job.setJarByClass(BulkDeleteTestMapper.class);
            // A Scan with no families selected returns all families, so each
            // candidate row's data in every family is visible to the mapper.
            Scan scan = new Scan();
            scan.setCacheBlocks(false);
            scan.setCaching(1000);
            TableMapReduceUtil.initTableMapperJob(tableName, scan, BulkDeleteTestMapper.class,
                    ImmutableBytesWritable.class, KeyValue.class, job);
            HFileOutputFormat.configureIncrementalLoad(job, table);
            FileOutputFormat.setOutputPath(job, new Path(outputDir));
            success = job.waitForCompletion(true);
        } finally {
            table.close();
        }
        System.exit(success ? 0 : 1);
    }

    /** Returns the names of all column families declared in {@code table}'s descriptor. */
    private static List<String> getColumnFamilyNames(HTable table) throws IOException {
        List<String> names = new ArrayList<String>();
        for (byte[] cf : table.getTableDescriptor().getFamiliesKeys()) {
            names.add(Bytes.toString(cf));
        }
        return names;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment