Skip to content

Instantly share code, notes, and snippets.

@sebge2emasphere
Created July 30, 2018 11:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sebge2emasphere/6c4df97ae86e7cdcad064137b3b26dcb to your computer and use it in GitHub Desktop.
package com.emasphere.poc.parquetspark;

import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.postgresql.Driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
/**
* @author Sebastien Gerard
*/
public class ParquetSqlImporter {

    /**
     * Static factory wrapping the constructor's checked {@link IOException}
     * into an unchecked {@link IllegalArgumentException}.
     *
     * @param tableName the JDBC table to read from
     * @param query     the filter expression applied to the loaded dataset
     * @param context   the Spark context used to read and write the data
     * @return a ready-to-use importer
     * @throws IllegalArgumentException if the configuration cannot be loaded
     */
    public static ParquetSqlImporter sqlImporter(String tableName,
                                                 String query,
                                                 JavaSparkContext context) {
        try {
            return new ParquetSqlImporter(tableName, query, context);
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot initialize the importer.", e);
        }
    }

    /** Target S3 location of the generated Parquet file. */
    public static final String LOCATION = "s3a://ema-data-lake/parquet-test/test.parquet";

    private static final Logger logger = LoggerFactory.getLogger(ParquetSqlImporter.class);

    private final String tableName;
    private final String query;
    private final JavaSparkContext context;
    private final Properties properties;

    /**
     * Creates the importer and loads {@code /application.properties} from the
     * classpath.
     *
     * @param tableName the JDBC table to read from
     * @param query     the filter expression applied to the loaded dataset
     * @param context   the Spark context used to read and write the data
     * @throws IOException if the configuration resource is missing or unreadable
     */
    public ParquetSqlImporter(String tableName,
                              String query,
                              JavaSparkContext context) throws IOException {
        this.tableName = tableName;
        this.query = query;
        this.context = context;
        this.properties = new Properties();

        // Fix: close the classpath stream once loaded (it was previously leaked)
        // and fail with a clear message instead of an NPE when it is absent.
        try (InputStream input = getClass().getResourceAsStream("/application.properties")) {
            if (input == null) {
                throw new IOException("Cannot find /application.properties on the classpath.");
            }
            this.properties.load(input);
        }
    }

    /**
     * Opens the SSH tunnel, reads the configured table through JDBC, filters it
     * with the configured query and appends the result as a Parquet file to
     * {@link #LOCATION}. The tunnel is always torn down afterwards.
     */
    public void load() {
        final String url = "jdbc:postgresql://localhost:" + properties.getProperty("importer.bastion.local-port")
                + "/" + properties.getProperty("importer.db.name")
                + "?user=" + properties.getProperty("importer.db.user")
                + "&password=" + properties.getProperty("importer.db.password");

        final Session session = doSshTunnel(properties);
        try {
            logDbDetails(url);

            SQLContext
                    .getOrCreate(context.sc())
                    .read()
                    .option("driver", Driver.class.getName())
                    .jdbc(url, tableName, new Properties())
                    .filter(query)
                    .write()
                    .mode(SaveMode.Append)
                    .parquet(LOCATION);
        } finally {
            if (session != null) {
                session.disconnect();
            }
        }
    }

    /**
     * Logs the (password-masked) JDBC URL and performs a quick connectivity
     * check. Failures are logged but never propagated.
     *
     * @param url the full JDBC URL, including embedded credentials
     */
    private void logDbDetails(String url) {
        // Security fix: the URL embeds the password as a query parameter;
        // mask it before it reaches the logs.
        logger.info("Configured DB URL [{}].", url.replaceAll("password=[^&]*", "password=***"));

        // Fix: the Connection is AutoCloseable and was previously leaked.
        try (Connection connection = new Driver().connect(url, new Properties())) {
            logger.info("Connection to PostgreSQL established, schema [{}].", connection.getSchema());
        } catch (SQLException e) {
            logger.error("Cannot connect to PostgreSQL [" + e.getMessage() + "].", e);
        }
    }

    /**
     * Opens an SSH tunnel through the bastion host and forwards a local port to
     * the database. Best-effort: on any failure the error is logged and
     * {@code null} is returned, which {@link #load()} treats as "no tunnel".
     *
     * @param properties the importer configuration
     * @return the connected session, or {@code null} if the tunnel could not be opened
     */
    private Session doSshTunnel(Properties properties) {
        Session session = null;
        try {
            final JSch jsch = new JSch();

            session = jsch.getSession(
                    properties.getProperty("importer.bastion.user"),
                    properties.getProperty("importer.bastion.url"),
                    Integer.parseInt(properties.getProperty("importer.bastion.port"))
            );

            jsch.addIdentity(
                    "eb-preprod",
                    IOUtils.toByteArray(
                            ParquetSqlImporter.class.getResource(properties.getProperty("importer.bastion.key-file"))
                    ),
                    null,
                    null
            );

            final Properties config = new Properties();
            // The bastion host key is not in known_hosts; skip strict checking.
            config.put("StrictHostKeyChecking", "no");
            session.setConfig(config);

            session.connect();
            session.setPortForwardingL(
                    Integer.parseInt(properties.getProperty("importer.bastion.local-port")),
                    properties.getProperty("importer.db.url"),
                    Integer.parseInt(properties.getProperty("importer.db.port"))
            );

            return session;
        } catch (Exception e) {
            // Fix: if setPortForwardingL (or anything after connect()) fails,
            // tear the half-open session down instead of leaking it.
            if (session != null) {
                session.disconnect();
            }
            logger.error("Cannot open SSH tunnel.", e);
            return null;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment