Skip to content

Instantly share code, notes, and snippets.

@boseoladipo
Created July 16, 2020 10:03
Show Gist options
  • Save boseoladipo/01dc902347f9b209d079de480ef4856a to your computer and use it in GitHub Desktop.
Save boseoladipo/01dc902347f9b209d079de480ef4856a to your computer and use it in GitHub Desktop.
Apache Beam ParDo class for Exponea pipeline
class FetchEventsFn(beam.DoFn):
    """Fetch the events of one customer per input record via an HTTP POST.

    For every incoming record the DoFn copies the request payload template,
    sets ``customer_ids.cookie`` to the record's cookie, POSTs the JSON-encoded
    payload to ``exponea_vars['url']`` and emits one dict per item found under
    the ``"data"`` key of the JSON response.

    Args:
        payload: Request body template. It must contain a ``"customer_ids"``
            mapping; the template itself is never modified.
        timeout: Optional timeout forwarded to ``requests``. ``None`` (the
            default) keeps the previous behaviour of waiting indefinitely.
    """

    def __init__(self, payload, timeout=None):
        super().__init__()
        self.payload = payload
        self.url = exponea_vars['url']
        self.headers = exponea_vars['headers']
        self.timeout = timeout

    def process(self, record):
        """Return the list of event rows for ``record['cookie']``.

        Args:
            record: Mapping with a ``"cookie"`` key identifying the customer.

        Returns:
            A list of ``{"id": cookie, "events": [event]}`` dicts, one per item
            of the response's ``"data"`` list, with every event value converted
            to ``str``. The list is empty when the response carries no data or
            when anything fails; failures are logged, never raised.
        """
        # Bound before the try block so the error log below always has a value,
        # even when the record has no 'cookie' key.
        cookie = '<unknown>'
        try:
            cookie = record['cookie']
            # Build a per-record payload instead of writing into the shared
            # template, so one element's cookie never leaks into another call.
            payload = {
                **self.payload,
                "customer_ids": {**self.payload["customer_ids"], "cookie": cookie},
            }
            response = requests.request(
                "POST",
                self.url,
                data=json.dumps(payload),
                headers=self.headers,
                timeout=self.timeout,
            )
            events = response.json().get("data")
            if not events:
                return []
            # Each event becomes its own output row; values are stringified,
            # presumably to give downstream sinks a uniform schema.
            return [
                {
                    "id": cookie,
                    "events": [{key: str(value) for key, value in event.items()}],
                }
                for event in events
            ]
        except Exception as e:
            # Best effort by design: a failing customer must not abort the
            # whole bundle, so log the failure and emit nothing.
            logging.error('%s load failed due to %s', cookie, e)
            return []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment