Skip to content

Instantly share code, notes, and snippets.

@sebge2emasphere
Created July 30, 2018 13:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sebge2emasphere/5b4cbeb089b845ea5f348abba2bd6e30 to your computer and use it in GitHub Desktop.
Save sebge2emasphere/5b4cbeb089b845ea5f348abba2bd6e30 to your computer and use it in GitHub Desktop.
package com.emasphere.poc.hbase.sample;
import com.emasphere.data.executor.common.DataFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import java.math.BigDecimal;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import static org.apache.hadoop.hbase.HConstants.*;
import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.*;
import static org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.*;
import static org.apache.hadoop.hbase.mapreduce.TableOutputFormat.*;
import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.*;
/**
* @author Sebastien Gerard
*/
public class Sample01 {
public static BigDecimal compute(JavaSparkContext context) throws Exception {
// "code_vendeur = 'JVA' and datefact >= date '2018-01-01' and datefact <= date '2018-06-30'"
final Mapper mapper = new Mapper();
final Additionner additionner = new Additionner();
return context
.newAPIHadoopRDD(
initializeConfiguration(),
TableInputFormat.class,
ImmutableBytesWritable.class,
Result.class
)
.map(mapper)
.reduce(additionner);
}
private static Configuration initializeConfiguration() throws Exception {
final Configuration configuration = HBaseConfiguration.create();
configuration.addResource(new URL("file:///etc/hbase/conf/hbase-site.xml"));
configuration.set(INPUT_TABLE, "flow");
configuration.set(OUTDIR, "flow");
configuration.set(OUTPUT_TABLE, "flow");
configuration.set(ZOOKEEPER_QUORUM, "localhost");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
final Scan scan = new Scan();
scan.setFilter(
new FilterList(
new SingleColumnValueFilter(
"d".getBytes(),
"code_vendeur".getBytes(),
CompareFilter.CompareOp.EQUAL,
new BinaryComparator(DataFormatUtils.toBytes("JVA"))
),
new SingleColumnValueFilter(
"d".getBytes(),
"datefact".getBytes(),
CompareFilter.CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(
DataFormatUtils.toBytes(
LocalDate.of(2018, 1, 1).atTime(0, 0, 0)
)
)
),
new SingleColumnValueFilter(
"d".getBytes(),
"datefact".getBytes(),
CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(
DataFormatUtils.toBytes(
LocalDate.of(2018, 6, 30).atTime(23, 59, 59)
)
)
)
)
);
scan.addColumn("d".getBytes(), "montant".getBytes());
scan.addColumn("d".getBytes(), "code_vendeur".getBytes());
scan.addColumn("d".getBytes(), "datefact".getBytes());
configuration.set(TableInputFormat.SCAN, convertScanToString(scan));
return configuration;
}
private static class Mapper implements Function<Tuple2<ImmutableBytesWritable, Result>, BigDecimal> {
@Override
public BigDecimal call(Tuple2<ImmutableBytesWritable, Result> row) {
return DataFormatUtils.toBigDecimal(row._2().getValue("d".getBytes(), "montant".getBytes()));
}
}
private static class Additionner implements Function2<BigDecimal, BigDecimal, BigDecimal> {
@Override
public BigDecimal call(BigDecimal first, BigDecimal second) {
if (first == null) {
return null;
} else if (second == null) {
return null;
} else {
return first.add(second);
}
}
}
private static class Filter implements Function<Tuple2<ImmutableBytesWritable, Result>, Boolean> {
@Override
public Boolean call(Tuple2<ImmutableBytesWritable, Result> result) {
final String vendeur = DataFormatUtils.toString(result._2().getValue("d".getBytes(), "code_vendeur".getBytes()));
final LocalDateTime dateFact = DataFormatUtils.toLocalDateTime(result._2().getValue("d".getBytes(), "datefact".getBytes()));
return (vendeur != null)
&& vendeur.contains("JVA")
&& (dateFact != null)
&& dateFact.isAfter(LocalDate.of(2018, 1, 1).atStartOfDay(ZoneId.systemDefault()).toLocalDateTime())
&& dateFact.isBefore(LocalDate.of(2018, 6, 30).atStartOfDay(ZoneId.systemDefault()).toLocalDateTime());
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment