akj009 / submit-command.sh
Last active January 12, 2020 11:07
Command to submit a Beam job with the Spark runner on YARN.
spark2-submit --class com.mptyminds.dataflow.Main \
  --master yarn --deploy-mode client \
  --driver-memory 2g --executor-memory 1g --executor-cores 1 \
  --conf spark.yarn.appMasterEnv.GOOGLE_APPLICATION_CREDENTIALS=<credential_file_path> \
  --conf spark.yarn.executorEnv.GOOGLE_APPLICATION_CREDENTIALS=<credential_file_path> \
  --conf spark.executorEnv.GOOGLE_APPLICATION_CREDENTIALS=<credential_file_path> \
  /path/to/final-shaded.jar \
  --hdfsConfiguration=[{\"fs.default.name\":\"hdfs://host:port\"}] \
  --sparkMaster=yarn --streaming=false \
  --project=<gcp-project> \
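The listing truncates the command above. For context, here is a minimal sketch of the custom pipeline options interface these flags imply; the interface name and the two output options are assumptions (based on the pipelineOptions.getOutputFilePath() and getOutputFileShardCount() calls in the snippets below), while SparkPipelineOptions, GcpOptions, and HadoopFileSystemOptions are the Beam interfaces that supply --sparkMaster/--streaming, --project, and --hdfsConfiguration.

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

// Hypothetical options interface for the Main class above; only the
// extended Beam interfaces are documented API, the rest is assumed.
public interface BigQueryToHdfsOptions
    extends SparkPipelineOptions, GcpOptions, HadoopFileSystemOptions {

  @Description("Directory the Parquet output files are written to")
  String getOutputFilePath();
  void setOutputFilePath(String value);

  @Description("Number of output shards")
  @Default.Integer(1)
  Integer getOutputFileShardCount();
  void setOutputFileShardCount(Integer value);
}

PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryToHdfsOptions.class) would then parse the arguments passed after the shaded jar.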
pom.xml (fragment; the listing cuts off before the dependencies)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mptyminds.dataflow</groupId>
<artifactId>bigquery-to-hdfs</artifactId>
<version>1.0</version>
akj009 / ParquetFileOutput.java
Last active January 11, 2020 10:40
Writing an Avro PCollection to HDFS in Parquet format with a custom naming strategy.
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;

genericRecordPCollection.apply(
    FileIO
        .<GenericRecord>write()
        // ParquetIO.sink writes each GenericRecord as a Parquet row.
        .via(ParquetIO.sink(schema))
        .to(pipelineOptions.getOutputFilePath())
        .withNumShards(pipelineOptions.getOutputFileShardCount())
        // Custom naming: shardIndex is ignored, so this assumes a single
        // shard; with more shards the file names would collide.
        .withNaming((window, pane, numShards, shardIndex, compression) -> String.format(
            "%s_%d_%d.%s", outputFilePrefix, startTime, endTime, "parq"))
);
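For the round trip, ParquetIO also provides a read transform; a minimal sketch, assuming the same Avro schema object and a hypothetical input glob matching the naming strategy above:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical path; the glob follows the prefix_start_end.parq pattern above.
PCollection<GenericRecord> records = pipeline.apply(
    ParquetIO.read(schema).from("hdfs://host:port/output/path/prefix_*.parq"));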
akj009 / AvroPCollectionFromBQ.java
Created January 11, 2020 10:14
Creating a GenericRecord PCollection using BigQueryIO.
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.values.PCollection;

final PCollection<GenericRecord> genericRecordPCollection = pipeline.apply(
    BigQueryIO.read(SchemaAndRecord::getRecord)  // extract the Avro record
        .fromQuery(query)
        .usingStandardSql()
        .withCoder(AvroCoder.of(schema)));       // schema of the query results
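Taken together, the two snippets form one pipeline: read GenericRecords from BigQuery, then write them to HDFS as Parquet. A hedged sketch of the assembly, reusing the hypothetical options interface from above:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public static void main(String[] args) {
  // BigQueryToHdfsOptions is the hypothetical interface sketched earlier.
  BigQueryToHdfsOptions options = PipelineOptionsFactory
      .fromArgs(args).withValidation().as(BigQueryToHdfsOptions.class);
  Pipeline pipeline = Pipeline.create(options);
  // ... apply the BigQueryIO read and the FileIO/ParquetIO write shown above ...
  pipeline.run().waitUntilFinish();
}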