package com.emasphere.poc.hbase;

import com.emasphere.data.executor.common.DataFormatUtils;
import com.emasphere.data.executor.common.utils.FlowExecutorUtils;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.postgresql.Driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.URL;
import java.sql.Connection;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Properties;

import static org.apache.hadoop.hbase.HConstants.*;
import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.*;
import static org.apache.hadoop.hbase.mapreduce.TableOutputFormat.*;
import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.*;

/**
 * @author Sebastien Gerard
 */
public class HBaseSqlImporter {

    /**
     * Creates an importer for the given SQL table; any initialization error is wrapped
     * in an {@link IllegalArgumentException}.
     */
    public static HBaseSqlImporter sqlImporter(String tableName,
                                               String query,
                                               JavaSparkContext context) {
        try {
            return new HBaseSqlImporter(tableName, query, context);
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot initialize the importer.", e);
        }
    }

    private static final Logger logger = LoggerFactory.getLogger(HBaseSqlImporter.class);

    private final String tableName;
    private final String query;
    private final JavaSparkContext context;
    private final Properties properties;

    public HBaseSqlImporter(String tableName,
                            String query,
                            JavaSparkContext context) throws IOException {
        this.tableName = tableName;
        this.query = query;
        this.context = context;
        this.properties = new Properties();
        this.properties.load(getClass().getResourceAsStream("/application.properties"));
    }
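
    // Hypothetical sketch of the /application.properties file this class expects. The keys
    // below are exactly the ones read in this class; the values are placeholders to adapt
    // to your own bastion and database:
    //
    //   importer.bastion.user=ec2-user
    //   importer.bastion.url=bastion.example.com
    //   importer.bastion.port=22
    //   importer.bastion.key-file=/eb-preprod.pem
    //   importer.bastion.local-port=5433
    //   importer.db.url=db.internal.example.com
    //   importer.db.port=5432
    //   importer.db.name=poc
    //   importer.db.user=postgres
    //   importer.db.password=secret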

    /**
     * Reads the SQL table through Spark JDBC (over an SSH tunnel) and writes it to HBase.
     */
    public void load() throws MalformedURLException {
        final String url = "jdbc:postgresql://localhost:" + properties.getProperty("importer.bastion.local-port")
                + "/" + properties.getProperty("importer.db.name")
                + "?user=" + properties.getProperty("importer.db.user")
                + "&password=" + properties.getProperty("importer.db.password");

        final Session session = doSshTunnel(properties);
        try {
            logDbDetails(url);

            final Configuration configuration = initializeConfiguration();

            Dataset<Row> sqlDataSet = SQLContext
                    .getOrCreate(context.sc())
                    .read()
                    .option("driver", Driver.class.getName())
                    .jdbc(url, tableName, new Properties());

            if (query != null) {
                sqlDataSet = sqlDataSet.filter(query);
            }

            final Mapper func = new Mapper();
            sqlDataSet
                    .toJavaRDD()
                    .mapToPair(func)
                    .saveAsNewAPIHadoopFile(
                            FlowExecutorUtils.TABLE,
                            ImmutableBytesWritable.class,
                            // The mapper emits Put instances, so the value class must be Mutation.
                            Mutation.class,
                            TableOutputFormat.class,
                            new JobConf(configuration)
                    );
        } finally {
            if (session != null) {
                session.disconnect();
            }
        }
    }

    private Configuration initializeConfiguration() throws MalformedURLException {
        final Configuration configuration = HBaseConfiguration.create();

        final URL url = new URL("file:///etc/hbase/conf/hbase-site.xml");
        if (new File(url.getFile()).exists()) {
            logger.info("Loading HBase configuration from [{}].", url);
            configuration.addResource(url);
        } else {
            logger.info("HBase configuration file [{}] does not exist.", url);
        }

        configuration.set(INPUT_TABLE, "flow");
        configuration.set(OUTDIR, "flow");
        configuration.set(OUTPUT_TABLE, "flow");
        configuration.set(ZOOKEEPER_QUORUM, "localhost");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");

        return configuration;
    }

    private void logDbDetails(String url) {
        logger.info("Configured DB URL [{}].", url);

        try (Connection connection = new Driver().connect(url, new Properties())) {
            logger.info("Connection to PostgreSQL established, schema [{}].", connection.getSchema());
        } catch (SQLException e) {
            logger.error("Cannot connect to PostgreSQL [" + e.getMessage() + "].", e);
        }
    }

    private Session doSshTunnel(Properties properties) {
        try {
            final JSch jsch = new JSch();
            final Session session = jsch.getSession(
                    properties.getProperty("importer.bastion.user"),
                    properties.getProperty("importer.bastion.url"),
                    Integer.valueOf(properties.getProperty("importer.bastion.port"))
            );

            jsch.addIdentity(
                    "eb-preprod",
                    IOUtils.toByteArray(
                            HBaseSqlImporter.class.getResource(properties.getProperty("importer.bastion.key-file"))
                    ),
                    null,
                    null
            );

            final Properties config = new Properties();
            config.put("StrictHostKeyChecking", "no");
            session.setConfig(config);
            session.connect();

            // Forward the configured local port to the database host reachable from the bastion.
            session.setPortForwardingL(
                    Integer.valueOf(properties.getProperty("importer.bastion.local-port")),
                    properties.getProperty("importer.db.url"),
                    Integer.valueOf(properties.getProperty("importer.db.port"))
            );

            return session;
        } catch (Exception e) {
            logger.error("Cannot open SSH tunnel.", e);
            return null;
        }
    }

    private static class Mapper implements PairFunction<Row, ImmutableBytesWritable, Mutation> {

        @Override
        public Tuple2<ImmutableBytesWritable, Mutation> call(Row row) {
            // The HBase row key is the concatenation of the "id" and "i" columns.
            final byte[] rowKey = concat(
                    DataFormatUtils.toBytes(row.getDecimal(row.fieldIndex("id"))),
                    DataFormatUtils.toBytes(row.getString(row.fieldIndex("i")))
            );

            final Put put = new Put(rowKey);
            for (int i = 0; i < row.size(); i++) {
                final Object value = row.get(i);
                if (value == null) {
                    continue;
                }

                final String name = row.schema().fieldNames()[i];

                final byte[] bytesValue;
                if (value instanceof BigDecimal) {
                    bytesValue = DataFormatUtils.toBytes((BigDecimal) value);
                } else if (value instanceof Date) {
                    bytesValue = DataFormatUtils.toBytes(((Date) value).toLocalDate());
                } else if (value instanceof Timestamp) {
                    bytesValue = DataFormatUtils.toBytes(((Timestamp) value).toLocalDateTime());
                } else if (value instanceof Boolean) {
                    bytesValue = DataFormatUtils.toBytes((Boolean) value);
                } else if (value instanceof String) {
                    bytesValue = DataFormatUtils.toBytes((String) value);
                } else {
                    throw new UnsupportedOperationException("Unsupported type [" + value.getClass() + "].");
                }

                // Every SQL column is written to the "d" column family under its own qualifier.
                put.addColumn(
                        "d".getBytes(),
                        name.getBytes(),
                        0,
                        bytesValue
                );
            }

            return Tuple2.apply(new ImmutableBytesWritable(rowKey), put);
        }

        public byte[] concat(byte[] first, byte[] second) {
            final byte[] both = Arrays.copyOf(first, first.length + second.length);
            System.arraycopy(second, 0, both, first.length, second.length);
            return both;
        }
    }
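
    // Hypothetical usage sketch (not part of the original gist): assumes a local Spark master,
    // that the source table is named "flow", and that "amount is not null" is just an
    // illustrative Spark SQL filter expression.
    public static void main(String[] args) throws Exception {
        final JavaSparkContext context = new JavaSparkContext(
                new org.apache.spark.SparkConf()
                        .setAppName("hbase-sql-importer")
                        .setMaster("local[*]"));
        try {
            sqlImporter("flow", "amount is not null", context).load();
        } finally {
            context.stop();
        }
    }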
}