
@browny
Last active February 23, 2021 07:40
streaiming-data-processing.md
  • Enable services
gcloud services enable pubsub.googleapis.com
gcloud services enable dataflow.googleapis.com
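Before moving on, it can help to confirm that both APIs are actually active. A quick check (piping through grep is just one convenient way to filter the list):

```shell
# List enabled services and filter for the two APIs used in this lab
gcloud services list --enabled | grep -E 'pubsub|dataflow'
```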
  • Create workstation machine
gcloud compute instances create host --zone=asia-east1-b --machine-type=n1-standard-1 \
  --metadata=startup-script-url=gs://cloud-training/initscripts/init-script-sdp-2.sh \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --image=projects/cloud-training-prod-bucket/global/images/ci-de-training-vm \
  --image-project=cloud-training-prod-bucket \
  --boot-disk-size=20GB --boot-disk-type=pd-standard
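The steps below assume a shell on that VM. One way to get there, reusing the instance name and zone from the command above:

```shell
# SSH into the workstation VM created above
gcloud compute ssh host --zone=asia-east1-b
```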
  • Log in to the workstation
git clone https://github.com/GoogleCloudPlatform/training-data-analyst

# Sets the DEVSHELL_PROJECT_ID and BUCKET environment variables
source /training/project_env.sh

# Create dataset
bq --location=US mk -d demos

# Create GCS bucket
gsutil mb gs://$BUCKET

# Simulate traffic sensor data into Pub/Sub
cd ~/training-data-analyst/courses/streaming/publish
./download_data.sh
./send_sensor_data.py --speedFactor=60 --project $DEVSHELL_PROJECT_ID
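To sanity-check that sensor messages are actually arriving, you can pull a few through a temporary subscription. The topic name `sandiego` is an assumption based on this training repo; verify it with `gcloud pubsub topics list` first:

```shell
# Hypothetical check: attach a temporary subscription, pull a few
# messages, then clean it up (topic name "sandiego" is assumed)
gcloud pubsub subscriptions create sanity-check --topic=sandiego
gcloud pubsub subscriptions pull sanity-check --limit=3 --auto-ack
gcloud pubsub subscriptions delete sanity-check
```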
  • Log in to the workstation in a second terminal window
# Sets the DEVSHELL_PROJECT_ID and BUCKET environment variables
source /training/project_env.sh

# Launch Dataflow Pipeline
cd ~/training-data-analyst/courses/streaming/process/sandiego

# Review the script that creates and runs the Dataflow pipeline
https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/streaming/process/sandiego/run_oncloud.sh

# Review the pipeline source while the application runs
https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/streaming/process/sandiego/src/main/java/com/google/cloud/training/dataanalyst/sandiego/AverageSpeeds.java

# Run the Dataflow pipeline
./run_oncloud.sh $DEVSHELL_PROJECT_ID $BUCKET AverageSpeeds

# Review the source for the second pipeline
https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/streaming/process/sandiego/src/main/java/com/google/cloud/training/dataanalyst/sandiego/CurrentConditions.java

# Run the second Dataflow pipeline
./run_oncloud.sh $DEVSHELL_PROJECT_ID $BUCKET CurrentConditions
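While the jobs spin up, their status can also be watched from the CLI instead of the Cloud Console; a minimal sketch (the region is an assumption — these training pipelines typically launch in `us-central1` unless overridden):

```shell
# List currently active Dataflow jobs and their states
gcloud dataflow jobs list --region=us-central1 --status=active
```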
  • Review BigQuery output
SELECT *
FROM `demos.average_speeds`
ORDER BY timestamp DESC
LIMIT 100;

SELECT MAX(timestamp)
FROM `demos.average_speeds`;

SELECT *
FROM `demos.average_speeds`
FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP, INTERVAL 10 MINUTE)
ORDER BY timestamp DESC
LIMIT 100;
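The same queries can also be run without leaving the workstation, via the bq CLI. For example, the freshness check:

```shell
# Run the MAX(timestamp) freshness check from the command line (standard SQL)
bq query --use_legacy_sql=false \
  'SELECT MAX(timestamp) FROM `demos.average_speeds`'
```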