Created: October 5, 2012 18:14
Save pmackles/3841437 to your computer and use it in GitHub Desktop.
Sample bulk delete
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example.hbase.tools; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.NavigableMap; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.hbase.HBaseConfiguration; | |
import org.apache.hadoop.hbase.HConstants; | |
import org.apache.hadoop.hbase.KeyValue; | |
import org.apache.hadoop.hbase.client.HTable; | |
import org.apache.hadoop.hbase.client.Result; | |
import org.apache.hadoop.hbase.client.Scan; | |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; | |
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; | |
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; | |
import org.apache.hadoop.hbase.mapreduce.TableMapper; | |
import org.apache.hadoop.hbase.util.Bytes; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.Mapper.Context; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.util.GenericOptionsParser; | |
import com.google.common.base.Joiner; | |
import com.google.common.base.Splitter; | |
public class BulkDeleteTest { | |
public static class BulkDeleteTestMapper extends TableMapper<ImmutableBytesWritable, KeyValue> { | |
// Iterable<String> columnFamilies; | |
private List<byte[]> columnFamilies; | |
@Override | |
protected void setup(Context context) throws IOException, InterruptedException { | |
super.setup(context); | |
Configuration conf = context.getConfiguration(); | |
columnFamilies = new ArrayList<byte[]>(); | |
for (String cf : Splitter.on(",").split(conf.get("delete.columnFamilies"))) { | |
columnFamilies.add(Bytes.toBytes(cf)); | |
} | |
} | |
@Override | |
public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException, InterruptedException { | |
NavigableMap<byte[],NavigableMap<byte[],NavigableMap<Long,byte[]>>> valuesMap = values.getMap(); | |
// if the record has data for a given column family, write out a delete marker for that family | |
for (byte[] cf : columnFamilies) { | |
if (valuesMap.containsKey(cf)) { | |
context.write(row, new KeyValue(row.get(), cf, null, HConstants.LATEST_TIMESTAMP, KeyValue.Type.DeleteFamily)); | |
} | |
} | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
Configuration conf = HBaseConfiguration.create(); | |
conf.set("mapred.map.tasks.speculative.execution", "false"); | |
conf.set("mapred.reduce.tasks.speculative.execution", "false"); | |
GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args); | |
HTable users = new HTable("users"); | |
// get a list of all the column families for this table and pass it to the mapper | |
// as a comma-delimited list. Since some records may not have data for all CFs | |
// it is important to get this list separately | |
List<String> columnFamilies = new ArrayList<String>(); | |
for (byte[] cf : users.getTableDescriptor().getFamiliesKeys()) { | |
columnFamilies.add(Bytes.toString(cf)); | |
} | |
conf.set("delete.columnFamilies", Joiner.on(",").join(columnFamilies)); | |
Job job = new Job(conf, "BulkDeleteTest"); | |
job.setJarByClass(BulkDeleteTestMapper.class); | |
// make sure to include all columns families in the scan so we can make | |
// sure to delete them all if the row is a candidate for deletion | |
Scan scan = new Scan(); | |
scan.setCacheBlocks(false); | |
scan.setCaching(1000); | |
TableMapReduceUtil.initTableMapperJob("users", scan, BulkDeleteTestMapper.class, ImmutableBytesWritable.class, KeyValue.class, job); | |
HFileOutputFormat.configureIncrementalLoad(job, users); | |
FileOutputFormat.setOutputPath(job, new Path("/tmp/bulkdelete")); | |
job.waitForCompletion(true); | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.