Skip to content

Instantly share code, notes, and snippets.

View sandboxws's full-sized avatar
🏠
Debug → Analyze → Refactor → Performance Gains → Repeat

Ahmed Elhossaini sandboxws

🏠
Debug → Analyze → Refactor → Performance Gains → Repeat
View GitHub Profile
@sandboxws
sandboxws / get_addons_host.gql
Created March 17, 2020 17:23
GetAddOns as a host
{
"operationName": "GetAddOns",
"variables": {
"hostId": "5dd43565611ffd0029bc80b2",
"listingId": "5e6b8d8ed39f1e00505f0294",
"currency": "CAD",
"offset": 0,
"limit": 15,
"state": "active"
},
@sandboxws
sandboxws / mongoio_sink.py
Created March 9, 2018 01:45
WIP MongoDB Apache Beam Sink for Python
__all__ = ['WriteToMongo']
import json
from pymongo import MongoClient
from apache_beam.transforms import PTransform
from apache_beam.io import iobase
class _MongoSink(iobase.Sink):
"""A :class:`~apache_beam.io.iobase.Sink`."""
// Extract errors PCollectionTuple
// Pull the failed-conversion side output (tagged bqTableRowsFailedTag) out of the
// multi-output tuple produced by the ParDo, as a PCollection of error strings.
PCollection<String> bqTableRowsErrors =
bqTableRowsTuple.get(bqTableRowsFailedTag)
// NullableCoder wraps the UTF-8 string coder so that null elements can be
// encoded — presumably the DoFn may emit null on failure; confirm against
// HashMapToTableRowFn.
.setCoder(NullableCoder.of(StringUtf8Coder.of()));
// Log errors to a text file under cloud storage.
// NOTE(review): snippet is truncated here — the closing ");" of this apply()
// call is outside the visible fragment.
bqTableRowsErrors
.apply(
"Write Errors",
TextIO.write().to("gs://beam-tutorial/album_errors.txt")
// Convert each HashMap row into a BigQuery TableRow via a multi-output ParDo:
// successfully converted rows go to bqTableRowsSuccessTag (the main output),
// rows that fail conversion go to the bqTableRowsFailedTag side output.
// The resulting PCollectionTuple is unpacked by tag downstream.
PCollectionTuple bqTableRowsTuple =
rows.apply(
"HashMap to TableRow",
ParDo.of(new HashMapToTableRowFn(bqTableRowsSuccessTag, bqTableRowsFailedTag))
// Main output tag first; additional (side) output tags in the TupleTagList.
.withOutputTags(bqTableRowsSuccessTag, TupleTagList.of(bqTableRowsFailedTag))
);
// Output tags used to split the ParDo's results: successful TableRow conversions
// vs. failure messages. The trailing "{}" creates an anonymous subclass on purpose —
// it lets Beam recover the generic type argument at runtime despite erasure
// (per the TupleTag javadoc); do not "simplify" it away.
final TupleTag<TableRow> bqTableRowsSuccessTag =
new TupleTag<TableRow>() {};
final TupleTag<String> bqTableRowsFailedTag =
new TupleTag<String>() {};
package com.sandboxws.chinook;
import com.google.api.services.bigquery.model.TableRow;
import com.sandboxws.beam.AppOptions;
import com.sandboxws.beam.coders.TableRowCoder;
import com.sandboxws.chinook.bigquery.schema.AlbumTableSchema;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
gradle albums --args="--pgDatabase=chinook_development --pgUsername=root --project=GOOGLE_CLOUD_PROJECT_ID --outputTable=dwh.albums --tempLocation=gs://beam_tutorial/temp --stagingLocation=gs://beam_tutorial/staging"
// Read the albums table from PostgreSQL via JdbcIO, materializing each result-set
// row as a HashMap<String, Object> keyed by column name (delegated to
// TableRowMapper.asMap — presumably column name -> value; verify against that helper).
// NOTE(review): fragment is truncated — the mapRow method, anonymous RowMapper
// class, and the enclosing apply() are not closed within the visible lines.
PCollection<HashMap<String, Object>> rows = pipeline.apply(
"Read Albums from PG",
JdbcIO.<HashMap<String, Object>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(pgDataSource))
// Coder for the HashMap elements flowing out of this source.
.withCoder(TableRowCoder.of())
// Map ResultSet row to a HashMap
.withRowMapper(new RowMapper<HashMap<String, Object>>() {
@Override
public HashMap<String, Object> mapRow(ResultSet resultSet) throws Exception {
return TableRowMapper.asMap(resultSet, tableName, pkName);
@sandboxws
sandboxws / AlbumPipeline.java
Last active December 29, 2018 06:07
Album Batch Pipeline
package com.sandboxws.chinook;
import com.google.api.services.bigquery.model.TableRow;
import com.sandboxws.beam.AppOptions;
import com.sandboxws.beam.coders.TableRowCoder;
import com.sandboxws.chinook.bigquery.schema.AlbumTableSchema;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
// Sink: stream the converted TableRows into BigQuery. Appends to the destination
// table (creating it from AlbumTableSchema if absent) and silently drops any
// fields not present in the schema.
bqTableRows.apply("Write to BigQuery",
BigQueryIO.writeTableRows()
.to(options.getOutputTable()) // Passed as an argument from the command line
.withSchema(AlbumTableSchema.schema()) // The schema for the BigQuery table
.ignoreUnknownValues() // Ignore any values passed but not defined on the table schema
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) // Append to the BigQuery table.
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) // Create the BigQuery table if it doesn't exist
);