Skip to content

Instantly share code, notes, and snippets.

@Samstiles
Created November 4, 2015 18:06
Show Gist options
  • Save Samstiles/18d0f80478fd50c22e0f to your computer and use it in GitHub Desktop.
Save Samstiles/18d0f80478fd50c22e0f to your computer and use it in GitHub Desktop.
(ns stats-service.handlers.contextualizer
"Contextualizes incoming source events, duplicating and delegating throughout the cluster as needed."
(:refer-clojure :exclude [promise await])
(:require [zinn.log :as log]
[stats-service.handlers.event :as e]
[co.paralleluniverse.pulsar.core :refer :all]))
(defn delegate-contextualized-events
"Interacts with the [store] (dynamodb?) to find which node handles a particular source event. For now, throws things
directly on the processing queue of this node until we are actually doing delegation."
[events]
(doseq [event events]
(snd e/message-queue event)))
(defn annotate-event
"Applies the context and id field to the provided source event."
[source-event id context]
(merge source-event {:id id :context context}))
(defn contextualize
"Parses a source event for identifiers that are representative of contexts (interactions, queues, resources, etc) and
annotates + replicates the event based on these contexts."
[source-event]
(log/debug "SE" source-event)
(let [{:keys [resource-id queue-id interaction-id]} source-event
replicated-events (cond-> []
resource-id (conj (annotate-event source-event resource-id :resource))
queue-id (conj (annotate-event source-event resource-id :queue))
interaction-id (conj (annotate-event source-event resource-id :interaction)))]
(delegate-contextualized-events replicated-events)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment