Skip to content

Instantly share code, notes, and snippets.

@loldenburg
Last active May 19, 2021 19:41
Show Gist options
  • Save loldenburg/4cba2475a2624499c8313bb9cc055f2a to your computer and use it in GitHub Desktop.
Save loldenburg/4cba2475a2624499c8313bb9cc055f2a to your computer and use it in GitHub Desktop.
import os
from asyncio import Future
import json
from typing import Dict
from flask import Request
from google.cloud import pubsub
from firestore.reference import FireRef
publish_client = pubsub.PublisherClient()
def pubsub_stitcher(request: Request):
"""Retrieves details from the `request`, stitches the id contained in the payload
with additional data attributes from Firestore.
The `payload` parameter of the request JSON body needs to contain an `id` parameter to work.
The `token` parameter of the request JSON needs to match the token specified at the start
of the script to avoid that this endpoint compromises User ID info.
The optional `attrs` parameter represents the PubSub message optional attributes object
in case we want asynchronous processing (in this case we forward the stitching request to a PubSub topic for async handling).
"""
print(f"Starting script for GTM Stitching Event Processing")
token = "XXXXXXXX" # the security token
allowed_gtm_containers = ["GTM-XXXXXX", "GTM-XXXXXX"] # server-side GTM containers that can send data
request_json: Dict = request.get_json()
print(f'Received JSON request: {request_json}')
token_in_req = f'{request_json.get("token")}'
if (token_in_req is 'None') | (token_in_req != token):
return "false or no token", 403
event_payload: str = request_json.get('eventPayload')
if event_payload is None:
return "payload missing", 404
gtm_container_in_req = f'{attrs.get("gtmContainerId")}'
if gtm_container_in_req not in allowed_gtm_containers:
return "gtmContainerId not provided in attrs or containerId not allowed to send data", 403
async_response = False
if event_payload.get("async_response") == "0":
print("Synchronous response requested. Querying Firestore now.")
# getting script ID from Firestore
_id = event_payload['id']
firestore_id_ref = FireRef.scriptRuns().document(_id)
stitched_id_dict = firestore_id_ref.get().to_dict()
# Return the stitched Attributes
return dumps({"id": _id, "data": f"{stitched_id_dict}"}), 200
else:
print(
"Asynchronous response requested. Returning 200 OK for now and triggering asynch pubsub topic for stitching ID")
project: str = os.environ.get("GCP_PROJECT")
topic_name = "xxxxxxx"
topic: str = f'projects/{project}/topics/{topic_name}'
async_payload: str = request_json['eventPayload']
async_payload["script"] = "pubsub_forwarder_gtm"
request_attrs: Dict[str, str] = request_json[
'attrs'] or {} # all values of pubsub message attrs need to be strings!
print(f'Going to publish payload `{async_payload}` for asynch execution into topic `{topic_name}`.')
future: Future = publish_client.publish(topic=topic, data=str.encode(json.dumps(async_payload)), **request_attrs)
published_message_id: str = future.result()
print(
f'Successfully published a message `{published_message_id}` to PubSub topic `{topic_name}` with attributes `{request_attrs}`.')
return json.dumps({"messageId": published_message_id, "async_payload": async_payload, "attrs": request_attrs}), 200
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment