Skip to content

Instantly share code, notes, and snippets.

View sandboxws's full-sized avatar
🏠
Debug → Analyze → Refactor → Performance Gains → Repeat

Ahmed Elhossaini sandboxws

🏠
Debug → Analyze → Refactor → Performance Gains → Repeat
View GitHub Profile
Pipeline pipeline = Pipeline.create(options);
PGSimpleDataSource pgDataSource = getPostgresDataSource(options);
PCollection<HashMap<String, Object>> rows = pipeline.apply(
"Read Albums from PG",
JdbcIO.<HashMap<String, Object>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(pgDataSource))
.withCoder(TableRowCoder.of())
// Map ResultSet row to a HashMap
.withRowMapper(new RowMapper<HashMap<String, Object>>() {
@Override
public HashMap<String, Object> mapRow(ResultSet resultSet) throws Exception {
return TableRowMapper.asMap(resultSet, tableName, pkName);
@sandboxws
sandboxws / build.gradle
Created December 29, 2018 05:39
Apache Beam, Dataflow, PostgreSQL, and BigQuery
dependencies {
// Apache Beam
compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version:'2.9.0' // Core Java SDK
compile group: 'org.apache.beam', name: 'beam-sdks-java-io-google-cloud-platform', version:'2.9.0' // Google Cloud IO
compile group: 'org.apache.beam', name: 'beam-sdks-java-io-jdbc', version:'2.9.0' // JDBC IO
runtime group: 'org.apache.beam', name: 'beam-runners-direct-java', version:'2.9.0' // Beam Direct Runner
// Dataflow
compile group: 'com.google.cloud.dataflow', name: 'google-cloud-dataflow-java-sdk-all', version: '2.5.0' // Dataflow Java SDK
// Logging
runtime group: 'org.slf4j', name: 'slf4j-jdk14', version:'1.7.25'
@sandboxws
sandboxws / TableRowCoder.java
Created December 29, 2018 05:34
TableRow Apache Beam Coder
package com.sandboxws.beam.coders;
import com.google.gson.Gson;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.TypeDescriptor;
@sandboxws
sandboxws / AlbumSchema.java
Created December 29, 2018 05:33
Album BigQuery table schema
package com.sandboxws.chinook.bigquery.schema;
import com.google.api.services.bigquery.model.TableSchema;
import com.sandboxws.bigquery.TableSchemaBuilder;
/**
* Album BigQuery table schema.
*
* @author Ahmed El.Hussaini
*/
@sandboxws
sandboxws / AlbumPipeline.java
Last active December 29, 2018 06:07
Album Batch Pipeline
package com.sandboxws.chinook;
import com.google.api.services.bigquery.model.TableRow;
import com.sandboxws.beam.AppOptions;
import com.sandboxws.beam.coders.TableRowCoder;
import com.sandboxws.chinook.bigquery.schema.AlbumTableSchema;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
@sandboxws
sandboxws / TableRowMapper.java
Created December 29, 2018 01:39
Maps a HashMap to a BigQuery TableRow
package com.sandboxws.chinook;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.TimeZone;
@sandboxws
sandboxws / TableSchemaBuilder.java
Created December 29, 2018 01:37
BigQuery table schema builder
package com.sandboxws.bigquery;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
/**
* BigQuery table schema builder.
*
@sandboxws
sandboxws / AppOptions.java
Created December 29, 2018 01:32
Custom Dataflow PipelineOptions
package com.sandboxws.beam;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
/**
* App specific Apache Beam pipeline options.
*
* @author Ahmed Elhossaini