
Sample bulk delete
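
A MapReduce job that bulk-deletes every row in an HBase table: it scans the table and, for each row, emits a DeleteFamily marker for every column family that holds data. The markers are written out as HFiles (under /tmp/bulkdelete) so they can be bulk-loaded into the table instead of being issued as individual Delete RPCs.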

gistfile1.java
package example.hbase.tools;
 
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
 
 
public class BulkDeleteTest {

    public static class BulkDeleteTestMapper extends TableMapper<ImmutableBytesWritable, KeyValue> {

        private List<byte[]> columnFamilies;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
            // Parse the comma-delimited list of column families passed in by the driver.
            columnFamilies = new ArrayList<byte[]>();
            for (String cf : Splitter.on(",").split(conf.get("delete.columnFamilies"))) {
                columnFamilies.add(Bytes.toBytes(cf));
            }
        }

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> valuesMap = values.getMap();

            // If the record has data for a given column family, write out a
            // DeleteFamily marker for that family.
            for (byte[] cf : columnFamilies) {
                if (valuesMap.containsKey(cf)) {
                    context.write(row, new KeyValue(row.get(), cf, null,
                            HConstants.LATEST_TIMESTAMP, KeyValue.Type.DeleteFamily));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Speculative execution would produce duplicate HFiles, so disable it.
        conf.set("mapred.map.tasks.speculative.execution", "false");
        conf.set("mapred.reduce.tasks.speculative.execution", "false");

        // Absorb any generic Hadoop options (-D, -conf, etc.) into conf.
        new GenericOptionsParser(conf, args);

        HTable users = new HTable(conf, "users");
        // Get a list of all the column families for this table and pass it to the
        // mapper as a comma-delimited list. Since some records may not have data
        // for all CFs, it is important to get this list separately.
        List<String> columnFamilies = new ArrayList<String>();
        for (byte[] cf : users.getTableDescriptor().getFamiliesKeys()) {
            columnFamilies.add(Bytes.toString(cf));
        }
        conf.set("delete.columnFamilies", Joiner.on(",").join(columnFamilies));

        Job job = new Job(conf, "BulkDeleteTest");
        job.setJarByClass(BulkDeleteTestMapper.class);

        // Make sure to include all column families in the scan so we can
        // delete them all if the row is a candidate for deletion.
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(1000);

        TableMapReduceUtil.initTableMapperJob("users", scan, BulkDeleteTestMapper.class,
                ImmutableBytesWritable.class, KeyValue.class, job);
        // Configure the job to write HFiles partitioned to match the table's regions.
        HFileOutputFormat.configureIncrementalLoad(job, users);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/bulkdelete"));
        job.waitForCompletion(true);
    }

}
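
Running the job only produces HFiles full of DeleteFamily markers; they take effect once they are bulk-loaded into the table, either with the completebulkload tool (hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/bulkdelete users) or programmatically. Below is a minimal sketch of that follow-up step, assuming the same HBase API generation as the job above, where org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles exposes doBulkLoad; the class name CompleteBulkDelete is just for illustration.

package example.hbase.tools;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class CompleteBulkDelete {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable users = new HTable(conf, "users");
        // Adopt the HFiles written by BulkDeleteTest into the table's regions.
        // The DeleteFamily markers start masking the rows as soon as the files
        // are in place; the data itself is removed at the next major compaction.
        new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/bulkdelete"), users);
        users.close();
    }
}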
