Skip to content

Instantly share code, notes, and snippets.

@1cadumagalhaes
Created May 26, 2022 19:25
Show Gist options
  • Save 1cadumagalhaes/47dcf90328ebc63099ad9a71002c458c to your computer and use it in GitHub Desktop.
Save 1cadumagalhaes/47dcf90328ebc63099ad9a71002c458c to your computer and use it in GitHub Desktop.
Dataflow example
# Copyright 2022 DP6
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import apache_beam as beam
import logging
import requests
import json
from datetime import datetime
class MetaAdsConversionUploader(beam.DoFn):
    """Beam DoFn that uploads conversion events to the Meta (Facebook) Conversions API.

    Each bundle of rows is formatted into CAPI event payloads and POSTed to the
    Graph API ``/{pixel_id}/events`` endpoint. When both ``auth_request`` and
    ``job_id`` are provided, job status (RUNNING / SUCCESS / ERROR) is reported
    to a monitoring service via ``auth_request.put``.
    """

    def __init__(self, access_token, pixel_id, auth_request=None, job_id=None, job_name=None):
        """
        Args:
            access_token: Meta Graph API access token. When ``None``, the DoFn
                is inactive and every bundle is skipped with a warning.
            pixel_id: Target Meta pixel ID.
            auth_request: Optional monitoring client exposing ``put(path, body)``.
            job_id: Optional monitoring job identifier.
            job_name: Optional monitoring job name (stored but unused here).
        """
        self.access_token = access_token
        self.pixel_id = pixel_id
        # Uploads only happen when an access token was actually provided.
        self.active = self.access_token is not None
        self.API_VERSION = "v13.0"
        self.auth_request = auth_request
        self.job_id = job_id
        self.job_name = job_name
        # Status reporting needs both the client and a job id.
        self.monitoring = self.auth_request is not None and self.job_id is not None
        self.response = None
        self.size = 0

    def start_bundle(self):
        """Mark the monitoring job as RUNNING before processing a bundle."""
        logging.info("Starting meta capi upload")
        if self.monitoring:
            self.auth_request.put(f'/executions?id={self.job_id}', {
                "status": "RUNNING",
                "description": "Trying to upload batch to CAPI",
            })

    def process(self, elements, **kwargs):
        """Upload one batch of rows, or skip (with a warning) when inactive."""
        if not self.active:
            logging.warning('Skipping upload, parameters not configured.')
            return
        self._do_upload(elements)

    def finish_bundle(self):
        """Report SUCCESS with the size of the last uploaded batch."""
        if self.monitoring:
            self.auth_request.put(f'/executions?id={self.job_id}', {
                "status": "SUCCESS",
                "description": f"Uploaded {self.size} events",
            })
        return super().finish_bundle()

    def _do_upload(self, elements):
        """Format ``elements`` and POST them to the Conversions API endpoint.

        On a non-2xx response, reports ERROR to monitoring and logs the failure;
        the exception is swallowed (best-effort upload, bundle is not retried).
        """
        url = f"https://graph.facebook.com/{self.API_VERSION}/{self.pixel_id}/events?access_token={self.access_token}"
        self.size = len(elements)
        logging.info(
            f"Uploading {self.size} rows. Pixel ID: {self.pixel_id}")
        data = list(map(self._format_output, elements))
        body = {
            "data": json.dumps(data),
            # NOTE(review): hard-coded test event code — events are routed to the
            # pixel's Test Events tool, not production reporting. Parameterize or
            # remove before a production run.
            "test_event_code": "TEST36629",
        }
        try:
            response = requests.post(url, data=body)
            logging.info(response)
            logging.info(response.text)
            if not response.ok:
                # BUG FIX: this was a plain string with literal "{...}" braces
                # (missing f prefix) and was assigned to an undeclared
                # ``self.request``; now a real f-string stored on the
                # ``self.response`` attribute initialized in __init__.
                self.response = f"Error: response returned {response.status_code}: {response.text}"
                raise Exception(
                    f"Response returned {response.status_code}: {response.text}")
            # BUG FIX: same missing f prefix / wrong attribute as above.
            self.response = f"Success: response returned {response.status_code}: {response.text}"
        except Exception as error:
            if self.monitoring:
                self.auth_request.put(f'/executions?id={self.job_id}', {
                    "status": "ERROR",
                    "description": "Request returned an error.",
                })
            # BUG FIX: ``logging.error("There was an error:", error)`` passed a
            # positional arg with no %s placeholder, which triggers an internal
            # "--- Logging error ---" instead of logging the message.
            logging.error("There was an error: %s", error)

    def _format_output(self, row):
        """Convert one input row dict into a Meta CAPI event payload dict.

        Returns the payload dict, or ``None`` (implicitly) when the row cannot
        be parsed — in that case an ERROR status is reported to monitoring.
        """
        def set_key(src, dest, key, default=None):
            """Copy ``src[key]`` into ``dest`` as a string when truthy.

            When the value is missing/falsy and ``default`` is not None, the
            stringified default is stored instead.
            """
            if src.get(key):
                dest[key] = str(src[key])
            elif default is not None:
                dest[key] = str(default)

        try:
            data = {
                "event_name": row.get("event_name"),
                # assumes event_timestamp is a datetime object — TODO confirm
                # against the upstream table schema
                "event_time": int(datetime.timestamp(row.get("event_timestamp"))),
            }
            user_data = {
                "em": row.get("email"),
                "ph": row.get("phone"),
                "fn": row.get("first_name"),
                "ln": row.get("last_name"),
                "ge": row.get("gender"),
                "db": row.get("date_of_birth"),
                "client_ip_address": row.get("user_ip_address"),
                "client_user_agent": row.get("user_agent"),
                "ct": row.get("city"),
                "st": row.get("state"),
                "zp": row.get("zip_code"),
                "country": row.get("country")
            }
            set_key(row, user_data, "fbc")
            set_key(row, user_data, "fbp")
            data["user_data"] = user_data
            custom_data = {}
            set_key(row, custom_data, "content_category")
            if row.get("content_ids"):
                custom_data["content_ids"] = json.dumps(row.get("content_ids"))
            set_key(row, custom_data, "content_name")
            set_key(row, custom_data, "content_type")
            if row.get("contents"):
                # BUG FIX: local variable was named ``list``, shadowing the builtin.
                contents = []
                for content in row.get("contents"):
                    if content.get("quantity"):
                        content["quantity"] = int(content.get("quantity"))
                    contents.append(content)
                custom_data["contents"] = json.dumps(contents)
            set_key(row, custom_data, "currency")
            set_key(row, custom_data, "delivery_category")
            set_key(row, custom_data, "num_items")
            set_key(row, custom_data, "order_id")
            set_key(row, custom_data, "predicted_ltv")
            set_key(row, custom_data, "search_string")
            set_key(row, custom_data, "status")
            set_key(row, custom_data, "value")
            data["custom_data"] = custom_data
            data["event_id"] = str(int(row.get("event_id")))
            set_key(row, data, "event_source_url")
            # Note: default False is stringified to "False" by set_key.
            set_key(row, data, "opt_out", False)
            set_key(row, data, "action_source", "website")
            return data
        except Exception as e:
            print(f'Error trying to parse the table schema: {e}')
            if self.monitoring:
                self.auth_request.put(f'/executions?id={self.job_id}', {
                    "status": "ERROR",
                    "description": "Error trying to parse one row of the table",
                })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment