Skip to content

Instantly share code, notes, and snippets.

@cbarley10
Created July 12, 2022 15:21
Show Gist options
  • Save cbarley10/1b0bb764814206df535a41ff33ad4701 to your computer and use it in GitHub Desktop.
Save cbarley10/1b0bb764814206df535a41ff33ad4701 to your computer and use it in GitHub Desktop.
def save_order_shipments(self, order):
if not order["shipments"]:
return
customer_properties_dict = copy.deepcopy(order["customer"])
customer_properties = CustomerProperties(
email=customer_properties_dict.pop(ProfileConstants.EMAIL_KEY)
)
customer_properties.set_custom_properties(customer_properties_dict)
for shipment in order["shipments"]:
if shipment["State"] == "shipped":
if not shipment.get("ShippedAt"):
continue
timestamp = dateutil_parse(shipment["ShippedAt"]).astimezone(pytz.utc)
order_properties = copy.deepcopy(order["properties"])
order_properties["extra"] = order_properties.pop(EventConstants.EXTRA)
properties = {
EventConstants.EXTRA: {
"Shipment": shipment,
"Order": order_properties,
},
}
shipped_order_event = OrderShippedEvent(
service=self.service,
unique_key=shipment["Id"],
customer_properties=customer_properties,
event_data=properties,
time=timestamp,
)
self._publish_event(shipped_order_event)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment